/* * Plain table refresh function. */ CREATE FUNCTION refresh_table(p_destination text, p_truncate_cascade boolean DEFAULT NULL, p_jobmon boolean DEFAULT NULL, p_lock_wait int DEFAULT NULL, p_debug boolean DEFAULT false) RETURNS void LANGUAGE plpgsql AS $$ DECLARE v_adv_lock boolean; v_cols text; v_cols_n_types text; v_condition text; v_cursor_name text; v_dblink int; v_dblink_name text; v_dblink_schema text; v_dest_schema_name text; v_dest_table text; v_dest_table_name text; v_fetch_sql text; v_filter text[]; v_job_id bigint; v_job_name text; v_jobmon boolean; v_jobmon_schema text; v_link_exists boolean; v_old_search_path text; v_post_script text[]; v_remote_sql text; v_rowcount bigint := 0; v_seq text; v_seq_max bigint; v_sequences text[]; v_source_table text; v_seq_name text; v_seq_oid oid; v_seq_reset text; v_seq_schema text; v_src_schema_name text; v_src_table_name text; v_step_id bigint; v_total bigint := 0; v_truncate_cascade boolean; v_truncate_sql text; BEGIN -- Take advisory lock to prevent multiple calls to function overlapping v_adv_lock := @extschema@.concurrent_lock_check(p_destination, p_lock_wait); IF v_adv_lock = 'false' THEN -- This code is known duplication of code below. -- This is done in order to keep advisory lock as early in the code as possible to avoid race conditions and still log if issues are encountered. v_job_name := 'Refresh Table: '||p_destination; SELECT jobmon INTO v_jobmon FROM @extschema@.refresh_config_table WHERE dest_table = p_destination; SELECT nspname INTO v_jobmon_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'pg_jobmon' AND e.extnamespace = n.oid; v_jobmon := COALESCE(p_jobmon, v_jobmon); IF v_jobmon IS TRUE AND v_jobmon_schema IS NULL THEN RAISE EXCEPTION 'jobmon config set to TRUE, but unable to determine if pg_jobmon extension is installed'; END IF; IF v_jobmon THEN EXECUTE format('SELECT %I.add_job(%L)', v_jobmon_schema, v_job_name) INTO v_job_id; EXECUTE format('SELECT %I.add_step(%L, %L)', v_jobmon_schema, v_job_id, 'Obtaining advisory lock for job: '||v_job_name) INTO v_step_id; EXECUTE format('SELECT %I.update_step(%L, %L, %L)', v_jobmon_schema, v_step_id, 'WARNING', 'Found concurrent job. Exiting gracefully'); EXECUTE format('SELECT %I.fail_job(%L, %L)', v_jobmon_schema, v_job_id, 2); END IF; PERFORM @extschema@.gdb(p_debug,'Obtaining advisory lock FAILED for job: '||v_job_name); RAISE DEBUG 'Found concurrent job. Exiting gracefully'; RETURN; END IF; SELECT nspname INTO v_dblink_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'dblink' AND e.extnamespace = n.oid; SELECT nspname INTO v_jobmon_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'pg_jobmon' AND e.extnamespace = n.oid; IF p_jobmon IS TRUE AND v_jobmon_schema IS NULL THEN RAISE EXCEPTION 'p_jobmon parameter set to TRUE, but unable to determine if pg_jobmon extension is installed'; END IF; v_dblink_name := @extschema@.check_name_length('mimeo_table_refresh_'||p_destination); v_job_name := 'Refresh Table: '||p_destination; SELECT current_setting('search_path') INTO v_old_search_path; EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||COALESCE(v_jobmon_schema||',', '')||v_dblink_schema||',public'',''false'')'; SELECT source_table , dest_table , dblink , filter , condition , sequences , truncate_cascade , jobmon INTO v_source_table , v_dest_table , v_dblink , v_filter , v_condition , v_sequences , v_truncate_cascade , v_jobmon FROM refresh_config_table WHERE dest_table = p_destination; IF NOT FOUND THEN RAISE EXCEPTION 'Destination table given in argument (%) is not managed by mimeo.', p_destination; END IF; -- Allow override with parameter v_jobmon := COALESCE(p_jobmon, v_jobmon); IF v_jobmon THEN v_job_id := add_job(v_job_name); PERFORM gdb(p_debug,'Job ID: '||v_job_id::text); END IF; SELECT schemaname, tablename INTO v_dest_schema_name, v_dest_table_name FROM pg_catalog.pg_tables WHERE schemaname||'.'||tablename = v_dest_table; IF p_truncate_cascade IS NOT NULL THEN v_truncate_cascade := p_truncate_cascade; END IF; IF v_jobmon THEN v_step_id := add_step(v_job_id, 'Truncating destination table'); END IF; v_truncate_sql := format('TRUNCATE TABLE %I.%I', v_dest_schema_name, v_dest_table_name); IF v_truncate_cascade THEN v_truncate_sql := v_truncate_sql || ' CASCADE'; RAISE NOTICE 'WARNING! If this table had foreign keys, you have just truncated all referencing tables as well!'; IF v_jobmon THEN PERFORM update_step(v_step_id, 'OK', 'If this table had foreign keys, you have just truncated all referencing tables as well!'); END IF; ELSE IF v_jobmon THEN PERFORM update_step(v_step_id, 'OK', 'Done'); END IF; END IF; EXECUTE v_truncate_sql; PERFORM dblink_connect(v_dblink_name, @extschema@.auth(v_dblink)); IF v_jobmon THEN v_step_id := add_step(v_job_id,'Grabbing Mapping, Building SQL'); END IF; SELECT array_to_string(p_cols, ',') , array_to_string(p_cols_n_types, ',') , p_source_schema_name , p_source_table_name INTO v_cols , v_cols_n_types , v_src_schema_name , v_src_table_name FROM manage_dest_table(v_dest_table, NULL, v_dblink_name, p_debug); IF v_src_table_name IS NULL THEN RAISE EXCEPTION 'Source table missing (%)', v_source_table; END IF; IF v_jobmon THEN PERFORM update_step(v_step_id, 'OK', 'Done'); v_step_id := add_step(v_job_id,'Inserting records into local table'); END IF; v_remote_sql := format('SELECT '||v_cols||' FROM %I.%I', v_src_schema_name, v_src_table_name); IF v_condition IS NOT NULL THEN v_remote_sql := v_remote_sql || ' ' || v_condition; END IF; -- Ensure name is consistent in case it would get truncated by maximium object name length v_cursor_name := @extschema@.check_name_length('mimeo_cursor_' || v_src_table_name, p_convert_standard := true); PERFORM dblink_open(v_dblink_name, v_cursor_name, v_remote_sql); v_rowcount := 0; LOOP v_fetch_sql := format('INSERT INTO %I.%I(%s) SELECT %s FROM dblink_fetch(%L, %L, %s) AS (%s)' , v_dest_schema_name , v_dest_table_name , v_cols , v_cols , v_dblink_name , v_cursor_name , '50000' , v_cols_n_types); EXECUTE v_fetch_sql; GET DIAGNOSTICS v_rowcount = ROW_COUNT; EXIT WHEN v_rowcount = 0; v_total := v_total + coalesce(v_rowcount, 0); PERFORM gdb(p_debug,'Fetching rows in batches: '||v_total||' done so far.'); IF v_jobmon THEN PERFORM update_step(v_step_id, 'PENDING', 'Fetching rows in batches: '||v_total||' done so far.'); END IF; END LOOP; PERFORM dblink_close(v_dblink_name, v_cursor_name); IF v_jobmon THEN PERFORM update_step(v_step_id, 'OK','Inserted '||v_total||' rows'); END IF; PERFORM dblink_disconnect(v_dblink_name); -- Reset any sequences given in the parameter to their new value. -- Checks all tables that use the given sequence to ensure it's the max for the entire database. IF v_sequences IS NOT NULL THEN IF v_jobmon THEN v_step_id := add_step(v_job_id, 'Resetting sequences'); END IF; FOREACH v_seq IN ARRAY v_sequences LOOP SELECT n.nspname, c.relname, c.oid INTO v_seq_schema, v_seq_name, v_seq_oid FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid WHERE n.nspname ||'.'|| c.relname = v_seq; IF v_seq_oid IS NOT NULL THEN v_seq_max := sequence_max_value(v_seq_oid); END IF; IF v_seq_max IS NOT NULL THEN v_seq_reset := format('SELECT setval(''%I.%I'', %L)', v_seq_schema, v_seq_name, v_seq_max); PERFORM gdb(p_debug, 'v_reset_seq: '||v_seq_reset); EXECUTE v_seq_reset; END IF; END LOOP; IF v_jobmon THEN PERFORM update_step(v_step_id, 'OK', 'Done'); END IF; END IF; IF v_jobmon THEN v_step_id := add_step(v_job_id,'Updating last_run in config table'); END IF; UPDATE refresh_config_table set last_run = CURRENT_TIMESTAMP WHERE dest_table = v_dest_table; IF v_jobmon THEN PERFORM update_step(v_step_id, 'OK','Last run was '||CURRENT_TIMESTAMP); END IF; IF v_jobmon THEN PERFORM close_job(v_job_id); END IF; EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')'; EXCEPTION WHEN QUERY_CANCELED THEN SELECT nspname INTO v_dblink_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'dblink' AND e.extnamespace = n.oid; EXECUTE format('SELECT %I.dblink_get_connections() @> ARRAY[%L]', v_dblink_schema, v_dblink_name) INTO v_link_exists; IF v_link_exists THEN EXECUTE format('SELECT %I.dblink_disconnect(%L)', v_dblink_schema, v_dblink_name); END IF; RAISE EXCEPTION '%', SQLERRM; WHEN OTHERS THEN SELECT nspname INTO v_dblink_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'dblink' AND e.extnamespace = n.oid; SELECT nspname INTO v_jobmon_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'pg_jobmon' AND e.extnamespace = n.oid; SELECT jobmon INTO v_jobmon FROM @extschema@.refresh_config_table WHERE dest_table = p_destination; v_jobmon := COALESCE(p_jobmon, v_jobmon); EXECUTE format('SELECT %I.dblink_get_connections() @> ARRAY[%L]', v_dblink_schema, v_dblink_name) INTO v_link_exists; IF v_link_exists THEN EXECUTE format('SELECT %I.dblink_disconnect(%L)', v_dblink_schema, v_dblink_name); END IF; IF v_jobmon AND v_jobmon_schema IS NOT NULL THEN IF v_job_id IS NULL THEN EXECUTE format('SELECT %I.add_job(%L)', v_jobmon_schema, 'Refresh Table: '||p_destination) INTO v_job_id; EXECUTE format('SELECT %I.add_step(%L, %L)', v_jobmon_schema, v_job_id, 'EXCEPTION before job logging started') INTO v_step_id; END IF; IF v_step_id IS NULL THEN EXECUTE format('SELECT %I.add_step(%L, %L)', v_jobmon_schema, v_job_id, 'EXCEPTION before first step logged') INTO v_step_id; END IF; EXECUTE format('SELECT %I.update_step(%L, %L, %L)', v_jobmon_schema, v_step_id, 'CRITICAL', 'ERROR: '||COALESCE(SQLERRM,'unknown')); EXECUTE format('SELECT %I.fail_job(%L)', v_jobmon_schema, v_job_id); END IF; RAISE EXCEPTION '%', SQLERRM; END $$;