-- Moved the attempt at taking an advisory lock to the earliest point possible in the refresh jobs to avoid edge case of overlapping jobs causing errors instead of exiting gracefully.
-- Simplified exception blocks and made some error messages clearer in refresh functions.
-- Added some simple exception block pgTAP tests.

/*
 *  Refresh based on DML (Insert, Update, Delete)
 *
 *  Reads its settings from refresh_config_dml (row where dest_table = p_destination), marks a
 *  batch of rows in the remote queue table (config column "control") as processed over dblink,
 *  deletes the matching local rows by primary/unique key, re-pulls the current version of those
 *  rows from the source table and finally removes the processed rows from the remote queue.
 *
 *  Parameters:
 *    p_destination  Destination table name. Used as the refresh_config_dml lookup key and cast
 *                   to regclass to build the column list.
 *    p_limit        Optional override for the configured batch_limit (NULL keeps the config value;
 *                   a NULL limit overall means LIMIT ALL on the remote queue).
 *    p_repull       When true, truncates the remote queue table and the local table, then pulls
 *                   every row from the source (subject to the configured condition).
 *    p_debug        Passed to gdb() for debug output. Unless true, client_min_messages is set to
 *                   'warning' for the current transaction.
 *
 *  Returns: void. If another session already holds the advisory lock for this job, the attempt
 *  is logged as a WARNING job and the function returns without doing any work.
 *
 *  Raises: any error is logged through the job-logging functions in the pg_jobmon schema
 *  (unless it is a query cancel) and then re-raised via RAISE EXCEPTION with the original message.
 */
CREATE OR REPLACE FUNCTION refresh_dml(p_destination text, p_limit int default NULL, p_repull boolean DEFAULT false, p_debug boolean DEFAULT false) RETURNS void
    LANGUAGE plpgsql SECURITY DEFINER
    AS $$
DECLARE

v_adv_lock              boolean;
v_batch_limit_reached   boolean := false;
v_cols_n_types          text;
v_cols                  text;
v_condition             text;
v_control               text;
v_dblink                int;
v_dblink_name           text;
v_dblink_schema         text;
v_delete_sql            text;
v_dest_table            text;
v_exec_status           text;
v_fetch_sql             text;
v_field                 text;
v_filter                text[];
v_job_id                int;
v_jobmon_schema         text;
v_job_name              text;
v_limit                 int;
v_link_exists           boolean;
v_old_search_path       text;
v_pk_counter            int;
v_pk_name_csv           text;
v_pk_name_type_csv      text := '';
v_pk_name               text[];
v_pk_type               text[];
v_pk_where              text := '';
v_remote_f_sql          text;
v_remote_q_sql          text;
v_rowcount              bigint := 0;
v_source_table          text;
v_step_id               int;
v_tmp_table             text;
v_total                 bigint := 0;
v_trigger_delete        text;
v_trigger_update        text;
v_truncate_remote_q     text;
v_with_update           text;

BEGIN

IF p_debug IS DISTINCT FROM true THEN
    PERFORM set_config( 'client_min_messages', 'warning', true );
END IF;

v_job_name := 'Refresh DML: '||p_destination;
v_dblink_name := 'mimeo_dml_refresh_'||p_destination;

SELECT n.nspname INTO v_dblink_schema
FROM pg_extension e
INNER JOIN pg_namespace n ON n.oid = e.extnamespace
WHERE e.extname = 'dblink';

SELECT n.nspname INTO v_jobmon_schema
FROM pg_extension e
INNER JOIN pg_namespace n ON n.oid = e.extnamespace
WHERE e.extname = 'pg_jobmon';

-- Set custom search path to allow easier calls to other functions, especially job logging.
-- Kept as EXECUTE on purpose: a NULL schema (extension not installed) makes the query string NULL and raises here.
SELECT current_setting('search_path') INTO v_old_search_path;
EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_jobmon_schema||','||v_dblink_schema||',public'',''false'')';

-- Take advisory lock to prevent multiple calls to function overlapping
v_adv_lock := pg_try_advisory_lock(hashtext('refresh_dml'), hashtext(v_job_name));
IF NOT v_adv_lock THEN
    v_job_id := add_job(v_job_name);
    v_step_id := add_step(v_job_id,'Obtaining advisory lock for job: '||v_job_name);
    PERFORM gdb(p_debug,'Obtaining advisory lock FAILED for job: '||v_job_name);
    PERFORM update_step(v_step_id, 'WARNING','Found concurrent job. Exiting gracefully');
    PERFORM fail_job(v_job_id, 2);
    -- Passing the saved value as a parameter avoids building SQL from the old search_path text
    PERFORM set_config('search_path', v_old_search_path, false);
    RETURN;
END IF;

SELECT source_table
    , dest_table
    , 'tmp_'||replace(dest_table,'.','_')
    , dblink
    , control
    , pk_name
    , pk_type
    , filter
    , condition
    , batch_limit
INTO v_source_table
    , v_dest_table
    , v_tmp_table
    , v_dblink
    , v_control
    , v_pk_name
    , v_pk_type
    , v_filter
    , v_condition
    , v_limit
FROM refresh_config_dml
WHERE dest_table = p_destination;
IF NOT FOUND THEN
    RAISE EXCEPTION 'No configuration found for %',v_job_name;
END IF;

v_job_id := add_job(v_job_name);
PERFORM gdb(p_debug,'Job ID: '||v_job_id::text);

v_step_id := add_step(v_job_id,'Sanity check primary/unique key values');

IF v_pk_name IS NULL OR v_pk_type IS NULL THEN
    RAISE EXCEPTION 'Primary key fields in refresh_config_dml must be defined';
END IF;

-- determine column list, column type list
IF v_filter IS NULL THEN
    SELECT array_to_string(array_agg(attname),','), array_to_string(array_agg(attname||' '||format_type(atttypid, atttypmod)::text),',')
    INTO v_cols, v_cols_n_types
    FROM pg_attribute
    WHERE attrelid = p_destination::regclass AND attnum > 0 AND attisdropped is false;
ELSE
    -- ensure all primary key columns are included in any column filters
    FOREACH v_field IN ARRAY v_pk_name LOOP
        -- IS NOT TRUE keeps the original behaviour of raising when the comparison yields NULL
        IF (v_field = ANY(v_filter)) IS NOT TRUE THEN
            RAISE EXCEPTION 'Filter list did not contain all columns that compose primary/unique key for %',v_job_name;
        END IF;
    END LOOP;
    SELECT array_to_string(array_agg(attname),','), array_to_string(array_agg(attname||' '||format_type(atttypid, atttypmod)::text),',')
    INTO v_cols, v_cols_n_types
    FROM pg_attribute
    WHERE attrelid = p_destination::regclass AND ARRAY[attname::text] <@ v_filter AND attnum > 0 AND attisdropped is false;
END IF;

IF p_limit IS NOT NULL THEN
    v_limit := p_limit;
END IF;

-- Build "col type, col type" and "a.col = b.col AND ..." fragments from the key arrays
v_pk_name_csv := array_to_string(v_pk_name, ',');
v_pk_counter := 1;
WHILE v_pk_counter <= array_length(v_pk_name,1) LOOP
    IF v_pk_counter > 1 THEN
        v_pk_name_type_csv := v_pk_name_type_csv || ', ';
        v_pk_where := v_pk_where ||' AND ';
    END IF;
    v_pk_name_type_csv := v_pk_name_type_csv ||v_pk_name[v_pk_counter]||' '||v_pk_type[v_pk_counter];
    v_pk_where := v_pk_where || ' a.'||v_pk_name[v_pk_counter]||' = b.'||v_pk_name[v_pk_counter];
    v_pk_counter := v_pk_counter + 1;
END LOOP;

PERFORM update_step(v_step_id, 'OK','Done');

PERFORM dblink_connect(v_dblink_name, auth(v_dblink));

-- update remote entries
v_step_id := add_step(v_job_id,'Updating remote trigger table');
v_with_update := 'WITH a AS (SELECT '||v_pk_name_csv||' FROM '|| v_control ||' ORDER BY '||v_pk_name_csv||' LIMIT '|| COALESCE(v_limit::text, 'ALL') ||') UPDATE '||v_control||' b SET processed = true FROM a WHERE '||v_pk_where;
v_trigger_update := 'SELECT dblink_exec('||quote_literal(v_dblink_name)||','|| quote_literal(v_with_update)||')';
PERFORM gdb(p_debug,v_trigger_update);
EXECUTE v_trigger_update INTO v_exec_status;
PERFORM update_step(v_step_id, 'OK','Result was '||v_exec_status);

IF p_repull THEN
    -- Do truncate of remote queue table here before full data pull is actually started to ensure all new changes are recorded
    PERFORM update_step(v_step_id, 'OK','Request to repull ALL data from source. This could take a while...');
    PERFORM gdb(p_debug, 'Request to repull ALL data from source. This could take a while...');
    v_truncate_remote_q := 'SELECT dblink_exec('||quote_literal(v_dblink_name)||','||quote_literal('TRUNCATE TABLE '||v_control)||')';
    EXECUTE v_truncate_remote_q;

    v_step_id := add_step(v_job_id,'Truncating local table');
    PERFORM gdb(p_debug,'Truncating local table');
    EXECUTE 'TRUNCATE '||v_dest_table;
    PERFORM update_step(v_step_id, 'OK','Done');

    -- Define cursor query
    v_remote_f_sql := 'SELECT '||v_cols||' FROM '||v_source_table;
    IF v_condition IS NOT NULL THEN
        v_remote_f_sql := v_remote_f_sql || ' ' || v_condition;
    END IF;
ELSE
    -- The PRIMARY KEY already provides an index on the key columns, so no separate index is created later
    EXECUTE 'CREATE TEMP TABLE '||v_tmp_table||'_queue ('||v_pk_name_type_csv||', PRIMARY KEY ('||v_pk_name_csv||'))';

    -- Copy queue locally for use in removing updated/deleted rows
    v_remote_q_sql := 'SELECT DISTINCT '||v_pk_name_csv||' FROM '||v_control||' WHERE processed = true';
    PERFORM dblink_open(v_dblink_name, 'mimeo_cursor', v_remote_q_sql);
    v_step_id := add_step(v_job_id, 'Creating local queue temp table');
    v_rowcount := 0;
    v_fetch_sql := 'INSERT INTO '||v_tmp_table||'_queue ('||v_pk_name_csv||') SELECT '||v_pk_name_csv||' FROM dblink_fetch('||quote_literal(v_dblink_name)||', ''mimeo_cursor'', 50000) AS ('||v_pk_name_type_csv||')';
    LOOP
        EXECUTE v_fetch_sql;
        GET DIAGNOSTICS v_rowcount = ROW_COUNT;
        EXIT WHEN v_rowcount = 0;
        v_total := v_total + coalesce(v_rowcount, 0);
        PERFORM gdb(p_debug,'Fetching rows in batches: '||v_total||' done so far.');
        PERFORM update_step(v_step_id, 'PENDING', 'Fetching rows in batches: '||v_total||' done so far.');
    END LOOP;
    PERFORM dblink_close(v_dblink_name, 'mimeo_cursor');
    EXECUTE 'ANALYZE '||v_tmp_table||'_queue';
    PERFORM update_step(v_step_id, 'OK','Number of rows inserted: '||v_total);
    PERFORM gdb(p_debug,'Temp queue table row count '||v_total::text);

    v_step_id := add_step(v_job_id,'Deleting records from local table');
    v_delete_sql := 'DELETE FROM '||v_dest_table||' a USING '||v_tmp_table||'_queue b WHERE '|| v_pk_where;
    PERFORM gdb(p_debug,v_delete_sql);
    EXECUTE v_delete_sql;
    GET DIAGNOSTICS v_rowcount = ROW_COUNT;
    PERFORM gdb(p_debug,'Rows removed from local table before applying changes: '||v_rowcount::text);
    PERFORM update_step(v_step_id, 'OK','Removed '||v_rowcount||' records');

    -- Define cursor query
    v_remote_f_sql := 'SELECT '||v_cols||' FROM '||v_source_table||' JOIN ('||v_remote_q_sql||') x USING ('||v_pk_name_csv||')';
    IF v_condition IS NOT NULL THEN
        v_remote_f_sql := v_remote_f_sql || ' ' || v_condition;
    END IF;
END IF;

-- insert records to local table. Have to do temp table in case destination table is partitioned (returns 0 when inserting to parent)
PERFORM dblink_open(v_dblink_name, 'mimeo_cursor', v_remote_f_sql);
v_step_id := add_step(v_job_id, 'Inserting new records into local table');
EXECUTE 'CREATE TEMP TABLE '||v_tmp_table||'_full ('||v_cols_n_types||')';
v_rowcount := 0;
v_total := 0;
v_fetch_sql := 'INSERT INTO '||v_tmp_table||'_full ('||v_cols||') SELECT '||v_cols||' FROM dblink_fetch('||quote_literal(v_dblink_name)||', ''mimeo_cursor'', 50000) AS ('||v_cols_n_types||')';
LOOP
    EXECUTE v_fetch_sql;
    GET DIAGNOSTICS v_rowcount = ROW_COUNT;
    v_total := v_total + coalesce(v_rowcount, 0);
    EXECUTE 'INSERT INTO '||v_dest_table||' ('||v_cols||') SELECT '||v_cols||' FROM '||v_tmp_table||'_full';
    EXECUTE 'TRUNCATE '||v_tmp_table||'_full';
    EXIT WHEN v_rowcount = 0;
    PERFORM gdb(p_debug,'Fetching rows in batches: '||v_total||' done so far.');
    PERFORM update_step(v_step_id, 'PENDING', 'Fetching rows in batches: '||v_total||' done so far.');
END LOOP;
PERFORM dblink_close(v_dblink_name, 'mimeo_cursor');
PERFORM update_step(v_step_id, 'OK','Number of rows inserted: '||v_total);

-- With a NULL limit the comparison is NULL and no warning is raised
IF p_repull = false AND v_total > (v_limit * .75) THEN
    v_step_id := add_step(v_job_id, 'Row count warning');
    PERFORM update_step(v_step_id, 'WARNING','Row count fetched ('||v_total||') greater than 75% of batch limit ('||v_limit||'). Recommend increasing batch limit if possible.');
    v_batch_limit_reached := true;
END IF;

-- clean out rows from txn table
v_step_id := add_step(v_job_id,'Cleaning out rows from txn table');
v_trigger_delete := 'SELECT dblink_exec('||quote_literal(v_dblink_name)||','||quote_literal('DELETE FROM '||v_control||' WHERE processed = true')||')';
PERFORM gdb(p_debug,v_trigger_delete);
EXECUTE v_trigger_delete INTO v_exec_status;
PERFORM update_step(v_step_id, 'OK','Result was '||v_exec_status);

-- update activity status
v_step_id := add_step(v_job_id,'Updating last_run in config table');
UPDATE refresh_config_dml SET last_run = CURRENT_TIMESTAMP WHERE dest_table = p_destination;
PERFORM update_step(v_step_id, 'OK','Last run was '||CURRENT_TIMESTAMP);

EXECUTE 'DROP TABLE IF EXISTS '||v_tmp_table||'_full';
EXECUTE 'DROP TABLE IF EXISTS '||v_tmp_table||'_queue';

PERFORM dblink_disconnect(v_dblink_name);

IF v_batch_limit_reached = false THEN
    PERFORM close_job(v_job_id);
ELSE
    -- Set final job status to level 2 (WARNING) to bring notice that the batch limit was reached and may need adjusting.
    -- Preventive warning to keep replication from falling behind.
    PERFORM fail_job(v_job_id, 2);
END IF;

-- Ensure old search path is reset for the current session
PERFORM set_config('search_path', v_old_search_path, false);

PERFORM pg_advisory_unlock(hashtext('refresh_dml'), hashtext(v_job_name));

EXCEPTION
    WHEN QUERY_CANCELED THEN
        -- Release the session-level lock first so a failure during cleanup cannot leave it held
        PERFORM pg_advisory_unlock(hashtext('refresh_dml'), hashtext(v_job_name));
        -- Helper functions are schema-qualified here rather than relying on the search_path set in the block above
        EXECUTE 'SELECT '||v_dblink_schema||'.dblink_get_connections() @> ARRAY['||quote_literal(v_dblink_name)||']' INTO v_link_exists;
        IF v_link_exists THEN
            EXECUTE 'SELECT '||v_dblink_schema||'.dblink_disconnect('||quote_literal(v_dblink_name)||')';
        END IF;
        RAISE EXCEPTION '%', SQLERRM;
    WHEN OTHERS THEN
        -- Release the session-level lock first so a failure while logging cannot leave it held
        PERFORM pg_advisory_unlock(hashtext('refresh_dml'), hashtext(v_job_name));
        IF v_job_id IS NULL THEN
            EXECUTE 'SELECT '||v_jobmon_schema||'.add_job('||quote_literal('Refresh DML: '||p_destination)||')' INTO v_job_id;
            EXECUTE 'SELECT '||v_jobmon_schema||'.add_step('||v_job_id||', ''EXCEPTION before job logging started'')' INTO v_step_id;
        END IF;
        IF v_step_id IS NULL THEN
            EXECUTE 'SELECT '||v_jobmon_schema||'.add_step('||v_job_id||', ''EXCEPTION before first step logged'')' INTO v_step_id;
        END IF;
        EXECUTE 'SELECT '||v_dblink_schema||'.dblink_get_connections() @> ARRAY['||quote_literal(v_dblink_name)||']' INTO v_link_exists;
        IF v_link_exists THEN
            EXECUTE 'SELECT '||v_dblink_schema||'.dblink_disconnect('||quote_literal(v_dblink_name)||')';
        END IF;
        -- quote_literal: error messages frequently contain quote characters that would break the dynamic SQL
        EXECUTE 'SELECT '||v_jobmon_schema||'.update_step('||v_step_id||', ''CRITICAL'', '||quote_literal('ERROR: '||coalesce(SQLERRM,'unknown'))||')';
        EXECUTE 'SELECT '||v_jobmon_schema||'.fail_job('||v_job_id||')';
        RAISE EXCEPTION '%', SQLERRM;
END
$$;


/*
 *  Refresh insert only table based on timestamp control field
 *
 *  Reads its settings from refresh_config_inserter (row where dest_table = p_destination) and
 *  pulls rows from the source over dblink whose control column is greater than the stored
 *  last_value and less than now() minus the configured boundary interval, then stores the new
 *  maximum control value found in the destination table as the next lower boundary.
 *
 *  Parameters:
 *    p_destination   Destination table name. Used as the refresh_config_inserter lookup key and
 *                    cast to regclass to build the column list.
 *    p_limit         Optional override for the configured batch_limit. With a limit, the whole
 *                    batch is staged in a temp table and checked before being inserted.
 *    p_repull        When true, re-pulls data instead of doing an incremental refresh.
 *    p_repull_start  With p_repull: lower bound (exclusive) of the control column range to
 *                    delete locally and re-pull. NULL means '-infinity'.
 *    p_repull_end    With p_repull: upper bound (exclusive). NULL means 'infinity'. If both bounds
 *                    are NULL the destination table is truncated and everything is re-pulled.
 *    p_debug         Passed to gdb() for debug output. Unless true, client_min_messages is set to
 *                    'warning' for the current transaction.
 *
 *  Returns: void. Returns early without pulling data when the advisory lock is held by another
 *  session, or when dst_active is set, dst_change(now) is true and the current time (as an
 *  HH24MI number) falls strictly between dst_start and dst_end.
 *
 *  Raises: any error is logged through the job-logging functions in the pg_jobmon schema
 *  (unless it is a query cancel) and then re-raised via RAISE EXCEPTION with the original message.
 */
CREATE OR REPLACE FUNCTION refresh_inserter(p_destination text, p_limit integer DEFAULT NULL, p_repull boolean DEFAULT false, p_repull_start text DEFAULT NULL, p_repull_end text DEFAULT NULL, p_debug boolean DEFAULT false) RETURNS void
    LANGUAGE plpgsql SECURITY DEFINER
    AS $$
DECLARE

v_adv_lock              boolean;
v_batch_limit_reached   int := 0;   -- 0 = no issue, 2 = limit reached (warning), 3 = inconsistent batch (critical)
v_boundary              timestamptz;
v_cols_n_types          text;
v_cols                  text;
v_condition             text;
v_control               text;
v_dblink                int;
v_dblink_name           text;
v_dblink_schema         text;
v_delete_sql            text;
v_dest_table            text;
v_dst_active            boolean;
v_dst_check             boolean;
v_dst_start             int;
v_dst_end               int;
v_fetch_sql             text;
v_filter                text[];
v_job_id                int;
v_jobmon_schema         text;
v_job_name              text;
v_last_fetched          timestamptz;
v_last_value            timestamptz;
v_limit                 int;
v_link_exists           boolean;
v_now                   timestamptz := now();
v_old_search_path       text;
v_remote_sql            text;
v_rowcount              bigint := 0;
v_source_table          text;
v_step_id               int;
v_tmp_table             text;
v_total                 bigint := 0;

BEGIN

IF p_debug IS DISTINCT FROM true THEN
    PERFORM set_config( 'client_min_messages', 'warning', true );
END IF;

v_job_name := 'Refresh Inserter: '||p_destination;
v_dblink_name := 'mimeo_inserter_refresh_'||p_destination;

SELECT n.nspname INTO v_dblink_schema
FROM pg_extension e
INNER JOIN pg_namespace n ON n.oid = e.extnamespace
WHERE e.extname = 'dblink';

SELECT n.nspname INTO v_jobmon_schema
FROM pg_extension e
INNER JOIN pg_namespace n ON n.oid = e.extnamespace
WHERE e.extname = 'pg_jobmon';

-- Set custom search path to allow easier calls to other functions, especially job logging.
-- Kept as EXECUTE on purpose: a NULL schema (extension not installed) makes the query string NULL and raises here.
SELECT current_setting('search_path') INTO v_old_search_path;
EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_jobmon_schema||','||v_dblink_schema||',public'',''false'')';

-- Take advisory lock to prevent multiple calls to function overlapping
v_adv_lock := pg_try_advisory_lock(hashtext('refresh_inserter'), hashtext(v_job_name));
IF NOT v_adv_lock THEN
    v_job_id := add_job(v_job_name);
    v_step_id := add_step(v_job_id,'Obtaining advisory lock for job: '||v_job_name);
    PERFORM gdb(p_debug,'Obtaining advisory lock FAILED for job: '||v_job_name);
    PERFORM update_step(v_step_id, 'WARNING','Found concurrent job. Exiting gracefully');
    PERFORM fail_job(v_job_id, 2);
    -- Passing the saved value as a parameter avoids building SQL from the old search_path text
    PERFORM set_config('search_path', v_old_search_path, false);
    RETURN;
END IF;

v_job_id := add_job(v_job_name);
PERFORM gdb(p_debug,'Job ID: '||v_job_id::text);

SELECT source_table
    , dest_table
    , 'tmp_'||replace(dest_table,'.','_')
    , dblink
    , control
    , last_value
    , now() - boundary::interval
    , filter
    , condition
    , dst_active
    , dst_start
    , dst_end
    , batch_limit
INTO v_source_table
    , v_dest_table
    , v_tmp_table
    , v_dblink
    , v_control
    , v_last_value
    , v_boundary
    , v_filter
    , v_condition
    , v_dst_active
    , v_dst_start
    , v_dst_end
    , v_limit
FROM refresh_config_inserter
WHERE dest_table = p_destination;
IF NOT FOUND THEN
    RAISE EXCEPTION 'No configuration found for %',v_job_name;
END IF;

-- Do not allow this function to run during DST time change if config option is true. Otherwise will miss data from source
IF v_dst_active THEN
    v_dst_check := @extschema@.dst_change(v_now);
    IF v_dst_check THEN
        -- HH24MI = hour and MINUTE as a number (e.g. 02:30 -> 230). MM would be the month.
        IF to_number(to_char(v_now, 'HH24MI'), '0000') > v_dst_start AND to_number(to_char(v_now, 'HH24MI'), '0000') < v_dst_end THEN
            v_step_id := add_step( v_job_id, 'DST Check');
            PERFORM update_step(v_step_id, 'OK', 'Job CANCELLED - Does not run during DST time change');
            PERFORM close_job(v_job_id);
            PERFORM gdb(p_debug, 'Cannot run during DST time change');
            UPDATE refresh_config_inserter SET last_run = CURRENT_TIMESTAMP WHERE dest_table = p_destination;
            PERFORM set_config('search_path', v_old_search_path, false);
            PERFORM pg_advisory_unlock(hashtext('refresh_inserter'), hashtext(v_job_name));
            RETURN;
        END IF;
    END IF;
END IF;

v_step_id := add_step(v_job_id,'Building SQL');

IF v_filter IS NULL THEN
    SELECT array_to_string(array_agg(attname),','), array_to_string(array_agg(attname||' '||format_type(atttypid, atttypmod)::text),',')
    INTO v_cols, v_cols_n_types
    FROM pg_attribute
    WHERE attrelid = p_destination::regclass AND attnum > 0 AND attisdropped is false;
ELSE
    SELECT array_to_string(array_agg(attname),','), array_to_string(array_agg(attname||' '||format_type(atttypid, atttypmod)::text),',')
    INTO v_cols, v_cols_n_types
    FROM pg_attribute
    WHERE attrelid = p_destination::regclass AND ARRAY[attname::text] <@ v_filter AND attnum > 0 AND attisdropped is false;
END IF;

PERFORM dblink_connect(v_dblink_name, auth(v_dblink));

IF p_limit IS NOT NULL THEN
    v_limit := p_limit;
END IF;

IF p_repull THEN
    -- Repull ALL data if no start and end values set
    IF p_repull_start IS NULL AND p_repull_end IS NULL THEN
        PERFORM update_step(v_step_id, 'OK','Request to repull ALL data from source. This could take a while...');
        EXECUTE 'TRUNCATE '||v_dest_table;
        v_remote_sql := 'SELECT '||v_cols||' FROM '||v_source_table;
        IF v_condition IS NOT NULL THEN
            v_remote_sql := v_remote_sql || ' ' || v_condition;
        END IF;
    ELSE
        PERFORM update_step(v_step_id, 'OK','Request to repull data from '||COALESCE(p_repull_start, '-infinity')||' to '||COALESCE(p_repull_end, 'infinity'));
        PERFORM gdb(p_debug,'Request to repull data from '||COALESCE(p_repull_start, '-infinity')||' to '||COALESCE(p_repull_end, 'infinity'));
        v_remote_sql := 'SELECT '||v_cols||' FROM '||v_source_table;
        IF v_condition IS NOT NULL THEN
            v_remote_sql := v_remote_sql || ' ' || v_condition || ' AND ';
        ELSE
            v_remote_sql := v_remote_sql || ' WHERE ';
        END IF;
        v_remote_sql := v_remote_sql ||v_control||' > '||quote_literal(COALESCE(p_repull_start, '-infinity'))||' AND '
            ||v_control||' < '||quote_literal(COALESCE(p_repull_end, 'infinity'));
        -- Delete the old local data
        v_delete_sql := 'DELETE FROM '||v_dest_table||' WHERE '||v_control||' > '||quote_literal(COALESCE(p_repull_start, '-infinity'))||' AND '
            ||v_control||' < '||quote_literal(COALESCE(p_repull_end, 'infinity'));
        v_step_id := add_step(v_job_id, 'Deleting current, local data');
        PERFORM gdb(p_debug,'Deleting current, local data: '||v_delete_sql);
        EXECUTE v_delete_sql;
        GET DIAGNOSTICS v_rowcount = ROW_COUNT;
        PERFORM update_step(v_step_id, 'OK', v_rowcount || ' rows removed');
    END IF;
ELSE
    -- does < for upper boundary to keep missing data from happening on rare edge case where a newly inserted row outside the transaction batch
    -- has the exact same timestamp as the previous batch's max timestamp
    v_remote_sql := 'SELECT '||v_cols||' FROM '||v_source_table;
    IF v_condition IS NOT NULL THEN
        v_remote_sql := v_remote_sql || ' ' || v_condition || ' AND ';
    ELSE
        v_remote_sql := v_remote_sql || ' WHERE ';
    END IF;
    v_remote_sql := v_remote_sql ||v_control||' > '||quote_literal(v_last_value)||' AND '||v_control||' < '||quote_literal(v_boundary)||' ORDER BY '||v_control||' ASC LIMIT '|| COALESCE(v_limit::text, 'ALL');

    PERFORM update_step(v_step_id, 'OK','Grabbing rows from '||v_last_value::text||' to '||v_boundary::text);
    PERFORM gdb(p_debug,'Grabbing rows from '||v_last_value::text||' to '||v_boundary::text);
END IF;

EXECUTE 'CREATE TEMP TABLE '||v_tmp_table||' ('||v_cols_n_types||')';
PERFORM gdb(p_debug,v_remote_sql);
PERFORM dblink_open(v_dblink_name, 'mimeo_cursor', v_remote_sql);
v_step_id := add_step(v_job_id, 'Inserting new records into local table');
v_rowcount := 0;
v_total := 0;
v_fetch_sql := 'INSERT INTO '||v_tmp_table||'('||v_cols||') SELECT '||v_cols||' FROM dblink_fetch('||quote_literal(v_dblink_name)||', ''mimeo_cursor'', 50000) AS ('||v_cols_n_types||')';
LOOP
    EXECUTE v_fetch_sql;
    GET DIAGNOSTICS v_rowcount = ROW_COUNT;
    v_total := v_total + coalesce(v_rowcount, 0);
    EXECUTE 'SELECT max('||v_control||') FROM '||v_tmp_table INTO v_last_fetched;
    IF v_limit IS NULL THEN -- insert into the real table in batches if no limit to avoid excessively large temp tables
        EXECUTE 'INSERT INTO '||v_dest_table||' ('||v_cols||') SELECT '||v_cols||' FROM '||v_tmp_table;
        EXECUTE 'TRUNCATE '||v_tmp_table;
    END IF;
    EXIT WHEN v_rowcount = 0;
    PERFORM gdb(p_debug,'Fetching rows in batches: '||v_total||' done so far. Last fetched: '||v_last_fetched);
    PERFORM update_step(v_step_id, 'PENDING', 'Fetching rows in batches: '||v_total||' done so far. Last fetched: '||v_last_fetched);
END LOOP;
PERFORM dblink_close(v_dblink_name, 'mimeo_cursor');
PERFORM update_step(v_step_id, 'OK','Rows fetched: '||v_total);

-- Without a limit the rows were already inserted batch by batch above; nothing else to do
IF v_limit IS NOT NULL THEN
    -- When using batch limits, entire batch must be pulled to temp table before inserting to real table to catch edge cases
    v_step_id := add_step(v_job_id,'Checking for batch limit issues');
    PERFORM gdb(p_debug, 'Checking for batch limit issues');
    -- Not recommended that the batch actually equal the limit set if possible. Handle all edge cases to keep data consistent
    IF v_total >= v_limit THEN
        PERFORM update_step(v_step_id, 'WARNING','Row count fetched equal to or greater than limit set: '||v_limit||'. Recommend increasing batch limit if possible.');
        PERFORM gdb(p_debug, 'Row count fetched equal to or greater than limit set: '||v_limit||'. Recommend increasing batch limit if possible.');
        EXECUTE 'SELECT max('||v_control||') FROM '||v_tmp_table INTO v_last_value;
        v_step_id := add_step(v_job_id, 'Removing high boundary rows from this batch to avoid missing data');
        EXECUTE 'DELETE FROM '||v_tmp_table||' WHERE '||v_control||' = '||quote_literal(v_last_value);
        GET DIAGNOSTICS v_rowcount = ROW_COUNT;
        PERFORM update_step(v_step_id, 'OK', 'Removed '||v_rowcount||' rows. Batch now contains '||(v_limit - v_rowcount)||' records');
        PERFORM gdb(p_debug, 'Removed '||v_rowcount||' rows from batch. Batch table now contains '||(v_limit - v_rowcount)||' records');
        v_batch_limit_reached := 2;
        v_total := v_total - v_rowcount;
        IF (v_limit - v_rowcount) < 1 THEN
            v_step_id := add_step(v_job_id, 'Reached inconsistent state');
            PERFORM update_step(v_step_id, 'CRITICAL', 'Batch contained max rows ('||v_limit||') or greater and all contained the same timestamp value. Unable to guarentee rows will ever be replicated consistently. Increase row limit parameter to allow a consistent batch.');
            PERFORM gdb(p_debug, 'Batch contained max rows desired ('||v_limit||') or greaer and all contained the same timestamp value. Unable to guarentee rows will be replicated consistently. Increase row limit parameter to allow a consistent batch.');
            v_batch_limit_reached := 3;
        END IF;
    ELSE
        PERFORM update_step(v_step_id, 'OK','No issues found');
        PERFORM gdb(p_debug, 'No issues found');
    END IF;

    IF v_batch_limit_reached <> 3 THEN
        v_step_id := add_step(v_job_id,'Inserting new records into local table');
        EXECUTE 'INSERT INTO '||v_dest_table||' ('||v_cols||') SELECT '||v_cols||' FROM '||v_tmp_table;
        PERFORM update_step(v_step_id, 'OK','Inserted '||v_total||' records');
        PERFORM gdb(p_debug, 'Inserted '||v_total||' records');
    END IF;
END IF; -- end v_limit IF

IF v_batch_limit_reached <> 3 THEN
    v_step_id := add_step(v_job_id, 'Setting next lower boundary');
    EXECUTE 'SELECT max('||v_control||') FROM '|| v_dest_table INTO v_last_value;
    UPDATE refresh_config_inserter set last_value = coalesce(v_last_value, CURRENT_TIMESTAMP), last_run = CURRENT_TIMESTAMP WHERE dest_table = p_destination;
    PERFORM update_step(v_step_id, 'OK','Lower boundary value is: '|| coalesce(v_last_value, CURRENT_TIMESTAMP));
    PERFORM gdb(p_debug, 'Lower boundary value is: '||coalesce(v_last_value, CURRENT_TIMESTAMP));
END IF;

EXECUTE 'DROP TABLE IF EXISTS ' || v_tmp_table;

PERFORM dblink_disconnect(v_dblink_name);

IF v_batch_limit_reached = 0 THEN
    PERFORM close_job(v_job_id);
ELSIF v_batch_limit_reached = 2 THEN
    -- Set final job status to level 2 (WARNING) to bring notice that the batch limit was reached and may need adjusting.
    -- Preventive warning to keep replication from falling behind.
    PERFORM fail_job(v_job_id, 2);
ELSIF v_batch_limit_reached = 3 THEN
    -- Really bad. Critical alert!
    PERFORM fail_job(v_job_id);
END IF;

-- Ensure old search path is reset for the current session
PERFORM set_config('search_path', v_old_search_path, false);

PERFORM pg_advisory_unlock(hashtext('refresh_inserter'), hashtext(v_job_name));

EXCEPTION
    WHEN QUERY_CANCELED THEN
        -- Release the session-level lock first so a failure during cleanup cannot leave it held
        PERFORM pg_advisory_unlock(hashtext('refresh_inserter'), hashtext(v_job_name));
        -- Helper functions are schema-qualified here rather than relying on the search_path set in the block above
        EXECUTE 'SELECT '||v_dblink_schema||'.dblink_get_connections() @> ARRAY['||quote_literal(v_dblink_name)||']' INTO v_link_exists;
        IF v_link_exists THEN
            EXECUTE 'SELECT '||v_dblink_schema||'.dblink_disconnect('||quote_literal(v_dblink_name)||')';
        END IF;
        RAISE EXCEPTION '%', SQLERRM;
    WHEN OTHERS THEN
        -- Release the session-level lock first so a failure while logging cannot leave it held
        PERFORM pg_advisory_unlock(hashtext('refresh_inserter'), hashtext(v_job_name));
        IF v_job_id IS NULL THEN
            EXECUTE 'SELECT '||v_jobmon_schema||'.add_job('||quote_literal('Refresh Inserter: '||p_destination)||')' INTO v_job_id;
            EXECUTE 'SELECT '||v_jobmon_schema||'.add_step('||v_job_id||', ''EXCEPTION before job logging started'')' INTO v_step_id;
        END IF;
        IF v_step_id IS NULL THEN
            EXECUTE 'SELECT '||v_jobmon_schema||'.add_step('||v_job_id||', ''EXCEPTION before first step logged'')' INTO v_step_id;
        END IF;
        EXECUTE 'SELECT '||v_dblink_schema||'.dblink_get_connections() @> ARRAY['||quote_literal(v_dblink_name)||']' INTO v_link_exists;
        IF v_link_exists THEN
            EXECUTE 'SELECT '||v_dblink_schema||'.dblink_disconnect('||quote_literal(v_dblink_name)||')';
        END IF;
        -- quote_literal: error messages frequently contain quote characters that would break the dynamic SQL
        EXECUTE 'SELECT '||v_jobmon_schema||'.update_step('||v_step_id||', ''CRITICAL'', '||quote_literal('ERROR: '||COALESCE(SQLERRM,'unknown'))||')';
        EXECUTE 'SELECT '||v_jobmon_schema||'.fail_job('||v_job_id||')';
        RAISE EXCEPTION '%', SQLERRM;
END
$$;


/*
 *  Refresh based on DML (Insert, Update, Delete), but logs all deletes on the destination table
 *  Destination table requires extra column: mimeo_source_deleted timestamptz
 */
CREATE OR REPLACE FUNCTION refresh_logdel(p_destination text, p_limit int DEFAULT NULL, p_repull boolean DEFAULT false, p_debug boolean DEFAULT false) RETURNS void
LANGUAGE plpgsql SECURITY DEFINER AS $$ DECLARE v_adv_lock boolean; v_batch_limit_reached boolean := false; v_cols_n_types text; v_cols text; v_condition text; v_control text; v_dblink int; v_dblink_name text; v_dblink_schema text; v_delete_d_sql text; v_delete_f_sql text; v_dest_table text; v_exec_status text; v_fetch_sql text; v_field text; v_filter text[]; v_insert_deleted_sql text; v_insert_sql text; v_job_id int; v_jobmon_schema text; v_job_name text; v_limit int; v_link_exists boolean; v_old_search_path text; v_pk_counter int; v_pk_name text[]; v_pk_name_csv text; v_pk_name_type_csv text := ''; v_pk_type text[]; v_pk_where text := ''; v_remote_d_sql text; v_remote_f_sql text; v_remote_q_sql text; v_rowcount bigint := 0; v_source_table text; v_step_id int; v_tmp_table text; v_total bigint := 0; v_trigger_delete text; v_trigger_update text; v_truncate_remote_q text; v_with_update text; BEGIN IF p_debug IS DISTINCT FROM true THEN PERFORM set_config( 'client_min_messages', 'warning', true ); END IF; v_job_name := 'Refresh Log Del: '||p_destination; v_dblink_name := 'mimeo_logdel_refresh_'||p_destination; SELECT nspname INTO v_dblink_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'dblink' AND e.extnamespace = n.oid; SELECT nspname INTO v_jobmon_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'pg_jobmon' AND e.extnamespace = n.oid; -- Set custom search path to allow easier calls to other functions, especially job logging SELECT current_setting('search_path') INTO v_old_search_path; EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_jobmon_schema||','||v_dblink_schema||',public'',''false'')'; -- Take advisory lock to prevent multiple calls to function overlapping v_adv_lock := pg_try_advisory_lock(hashtext('refresh_logdel'), hashtext(v_job_name)); IF v_adv_lock = 'false' THEN v_job_id := add_job(v_job_name); v_step_id := add_step(v_job_id,'Obtaining advisory lock for job: '||v_job_name); PERFORM gdb(p_debug,'Obtaining advisory 
lock FAILED for job: '||v_job_name); PERFORM update_step(v_step_id, 'WARNING','Found concurrent job. Exiting gracefully'); PERFORM fail_job(v_job_id, 2); EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')'; RETURN; END IF; SELECT source_table , dest_table , 'tmp_'||replace(dest_table,'.','_') , dblink , control , pk_name , pk_type , filter , condition , batch_limit FROM refresh_config_logdel WHERE dest_table = p_destination INTO v_source_table , v_dest_table , v_tmp_table , v_dblink , v_control , v_pk_name , v_pk_type , v_filter , v_condition , v_limit; IF NOT FOUND THEN RAISE EXCEPTION 'No configuration found for %',v_job_name; END IF; v_job_id := add_job(v_job_name); PERFORM gdb(p_debug,'Job ID: '||v_job_id::text); v_step_id := add_step(v_job_id,'Sanity check primary/unique key values'); IF v_pk_name IS NULL OR v_pk_type IS NULL THEN RAISE EXCEPTION 'Primary key fields in refresh_config_logdel must be defined'; END IF; -- determine column list, column type list IF v_filter IS NULL THEN SELECT array_to_string(array_agg(attname),','), array_to_string(array_agg(attname||' '||format_type(atttypid, atttypmod)::text),',') INTO v_cols, v_cols_n_types FROM pg_attribute WHERE attnum > 0 AND attisdropped is false AND attrelid = p_destination::regclass AND attname != 'mimeo_source_deleted'; ELSE -- ensure all primary key columns are included in any column filters FOREACH v_field IN ARRAY v_pk_name LOOP IF v_field = ANY(v_filter) THEN CONTINUE; ELSE RAISE EXCEPTION 'Filter list did not contain all columns that compose primary key for %',v_job_name; END IF; END LOOP; SELECT array_to_string(array_agg(attname),','), array_to_string(array_agg(attname||' '||format_type(atttypid, atttypmod)::text),',') INTO v_cols, v_cols_n_types FROM pg_attribute WHERE attrelid = p_destination::regclass AND ARRAY[attname::text] <@ v_filter AND attnum > 0 AND attisdropped is false AND attname != 'mimeo_source_deleted' ; END IF; IF p_limit IS NOT NULL THEN v_limit := 
p_limit; END IF; v_pk_name_csv := array_to_string(v_pk_name,','); v_pk_counter := 1; WHILE v_pk_counter <= array_length(v_pk_name,1) LOOP IF v_pk_counter > 1 THEN v_pk_name_type_csv := v_pk_name_type_csv || ', '; v_pk_where := v_pk_where ||' AND '; END IF; v_pk_name_type_csv := v_pk_name_type_csv ||v_pk_name[v_pk_counter]||' '||v_pk_type[v_pk_counter]; v_pk_where := v_pk_where || ' a.'||v_pk_name[v_pk_counter]||' = b.'||v_pk_name[v_pk_counter]; v_pk_counter := v_pk_counter + 1; END LOOP; PERFORM update_step(v_step_id, 'OK','Done'); PERFORM dblink_connect(v_dblink_name, auth(v_dblink)); -- update remote entries v_step_id := add_step(v_job_id,'Updating remote trigger table'); v_with_update := 'WITH a AS (SELECT '||v_pk_name_csv||' FROM '|| v_control ||' ORDER BY '||v_pk_name_csv||' LIMIT '|| COALESCE(v_limit::text, 'ALL') ||') UPDATE '||v_control||' b SET processed = true FROM a WHERE '|| v_pk_where; v_trigger_update := 'SELECT dblink_exec('||quote_literal(v_dblink_name)||','|| quote_literal(v_with_update)||')'; PERFORM gdb(p_debug,v_trigger_update); EXECUTE v_trigger_update INTO v_exec_status; PERFORM update_step(v_step_id, 'OK','Result was '||v_exec_status); -- create temp table for recording deleted rows EXECUTE 'CREATE TEMP TABLE '||v_tmp_table||'_deleted ('||v_cols_n_types||', mimeo_source_deleted timestamptz)'; v_remote_d_sql := 'SELECT '||v_cols||', mimeo_source_deleted FROM '||v_control||' WHERE processed = true and mimeo_source_deleted IS NOT NULL'; PERFORM dblink_open(v_dblink_name, 'mimeo_cursor', v_remote_d_sql); v_step_id := add_step(v_job_id, 'Creating local queue temp table for deleted rows on source'); v_rowcount := 0; v_total := 0; LOOP v_fetch_sql := 'INSERT INTO '||v_tmp_table||'_deleted ('||v_cols||', mimeo_source_deleted) SELECT '||v_cols||', mimeo_source_deleted FROM dblink_fetch('||quote_literal(v_dblink_name)||', ''mimeo_cursor'', 50000) AS ('||v_cols_n_types||', mimeo_source_deleted timestamptz)'; EXECUTE v_fetch_sql; GET DIAGNOSTICS 
v_rowcount = ROW_COUNT; EXIT WHEN v_rowcount = 0; v_total := v_total + coalesce(v_rowcount, 0); PERFORM gdb(p_debug,'Fetching rows in batches: '||v_total||' done so far.'); PERFORM update_step(v_step_id, 'PENDING', 'Fetching rows in batches: '||v_total||' done so far.'); END LOOP; PERFORM dblink_close(v_dblink_name, 'mimeo_cursor'); EXECUTE 'CREATE INDEX ON '||v_tmp_table||'_deleted ('||v_pk_name_csv||')'; EXECUTE 'ANALYZE '||v_tmp_table||'_deleted'; PERFORM update_step(v_step_id, 'OK','Number of rows inserted: '||v_total); PERFORM gdb(p_debug,'Temp deleted queue table row count '||v_total::text); IF p_repull THEN -- Do delete instead of truncate like refresh_dml to avoid missing rows between the above deleted queue fetch and here. PERFORM update_step(v_step_id, 'OK','Request to repull ALL data from source. This could take a while...'); PERFORM gdb(p_debug, 'Request to repull ALL data from source. This could take a while...'); v_truncate_remote_q := 'SELECT dblink_exec('||quote_literal(v_dblink_name)||','||quote_literal('DELETE FROM '||v_control||' WHERE processed = true')||')'; PERFORM gdb(p_debug, v_truncate_remote_q); EXECUTE v_truncate_remote_q; v_step_id := add_step(v_job_id,'Removing local, undeleted rows'); PERFORM gdb(p_debug,'Removing local, undeleted rows'); EXECUTE 'DELETE FROM '||v_dest_table||' WHERE mimeo_source_deleted IS NULL'; PERFORM update_step(v_step_id, 'OK','Done'); -- Define cursor query v_remote_f_sql := 'SELECT '||v_cols||' FROM '||v_source_table; IF v_condition IS NOT NULL THEN v_remote_f_sql := v_remote_f_sql || ' ' || v_condition; END IF; ELSE -- Do normal stuff here EXECUTE 'CREATE TEMP TABLE '||v_tmp_table||'_queue ('||v_pk_name_type_csv||')'; v_remote_q_sql := 'SELECT DISTINCT '||v_pk_name_csv||' FROM '||v_control||' WHERE processed = true and mimeo_source_deleted IS NULL'; PERFORM dblink_open(v_dblink_name, 'mimeo_cursor', v_remote_q_sql); v_step_id := add_step(v_job_id, 'Creating local queue temp table for inserts/updates'); 
v_rowcount := 0; LOOP v_fetch_sql := 'INSERT INTO '||v_tmp_table||'_queue ('||v_pk_name_csv||') SELECT '||v_pk_name_csv||' FROM dblink_fetch('||quote_literal(v_dblink_name)||', ''mimeo_cursor'', 50000) AS ('||v_pk_name_type_csv||')'; EXECUTE v_fetch_sql; GET DIAGNOSTICS v_rowcount = ROW_COUNT; EXIT WHEN v_rowcount = 0; v_total := v_total + coalesce(v_rowcount, 0); PERFORM gdb(p_debug,'Fetching rows in batches: '||v_total||' done so far.'); PERFORM update_step(v_step_id, 'PENDING', 'Fetching rows in batches: '||v_total||' done so far.'); END LOOP; PERFORM dblink_close(v_dblink_name, 'mimeo_cursor'); EXECUTE 'CREATE INDEX ON '||v_tmp_table||'_queue ('||v_pk_name_csv||')'; EXECUTE 'ANALYZE '||v_tmp_table||'_queue'; PERFORM update_step(v_step_id, 'OK','Number of rows inserted: '||v_total); PERFORM gdb(p_debug,'Temp inserts/updates queue table row count '||v_total::text); -- remove records from local table (inserts/updates) v_step_id := add_step(v_job_id,'Removing insert/update records from local table'); v_delete_f_sql := 'DELETE FROM '||v_dest_table||' a USING '||v_tmp_table||'_queue b WHERE '|| v_pk_where; PERFORM gdb(p_debug,v_delete_f_sql); EXECUTE v_delete_f_sql; GET DIAGNOSTICS v_rowcount = ROW_COUNT; PERFORM gdb(p_debug,'Insert/Update rows removed from local table before applying changes: '||v_rowcount::text); PERFORM update_step(v_step_id, 'OK','Removed '||v_rowcount||' records'); -- remove records from local table (deleted rows) v_step_id := add_step(v_job_id,'Removing deleted records from local table'); v_delete_d_sql := 'DELETE FROM '||v_dest_table||' a USING '||v_tmp_table||'_deleted b WHERE '|| v_pk_where; PERFORM gdb(p_debug,v_delete_d_sql); EXECUTE v_delete_d_sql; GET DIAGNOSTICS v_rowcount = ROW_COUNT; PERFORM gdb(p_debug,'Deleted rows removed from local table before applying changes: '||v_rowcount::text); PERFORM update_step(v_step_id, 'OK','Removed '||v_rowcount||' records'); -- Remote full query for normal replication v_remote_f_sql := 'SELECT 
'||v_cols||' FROM '||v_source_table||' JOIN ('||v_remote_q_sql||') x USING ('||v_pk_name_csv||')'; IF v_condition IS NOT NULL THEN v_remote_f_sql := v_remote_f_sql || ' ' || v_condition; END IF; END IF; -- insert records to local table (inserts/updates). Have to do temp table in case destination table is partitioned (returns 0 when inserting to parent) PERFORM dblink_open(v_dblink_name, 'mimeo_cursor', v_remote_f_sql); v_step_id := add_step(v_job_id, 'Inserting new/updated records into local table'); EXECUTE 'CREATE TEMP TABLE '||v_tmp_table||'_full ('||v_cols_n_types||')'; v_rowcount := 0; v_total := 0; LOOP v_fetch_sql := 'INSERT INTO '||v_tmp_table||'_full ('||v_cols||') SELECT '||v_cols||' FROM dblink_fetch('||quote_literal(v_dblink_name)||', ''mimeo_cursor'', 50000) AS ('||v_cols_n_types||')'; EXECUTE v_fetch_sql; GET DIAGNOSTICS v_rowcount = ROW_COUNT; v_total := v_total + coalesce(v_rowcount, 0); EXECUTE 'INSERT INTO '||v_dest_table||' ('||v_cols||') SELECT '||v_cols||' FROM '||v_tmp_table||'_full'; EXECUTE 'TRUNCATE '||v_tmp_table||'_full'; EXIT WHEN v_rowcount = 0; PERFORM gdb(p_debug,'Fetching rows in batches: '||v_total||' done so far.'); PERFORM update_step(v_step_id, 'PENDING', 'Fetching rows in batches: '||v_total||' done so far.'); END LOOP; PERFORM dblink_close(v_dblink_name, 'mimeo_cursor'); PERFORM update_step(v_step_id, 'OK','New/updated rows inserted: '||v_total); -- insert records to local table (deleted rows to be kept) v_step_id := add_step(v_job_id,'Inserting deleted records into local table'); v_insert_deleted_sql := 'INSERT INTO '||v_dest_table||'('||v_cols||', mimeo_source_deleted) SELECT '||v_cols||', mimeo_source_deleted FROM '||v_tmp_table||'_deleted'; PERFORM gdb(p_debug,v_insert_deleted_sql); EXECUTE v_insert_deleted_sql; GET DIAGNOSTICS v_rowcount = ROW_COUNT; PERFORM gdb(p_debug,'Deleted rows inserted: '||v_rowcount::text); PERFORM update_step(v_step_id, 'OK','Inserted '||v_rowcount||' records'); IF (v_total + v_rowcount) > 
(v_limit * .75) THEN v_step_id := add_step(v_job_id, 'Row count warning'); PERFORM update_step(v_step_id, 'WARNING','Row count fetched ('||v_total||') greater than 75% of batch limit ('||v_limit||'). Recommend increasing batch limit if possible.'); v_batch_limit_reached := true; END IF; -- clean out rows from txn table v_step_id := add_step(v_job_id,'Cleaning out rows from txn table'); v_trigger_delete := 'SELECT dblink_exec('||quote_literal(v_dblink_name)||','||quote_literal('DELETE FROM '||v_control||' WHERE processed = true')||')'; PERFORM gdb(p_debug,v_trigger_delete); EXECUTE v_trigger_delete INTO v_exec_status; PERFORM update_step(v_step_id, 'OK','Result was '||v_exec_status); -- update activity status v_step_id := add_step(v_job_id,'Updating last_run in config table'); UPDATE refresh_config_logdel SET last_run = CURRENT_TIMESTAMP WHERE dest_table = p_destination; PERFORM update_step(v_step_id, 'OK','Last Value was '||current_timestamp); PERFORM dblink_disconnect(v_dblink_name); EXECUTE 'DROP TABLE IF EXISTS '||v_tmp_table||'_full'; EXECUTE 'DROP TABLE IF EXISTS '||v_tmp_table||'_queue'; EXECUTE 'DROP TABLE IF EXISTS '||v_tmp_table||'_deleted'; IF v_batch_limit_reached = false THEN PERFORM close_job(v_job_id); ELSE -- Set final job status to level 2 (WARNING) to bring notice that the batch limit was reached and may need adjusting. -- Preventive warning to keep replication from falling behind. 
PERFORM fail_job(v_job_id, 2); END IF; -- Ensure old search path is reset for the current session EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')'; PERFORM pg_advisory_unlock(hashtext('refresh_logdel'), hashtext(v_job_name)); EXCEPTION WHEN QUERY_CANCELED THEN EXECUTE 'SELECT '||v_dblink_schema||'.dblink_get_connections() @> ARRAY['||quote_literal(v_dblink_name)||']' INTO v_link_exists; IF v_link_exists THEN EXECUTE 'SELECT '||v_dblink_schema||'.dblink_disconnect('||quote_literal(v_dblink_name)||')'; END IF; PERFORM pg_advisory_unlock(hashtext('refresh_logdel'), hashtext(v_job_name)); RAISE EXCEPTION '%', SQLERRM; WHEN OTHERS THEN IF v_job_id IS NULL THEN EXECUTE 'SELECT '||v_jobmon_schema||'.add_job(''Refresh Log Del: '||p_destination||''')' INTO v_job_id; EXECUTE 'SELECT '||v_jobmon_schema||'.add_step('||v_job_id||', ''EXCEPTION before job logging started'')' INTO v_step_id; END IF; IF v_step_id IS NULL THEN EXECUTE 'SELECT '||v_jobmon_schema||'.add_step('||v_job_id||', ''EXCEPTION before first step logged'')' INTO v_step_id; END IF; EXECUTE 'SELECT '||v_dblink_schema||'.dblink_get_connections() @> ARRAY['||quote_literal(v_dblink_name)||']' INTO v_link_exists; IF v_link_exists THEN EXECUTE 'SELECT '||v_dblink_schema||'.dblink_disconnect('||quote_literal(v_dblink_name)||')'; END IF; EXECUTE 'SELECT '||v_jobmon_schema||'.update_step('||v_step_id||', ''CRITICAL'', ''ERROR: '||coalesce(SQLERRM,'unknown')||''')'; EXECUTE 'SELECT '||v_jobmon_schema||'.fail_job('||v_job_id||')'; PERFORM pg_advisory_unlock(hashtext('refresh_logdel'), hashtext(v_job_name)); RAISE EXCEPTION '%', SQLERRM; END $$; /* * Snap refresh to repull all table data */ CREATE OR REPLACE FUNCTION refresh_snap(p_destination text, p_index boolean DEFAULT true, p_debug boolean DEFAULT false, p_pulldata boolean DEFAULT true) RETURNS void LANGUAGE plpgsql SECURITY DEFINER AS $$ DECLARE v_adv_lock boolean; v_cols_n_types text[]; v_cols text[]; v_condition text; v_create_sql 
text; v_dblink int; v_dblink_name text; v_dblink_schema text; v_dest_table text; v_exists int; v_fetch_sql text; v_filter text[]; v_insert_sql text; v_job_id int; v_jobmon_schema text; v_job_name text; v_lcols_array text[]; v_link_exists boolean; v_local_sql text; v_l text; v_match boolean = true; v_old_grant record; v_old_owner text; v_old_search_path text; v_old_snap text; v_old_snap_table text; v_parts record; v_post_script text[]; v_refresh_snap text; v_remote_index_sql text; v_remote_sql text; v_row record; v_rowcount bigint; v_r text; v_snap text; v_source_table text; v_step_id int; v_table_exists boolean; v_total bigint := 0; v_tup_del bigint; v_tup_ins bigint; v_tup_upd bigint; v_tup_del_new bigint; v_tup_ins_new bigint; v_tup_upd_new bigint; v_view_definition text; BEGIN IF p_debug IS DISTINCT FROM true THEN PERFORM set_config( 'client_min_messages', 'notice', true ); END IF; v_job_name := 'Refresh Snap: '||p_destination; v_dblink_name := 'mimeo_snap_refresh_'||p_destination; SELECT nspname INTO v_dblink_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'dblink' AND e.extnamespace = n.oid; SELECT nspname INTO v_jobmon_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'pg_jobmon' AND e.extnamespace = n.oid; -- Set custom search path to allow easier calls to other functions, especially job logging SELECT current_setting('search_path') INTO v_old_search_path; EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_jobmon_schema||','||v_dblink_schema||',public'',''false'')'; -- Take advisory lock to prevent multiple calls to function overlapping and causing possible deadlock v_adv_lock := pg_try_advisory_lock(hashtext('refresh_snap'), hashtext(v_job_name)); IF v_adv_lock = 'false' THEN v_job_id := add_job(v_job_name); v_step_id := add_step(v_job_id,'Obtaining advisory lock for job: '||v_job_name); PERFORM gdb(p_debug,'Obtaining advisory lock FAILED for job: '||v_job_name); PERFORM update_step(v_step_id, 'WARNING','Found concurrent 
job. Exiting gracefully'); PERFORM fail_job(v_job_id, 2); EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')'; RETURN; END IF; v_job_id := add_job(v_job_name); PERFORM gdb(p_debug,'Job ID: '||v_job_id::text); v_step_id := add_step(v_job_id,'Grabbing Mapping, Building SQL'); SELECT source_table , dest_table , dblink , filter , condition , n_tup_ins , n_tup_upd , n_tup_del , post_script INTO v_source_table , v_dest_table , v_dblink , v_filter , v_condition , v_tup_ins , v_tup_upd , v_tup_del , v_post_script FROM refresh_config_snap WHERE dest_table = p_destination; IF NOT FOUND THEN RAISE EXCEPTION 'No configuration found for %',v_job_name; END IF; -- checking for current view SELECT definition INTO v_view_definition FROM pg_views where ((schemaname || '.') || viewname)=v_dest_table; PERFORM dblink_connect(v_dblink_name, @extschema@.auth(v_dblink)); PERFORM update_step(v_step_id, 'OK','Done'); v_step_id := add_step(v_job_id,'Truncate non-active snap table'); v_exists := strpos(v_view_definition, 'snap1'); IF v_exists > 0 THEN v_snap := 'snap2'; v_old_snap := 'snap1'; ELSE v_snap := 'snap1'; v_old_snap := 'snap2'; END IF; v_refresh_snap := v_dest_table||'_'||v_snap; v_old_snap_table := v_dest_table||'_'||v_old_snap; PERFORM gdb(p_debug,'v_refresh_snap: '||v_refresh_snap::text); -- Create snap table if it doesn't exist PERFORM gdb(p_debug, 'Getting table columns and creating destination table if it doesn''t exist'); SELECT p_table_exists, p_cols, p_cols_n_types FROM manage_dest_table(v_dest_table, v_snap, p_debug) INTO v_table_exists, v_cols, v_cols_n_types; IF v_table_exists THEN /* Check local column definitions against remote and recreate table if different. 
Allows automatic recreation of snap tables if columns change (add, drop type change) */ v_local_sql := 'SELECT array_agg(attname||'' ''||format_type(atttypid, atttypmod)::text) as cols_n_types FROM pg_attribute WHERE attnum > 0 AND attisdropped is false AND attrelid = ' || quote_literal(v_refresh_snap) || '::regclass'; PERFORM gdb(p_debug, v_local_sql); EXECUTE v_local_sql INTO v_lcols_array; -- Check to see if there's a change in the column structure on the remote FOREACH v_r IN ARRAY v_cols_n_types LOOP v_match := false; FOREACH v_l IN ARRAY v_lcols_array LOOP IF v_r = v_l THEN v_match := true; EXIT; END IF; END LOOP; END LOOP; IF v_match = false THEN -- Grab old table & view privileges. They are applied later after the view is recreated/swapped CREATE TEMP TABLE mimeo_snapshot_grants_tmp (statement text); FOR v_old_grant IN SELECT table_schema ||'.'|| table_name AS tablename , array_agg(privilege_type::text) AS types , grantee FROM information_schema.table_privileges WHERE table_schema ||'.'|| table_name IN (v_refresh_snap, v_dest_table) GROUP BY grantee, table_schema, table_name LOOP INSERT INTO mimeo_snapshot_grants_tmp VALUES ( 'GRANT '||array_to_string(v_old_grant.types, ',')||' ON '||v_old_grant.tablename||' TO '||v_old_grant.grantee ); END LOOP; SELECT viewowner INTO v_old_owner FROM pg_views WHERE schemaname ||'.'|| viewname = v_dest_table; EXECUTE 'DROP TABLE ' || v_refresh_snap; EXECUTE 'DROP VIEW ' || v_dest_table; PERFORM manage_dest_table(v_dest_table, v_snap, p_debug); v_step_id := add_step(v_job_id,'Source table structure changed.'); PERFORM update_step(v_step_id, 'OK','Tables and view dropped and recreated. Please double-check snap table attributes (permissions, indexes, etc'); PERFORM gdb(p_debug,'Source table structure changed. Tables and view dropped and recreated. 
Please double-check snap table attributes (permissions, indexes, etc)'); END IF; -- truncate non-active snap table EXECUTE 'TRUNCATE TABLE ' || v_refresh_snap; PERFORM update_step(v_step_id, 'OK','Done'); END IF; -- Only check the remote data if there have been no column changes and snap table actually exists. -- Otherwise maker functions won't work if source is empty & view switch won't happen properly. IF v_table_exists AND v_match THEN v_remote_sql := 'SELECT n_tup_ins, n_tup_upd, n_tup_del FROM pg_catalog.pg_stat_all_tables WHERE relid::regclass = '||quote_literal(v_source_table)||'::regclass'; v_remote_sql := 'SELECT n_tup_ins, n_tup_upd, n_tup_del FROM dblink('||quote_literal(v_dblink_name)||', ' || quote_literal(v_remote_sql) || ') t (n_tup_ins bigint, n_tup_upd bigint, n_tup_del bigint)'; perform gdb(p_debug,'v_remote_sql: '||v_remote_sql); EXECUTE v_remote_sql INTO v_tup_ins_new, v_tup_upd_new, v_tup_del_new; IF v_tup_ins_new = v_tup_ins AND v_tup_upd_new = v_tup_upd AND v_tup_del_new = v_tup_del THEN PERFORM gdb(p_debug,'Remote table has not had any writes. Skipping data pull'); PERFORM update_step(v_step_id, 'OK', 'Remote table has not had any writes. 
Skipping data pull'); UPDATE refresh_config_snap SET last_run = CURRENT_TIMESTAMP WHERE dest_table = p_destination; PERFORM dblink_disconnect(v_dblink_name); PERFORM close_job(v_job_id); EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')'; PERFORM pg_advisory_unlock(hashtext('refresh_snap'), hashtext(v_job_name)); RETURN; END IF; END IF; v_remote_sql := 'SELECT '|| array_to_string(v_cols, ',') ||' FROM '||v_source_table; -- Used by p_pulldata parameter in maker function IF p_pulldata = false THEN v_remote_sql := v_remote_sql || ' LIMIT 0'; ELSIF v_condition IS NOT NULL THEN v_remote_sql := v_remote_sql || ' ' || v_condition; END IF; v_step_id := add_step(v_job_id,'Inserting records into local table'); PERFORM dblink_open(v_dblink_name, 'mimeo_cursor', v_remote_sql); v_rowcount := 0; LOOP v_fetch_sql := 'INSERT INTO '|| v_refresh_snap ||' ('|| array_to_string(v_cols, ',') ||') SELECT '||array_to_string(v_cols, ',')||' FROM dblink_fetch('||quote_literal(v_dblink_name)||', ''mimeo_cursor'', 50000) AS ('||array_to_string(v_cols_n_types, ',')||')'; EXECUTE v_fetch_sql; GET DIAGNOSTICS v_rowcount = ROW_COUNT; EXIT WHEN v_rowcount = 0; v_total := v_total + coalesce(v_rowcount, 0); PERFORM gdb(p_debug,'Fetching rows in batches: '||v_total||' done so far.'); PERFORM update_step(v_step_id, 'PENDING', 'Fetching rows in batches: '||v_total||' done so far.'); END LOOP; PERFORM dblink_close(v_dblink_name, 'mimeo_cursor'); PERFORM update_step(v_step_id, 'OK','Inserted '||v_total||' rows'); -- Create indexes if new table was created IF (v_table_exists = false OR v_match = 'f') AND p_index = true THEN PERFORM gdb(p_debug, 'Creating indexes'); PERFORM create_index(v_dest_table, v_snap, p_debug); END IF; EXECUTE 'ANALYZE ' ||v_refresh_snap; -- swap view v_step_id := add_step(v_job_id,'Swap view to '||v_refresh_snap); PERFORM gdb(p_debug,'Swapping view to '||v_refresh_snap); EXECUTE 'CREATE OR REPLACE VIEW '||v_dest_table||' AS SELECT * FROM 
'||v_refresh_snap; PERFORM update_step(v_step_id, 'OK','View Swapped'); IF v_match = false THEN -- Actually apply the original privileges if the table was recreated FOR v_old_grant IN SELECT statement FROM mimeo_snapshot_grants_tmp LOOP EXECUTE v_old_grant.statement; END LOOP; DROP TABLE IF EXISTS mimeo_snapshot_grants_tmp; EXECUTE 'ALTER VIEW '||v_dest_table||' OWNER TO '||v_old_owner; EXECUTE 'ALTER TABLE '||v_refresh_snap||' OWNER TO '||v_old_owner; -- Run any special sql to fix anything that was done to destination tables (extra indexes, etc) IF v_post_script IS NOT NULL THEN v_step_id := add_step(v_job_id,'Applying post_script sql commands due to schema change'); PERFORM @extschema@.post_script(v_dest_table); PERFORM update_step(v_step_id, 'OK','Done'); END IF; END IF; SELECT CASE WHEN count(1) > 0 THEN true ELSE false END INTO v_table_exists FROM pg_tables WHERE schemaname ||'.'|| tablename = v_old_snap_table; IF v_table_exists THEN v_step_id := add_step(v_job_id,'Truncating old snap table'); EXECUTE 'TRUNCATE TABLE '||v_old_snap_table; PERFORM update_step(v_step_id, 'OK','Done'); END IF; v_step_id := add_step(v_job_id,'Updating last_run & tuple change values'); UPDATE refresh_config_snap SET last_run = CURRENT_TIMESTAMP , n_tup_ins = v_tup_ins_new , n_tup_upd = v_tup_upd_new , n_tup_del = v_tup_del_new WHERE dest_table = p_destination; PERFORM update_step(v_step_id, 'OK','Done'); PERFORM dblink_disconnect(v_dblink_name); PERFORM close_job(v_job_id); -- Ensure old search path is reset for the current session EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')'; PERFORM pg_advisory_unlock(hashtext('refresh_snap'), hashtext(v_job_name)); EXCEPTION WHEN QUERY_CANCELED THEN EXECUTE 'SELECT '||v_dblink_schema||'.dblink_get_connections() @> ARRAY['||quote_literal(v_dblink_name)||']' INTO v_link_exists; IF v_link_exists THEN EXECUTE 'SELECT '||v_dblink_schema||'.dblink_disconnect('||quote_literal(v_dblink_name)||')'; END IF; PERFORM 
pg_advisory_unlock(hashtext('refresh_snap'), hashtext(v_job_name)); RAISE EXCEPTION '%', SQLERRM; WHEN OTHERS THEN IF v_job_id IS NULL THEN EXECUTE 'SELECT '||v_jobmon_schema||'.add_job(''Refresh Snap: '||p_destination||''')' INTO v_job_id; EXECUTE 'SELECT '||v_jobmon_schema||'.add_step('||v_job_id||', ''EXCEPTION before job logging started'')' INTO v_step_id; END IF; IF v_step_id IS NULL THEN EXECUTE 'SELECT '||v_jobmon_schema||'.add_step('||v_job_id||', ''EXCEPTION before first step logged'')' INTO v_step_id; END IF; EXECUTE 'SELECT '||v_dblink_schema||'.dblink_get_connections() @> ARRAY['||quote_literal(v_dblink_name)||']' INTO v_link_exists; IF v_link_exists THEN EXECUTE 'SELECT '||v_dblink_schema||'.dblink_disconnect('||quote_literal(v_dblink_name)||')'; END IF; EXECUTE 'SELECT '||v_jobmon_schema||'.update_step('||v_step_id||', ''CRITICAL'', ''ERROR: '||COALESCE(SQLERRM,'unknown')||''')'; EXECUTE 'SELECT '||v_jobmon_schema||'.fail_job('||v_job_id||')'; PERFORM pg_advisory_unlock(hashtext('refresh_snap'), hashtext(v_job_name)); RAISE EXCEPTION '%', SQLERRM; END $$; /* * Refresh insert/update only table based on timestamp control field */ CREATE OR REPLACE FUNCTION refresh_updater(p_destination text, p_limit integer DEFAULT NULL, p_repull boolean DEFAULT false, p_repull_start text DEFAULT NULL, p_repull_end text DEFAULT NULL, p_debug boolean DEFAULT false) RETURNS void LANGUAGE plpgsql SECURITY DEFINER AS $$ DECLARE v_adv_lock boolean; v_batch_limit_reached int := 0; v_boundary_sql text; v_boundary timestamptz; v_cols_n_types text; v_cols text; v_condition text; v_control text; v_create_sql text; v_dblink int; v_dblink_name text; v_dblink_schema text; v_delete_sql text; v_dest_table text; v_dst_active boolean; v_dst_check boolean; v_dst_start int; v_dst_end int; v_fetch_sql text; v_field text; v_filter text[]; v_full_refresh boolean := false; v_insert_sql text; v_job_id int; v_jobmon_schema text; v_job_name text; v_last_fetched timestamptz; v_last_value 
timestamptz; v_limit int; v_link_exists boolean; v_now timestamptz := now(); v_old_search_path text; v_pk_counter int := 1; v_pk_name text[]; v_remote_boundry_sql text; v_remote_boundry timestamptz; v_remote_sql text; v_rowcount bigint := 0; v_source_table text; v_step_id int; v_tmp_table text; v_total bigint := 0; BEGIN IF p_debug IS DISTINCT FROM true THEN PERFORM set_config( 'client_min_messages', 'warning', true ); END IF; v_job_name := 'Refresh Updater: '||p_destination; v_dblink_name := 'mimeo_updater_refresh_'||p_destination; SELECT nspname INTO v_dblink_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'dblink' AND e.extnamespace = n.oid; SELECT nspname INTO v_jobmon_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'pg_jobmon' AND e.extnamespace = n.oid; -- Set custom search path to allow easier calls to other functions, especially job logging SELECT current_setting('search_path') INTO v_old_search_path; EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_jobmon_schema||','||v_dblink_schema||',public'',''false'')'; -- Take advisory lock to prevent multiple calls to function overlapping v_adv_lock := pg_try_advisory_lock(hashtext('refresh_updater'), hashtext(v_job_name)); IF v_adv_lock = 'false' THEN v_job_id := add_job(v_job_name); v_step_id := add_step(v_job_id,'Obtaining advisory lock for job: '||v_job_name); PERFORM gdb(p_debug,'Obtaining advisory lock FAILED for job: '||v_job_name); PERFORM update_step(v_step_id, 'WARNING','Found concurrent job. 
Exiting gracefully'); PERFORM fail_job(v_job_id, 2); EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')'; RETURN; END IF; v_job_id := add_job(v_job_name); PERFORM gdb(p_debug,'Job ID: '||v_job_id::text); SELECT source_table , dest_table , 'tmp_'||replace(dest_table,'.','_') , dblink , control , last_value , now() - boundary::interval , pk_name , filter , condition , dst_active , dst_start , dst_end , batch_limit FROM refresh_config_updater WHERE dest_table = p_destination INTO v_source_table , v_dest_table , v_tmp_table , v_dblink , v_control , v_last_value , v_boundary , v_pk_name , v_filter , v_condition , v_dst_active , v_dst_start , v_dst_end , v_limit; IF NOT FOUND THEN RAISE EXCEPTION 'No configuration found for %',v_job_name; END IF; -- Do not allow this function to run during DST time change if config option is true. Otherwise will miss data from source IF v_dst_active THEN v_dst_check := @extschema@.dst_change(v_now); IF v_dst_check THEN IF to_number(to_char(v_now, 'HH24MM'), '0000') > v_dst_start AND to_number(to_char(v_now, 'HH24MM'), '0000') < v_dst_end THEN v_step_id := add_step( v_job_id, 'DST Check'); PERFORM update_step(v_step_id, 'OK', 'Job CANCELLED - Does not run during DST time change'); UPDATE refresh_config_updater SET last_run = CURRENT_TIMESTAMP WHERE dest_table = p_destination; PERFORM close_job(v_job_id); PERFORM gdb(p_debug, 'Cannot run during DST time change'); EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')'; PERFORM pg_advisory_unlock(hashtext('refresh_updater'), hashtext(v_job_name)); RETURN; END IF; END IF; END IF; v_step_id := add_step(v_job_id,'Building SQL'); -- determine column list, column type list IF v_filter IS NULL THEN SELECT array_to_string(array_agg(attname),','), array_to_string(array_agg(attname||' '||format_type(atttypid, atttypmod)::text),',') INTO v_cols, v_cols_n_types FROM pg_attribute WHERE attrelid = p_destination::regclass AND attnum > 0 AND 
attisdropped is false; ELSE -- ensure all primary key columns are included in any column filters FOREACH v_field IN ARRAY v_pk_name LOOP IF v_field = ANY(v_filter) THEN CONTINUE; ELSE RAISE EXCEPTION 'Filter list did not contain all columns that compose primary/unique key for %',v_job_name; END IF; END LOOP; SELECT array_to_string(array_agg(attname),','), array_to_string(array_agg(attname||' '||format_type(atttypid, atttypmod)::text),',') INTO v_cols, v_cols_n_types FROM pg_attribute WHERE attrelid = p_destination::regclass AND ARRAY[attname::text] <@ v_filter AND attnum > 0 AND attisdropped is false; END IF; PERFORM dblink_connect(v_dblink_name, auth(v_dblink)); IF p_limit IS NOT NULL THEN v_limit := p_limit; END IF; -- Repull old data instead of normal new data pull IF p_repull THEN -- Repull ALL data if no start and end values set IF p_repull_start IS NULL AND p_repull_end IS NULL THEN PERFORM update_step(v_step_id, 'OK','Request to repull ALL data from source. This could take a while...'); EXECUTE 'TRUNCATE '||v_dest_table; v_remote_sql := 'SELECT '||v_cols||' FROM '||v_source_table; IF v_condition IS NOT NULL THEN v_remote_sql := v_remote_sql || ' ' || v_condition; END IF; ELSE PERFORM update_step(v_step_id, 'OK','Request to repull data from '||p_repull_start||' to '||p_repull_end); PERFORM gdb(p_debug,'Request to repull data from '||p_repull_start||' to '||p_repull_end); v_remote_sql := 'SELECT '||v_cols||' FROM '||v_source_table; IF v_condition IS NOT NULL THEN v_remote_sql := v_remote_sql || ' ' || v_condition || ' AND '; ELSE v_remote_sql := v_remote_sql || ' WHERE '; END IF; v_remote_sql := v_remote_sql ||v_control||' > '||quote_literal(COALESCE(p_repull_start, '-infinity'))||' AND ' ||v_control||' < '||quote_literal(COALESCE(p_repull_end, 'infinity')); EXECUTE 'DELETE FROM '||v_dest_table||' WHERE '||v_control||' > '||quote_literal(COALESCE(p_repull_start, '-infinity'))||' AND ' ||v_control||' < '||quote_literal(COALESCE(p_repull_end, 'infinity')); END 
IF; ELSE -- does < for upper boundary to keep missing data from happening on rare edge case where a newly inserted row outside the transaction batch -- has the exact same timestamp as the previous batch's max timestamp v_remote_sql := 'SELECT '||v_cols||' FROM '||v_source_table; IF v_condition IS NOT NULL THEN v_remote_sql := v_remote_sql || ' ' || v_condition || ' AND '; ELSE v_remote_sql := v_remote_sql || ' WHERE '; END IF; v_remote_sql := v_remote_sql ||v_control||' > '||quote_literal(v_last_value)||' AND '||v_control||' < '||quote_literal(v_boundary)||' ORDER BY '||v_control||' ASC LIMIT '|| COALESCE(v_limit::text, 'ALL'); v_delete_sql := 'DELETE FROM '||v_dest_table||' USING '||v_tmp_table||' t WHERE '; WHILE v_pk_counter <= array_length(v_pk_name,1) LOOP IF v_pk_counter > 1 THEN v_delete_sql := v_delete_sql ||' AND '; END IF; v_delete_sql := v_delete_sql ||v_dest_table||'.'||v_pk_name[v_pk_counter]||' = t.'||v_pk_name[v_pk_counter]; v_pk_counter := v_pk_counter + 1; END LOOP; PERFORM update_step(v_step_id, 'OK','Grabbing rows from '||v_last_value::text||' to '||v_boundary::text); PERFORM gdb(p_debug,'Grabbing rows from '||v_last_value::text||' to '||v_boundary::text); END IF; v_insert_sql := 'INSERT INTO '||v_dest_table||' ('||v_cols||') SELECT '||v_cols||' FROM '||v_tmp_table; PERFORM gdb(p_debug,v_remote_sql); PERFORM dblink_open(v_dblink_name, 'mimeo_cursor', v_remote_sql); v_step_id := add_step(v_job_id, 'Inserting new/updated records into local table'); v_rowcount := 0; EXECUTE 'CREATE TEMP TABLE '||v_tmp_table||' ('||v_cols_n_types||')'; LOOP v_fetch_sql := 'INSERT INTO '||v_tmp_table||'('||v_cols||') SELECT '||v_cols||' FROM dblink_fetch('||quote_literal(v_dblink_name)||', ''mimeo_cursor'', 50000) AS ('||v_cols_n_types||')'; EXECUTE v_fetch_sql; GET DIAGNOSTICS v_rowcount = ROW_COUNT; v_total := v_total + coalesce(v_rowcount, 0); EXECUTE 'SELECT max('||v_control||') FROM '||v_tmp_table INTO v_last_fetched; IF v_limit IS NULL THEN -- insert into the 
real table in batches if no limit to avoid excessively large temp tables IF p_repull IS FALSE THEN -- Delete any rows that exist in the current temp table batch. repull delete is done above. EXECUTE v_delete_sql; END IF; EXECUTE v_insert_sql; EXECUTE 'TRUNCATE '||v_tmp_table; END IF; EXIT WHEN v_rowcount = 0; PERFORM gdb(p_debug,'Fetching rows in batches: '||v_total||' done so far. Last fetched: '||v_last_fetched); PERFORM update_step(v_step_id, 'PENDING', 'Fetching rows in batches: '||v_total||' done so far. Last fetched: '||v_last_fetched); END LOOP; PERFORM dblink_close(v_dblink_name, 'mimeo_cursor'); PERFORM update_step(v_step_id, 'OK','Rows fetched: '||v_total); IF v_limit IS NULL THEN -- nothing else to do ELSE -- When using batch limits, entire batch must be pulled to temp table before inserting to real table to catch edge cases v_step_id := add_step(v_job_id,'Checking for batch limit issues'); -- Not recommended that the batch actually equal the limit set if possible. IF v_total >= v_limit THEN PERFORM update_step(v_step_id, 'WARNING','Row count fetched equal to or greater than limit set: '||v_limit||'. Recommend increasing batch limit if possible.'); PERFORM gdb(p_debug, 'Row count fetched equal to or greater than limit set: '||v_limit||'. Recommend increasing batch limit if possible.'); EXECUTE 'SELECT max('||v_control||') FROM '||v_tmp_table INTO v_last_value; v_step_id := add_step(v_job_id, 'Removing high boundary rows from this batch to avoid missing data'); EXECUTE 'DELETE FROM '||v_tmp_table||' WHERE '||v_control||' = '||quote_literal(v_last_value); GET DIAGNOSTICS v_rowcount = ROW_COUNT; PERFORM update_step(v_step_id, 'OK', 'Removed '||v_rowcount||' rows. Batch now contains '||v_limit - v_rowcount||' records'); PERFORM gdb(p_debug, 'Removed '||v_rowcount||' rows from batch. 
Batch table now contains '||v_limit - v_rowcount||' records'); v_batch_limit_reached := 2; IF (v_limit - v_rowcount) < 1 THEN v_step_id := add_step(v_job_id, 'Reached inconsistent state'); PERFORM update_step(v_step_id, 'CRITICAL', 'Batch contained max rows ('||v_limit||') or greater and all contained the same timestamp value. Unable to guarentee rows will ever be replicated consistently. Increase row limit parameter to allow a consistent batch.'); PERFORM gdb(p_debug, 'Batch contained max rows ('||v_limit||') or greater and all contained the same timestamp value. Unable to guarentee rows will be replicated consistently. Increase row limit parameter to allow a consistent batch.'); v_batch_limit_reached := 3; END IF; ELSE PERFORM update_step(v_step_id, 'OK','No issues found'); PERFORM gdb(p_debug, 'No issues found'); END IF; IF v_batch_limit_reached <> 3 THEN EXECUTE 'CREATE INDEX ON '||v_tmp_table||' ('||array_to_string(v_pk_name, ',')||')'; -- incase of large batch limit EXECUTE 'ANALYZE '||v_tmp_table; v_step_id := add_step(v_job_id,'Deleting records marked for update in local table'); perform gdb(p_debug,v_delete_sql); execute v_delete_sql; GET DIAGNOSTICS v_rowcount = ROW_COUNT; PERFORM update_step(v_step_id, 'OK','Deleted '||v_rowcount||' records'); v_step_id := add_step(v_job_id,'Inserting new records into local table'); perform gdb(p_debug,v_insert_sql); EXECUTE v_insert_sql; GET DIAGNOSTICS v_rowcount = ROW_COUNT; PERFORM update_step(v_step_id, 'OK','Inserted '||v_rowcount||' records'); END IF; END IF; -- end v_limit IF IF v_batch_limit_reached <> 3 THEN v_step_id := add_step(v_job_id, 'Setting next lower boundary'); EXECUTE 'SELECT max('||v_control||') FROM '|| v_dest_table INTO v_last_value; UPDATE refresh_config_updater set last_value = coalesce(v_last_value, CURRENT_TIMESTAMP), last_run = CURRENT_TIMESTAMP WHERE dest_table = p_destination; PERFORM update_step(v_step_id, 'OK','Lower boundary value is: '||coalesce(v_last_value, CURRENT_TIMESTAMP)); 
PERFORM gdb(p_debug, 'Lower boundary value is: '||coalesce(v_last_value, CURRENT_TIMESTAMP)); END IF; EXECUTE 'DROP TABLE IF EXISTS '||v_tmp_table; PERFORM dblink_disconnect(v_dblink_name); IF v_batch_limit_reached = 0 THEN PERFORM close_job(v_job_id); ELSIF v_batch_limit_reached = 2 THEN -- Set final job status to level 2 (WARNING) to bring notice that the batch limit was reached and may need adjusting. -- Preventive warning to keep replication from falling behind. PERFORM fail_job(v_job_id, 2); ELSIF v_batch_limit_reached = 3 THEN -- Really bad. Critical alert! PERFORM fail_job(v_job_id); END IF; -- Ensure old search path is reset for the current session EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')'; PERFORM pg_advisory_unlock(hashtext('refresh_updater'), hashtext(v_job_name)); EXCEPTION WHEN QUERY_CANCELED THEN EXECUTE 'SELECT '||v_dblink_schema||'.dblink_get_connections() @> ARRAY['||quote_literal(v_dblink_name)||']' INTO v_link_exists; IF v_link_exists THEN EXECUTE 'SELECT '||v_dblink_schema||'.dblink_disconnect('||quote_literal(v_dblink_name)||')'; END IF; PERFORM pg_advisory_unlock(hashtext('refresh_updater'), hashtext(v_job_name)); RAISE EXCEPTION '%', SQLERRM; WHEN OTHERS THEN EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_jobmon_schema||','||v_dblink_schema||''',''false'')'; IF v_job_id IS NULL THEN EXECUTE 'SELECT '||v_jobmon_schema||'.add_job(''Refresh Updater: '||p_destination||''')' INTO v_job_id; EXECUTE 'SELECT '||v_jobmon_schema||'.add_step('||v_job_id||', ''EXCEPTION before job logging started'')' INTO v_step_id; END IF; IF v_step_id IS NULL THEN EXECUTE 'SELECT '||v_jobmon_schema||'.add_step('||v_job_id||', ''EXCEPTION before first step logged'')' INTO v_step_id; END IF; EXECUTE 'SELECT '||v_dblink_schema||'.dblink_get_connections() @> ARRAY['||quote_literal(v_dblink_name)||']' INTO v_link_exists; IF v_link_exists THEN EXECUTE 'SELECT 
'||v_dblink_schema||'.dblink_disconnect('||quote_literal(v_dblink_name)||')'; END IF; EXECUTE 'SELECT '||v_jobmon_schema||'.update_step('||v_step_id||', ''CRITICAL'', ''ERROR: '||COALESCE(SQLERRM,'unknown')||''')'; EXECUTE 'SELECT '||v_jobmon_schema||'.fail_job('||v_job_id||')'; PERFORM pg_advisory_unlock(hashtext('refresh_updater'), hashtext(v_job_name)); RAISE EXCEPTION '%', SQLERRM; END $$;


/*
 *  Plain table refresh function.
 *
 *  Empties the destination table with TRUNCATE and re-pulls every row from the
 *  source table through a dblink cursor, 50000 rows per fetch.
 *
 *  Parameters:
 *    p_destination       Destination table name; must match dest_table in
 *                        refresh_config_table or an exception is raised.
 *    p_truncate_cascade  When not NULL, overrides the truncate_cascade value
 *                        stored in refresh_config_table for this run.
 *    p_debug             Passed through to gdb() with the progress messages.
 *
 *  Behaviour:
 *    - Takes a session advisory lock keyed on ('refresh_table', p_destination).
 *      If the lock cannot be obtained a NOTICE is raised and the function
 *      returns without touching any data.
 *    - After loading, every sequence listed in the config row is set to the
 *      value returned by sequence_max_value() (skipped when that is NULL).
 *    - Updates refresh_config_table.last_run on success.
 *    - On any error (including query cancel) the dblink connection is closed
 *      if it is open, the advisory lock is released and the error is re-raised
 *      with the original message text.
 */
CREATE OR REPLACE FUNCTION refresh_table(p_destination text, p_truncate_cascade boolean DEFAULT NULL, p_debug boolean DEFAULT false) RETURNS void
    LANGUAGE plpgsql
    AS $$
DECLARE

v_adv_lock          boolean;
v_cols              text[];
v_cols_n_types      text[];
v_condition         text;
v_dblink            int;
v_dblink_name       text;
v_dblink_schema     text;
v_dest_table        text;
v_fetch_sql         text;
-- Held as plain text so quote_literal() can embed it directly in the remote
-- query as the right-hand side of "<@".
-- NOTE(review): presumably an array column in refresh_config_table - confirm.
v_filter            text;
v_link_exists       boolean;
v_old_search_path   text;
v_remote_sql        text;
v_rowcount          bigint := 0;
v_seq               text;
v_seq_max           bigint;
v_sequences         text[];
v_source_table      text;
v_total             bigint := 0;
v_truncate_cascade  boolean;
v_truncate_sql      text;

BEGIN

SELECT nspname INTO v_dblink_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'dblink' AND e.extnamespace = n.oid;

v_dblink_name := 'mimeo_table_refresh_'||p_destination;

-- Remember the caller's search_path, then switch to one that resolves this
-- extension's objects and dblink without schema qualification.
SELECT current_setting('search_path') INTO v_old_search_path;
EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_dblink_schema||',public'',''false'')';

-- Take advisory lock to prevent multiple calls to function overlapping
v_adv_lock := pg_try_advisory_lock(hashtext('refresh_table'), hashtext(p_destination));
IF NOT v_adv_lock THEN
    RAISE NOTICE 'Found concurrent job. Exiting gracefully';
    -- Hand the saved value straight to set_config() instead of splicing it into
    -- a dynamic SQL string, so a search_path containing quote characters is
    -- restored verbatim.
    PERFORM set_config('search_path', v_old_search_path, false);
    RETURN;
END IF;

SELECT source_table
    , dest_table
    , dblink
    , filter
    , condition
    , sequences
    , truncate_cascade
INTO v_source_table
    , v_dest_table
    , v_dblink
    , v_filter
    , v_condition
    , v_sequences
    , v_truncate_cascade
FROM refresh_config_table
WHERE dest_table = p_destination;
IF NOT FOUND THEN
    RAISE EXCEPTION 'No configuration found for Refresh Table: %',p_destination;
END IF;

-- An explicit argument wins over the stored configuration
IF p_truncate_cascade IS NOT NULL THEN
    v_truncate_cascade := p_truncate_cascade;
END IF;

v_truncate_sql := 'TRUNCATE TABLE '||v_dest_table;
IF v_truncate_cascade THEN
    v_truncate_sql := v_truncate_sql || ' CASCADE';
    RAISE NOTICE 'WARNING! If this table had foreign keys, you have just truncated all referencing tables as well!';
END IF;
EXECUTE v_truncate_sql;

PERFORM dblink_connect(v_dblink_name, @extschema@.auth(v_dblink));

-- Ask the source database for the column names and "name type" pairs of the
-- source table, optionally restricted to the configured column filter.
v_remote_sql := 'SELECT array_agg(attname) as cols, array_agg(attname||'' ''||format_type(atttypid, atttypmod)::text) as cols_n_types FROM pg_attribute WHERE attrelid = '||quote_literal(v_source_table)||'::regclass AND attnum > 0 AND attisdropped is false';
IF v_filter IS NOT NULL THEN
    v_remote_sql := v_remote_sql || ' AND ARRAY[attname::text] <@ '||quote_literal(v_filter);
END IF;
v_remote_sql := 'SELECT cols, cols_n_types FROM dblink('||quote_literal(v_dblink_name)||', ' || quote_literal(v_remote_sql) || ') t (cols text[], cols_n_types text[])';
EXECUTE v_remote_sql INTO v_cols, v_cols_n_types;

-- Query that feeds the remote cursor
v_remote_sql := 'SELECT '|| array_to_string(v_cols, ',') ||' FROM '||v_source_table;
IF v_condition IS NOT NULL THEN
    v_remote_sql := v_remote_sql || ' ' || v_condition;
END IF;
PERFORM dblink_open(v_dblink_name, 'mimeo_cursor', v_remote_sql);

-- The fetch statement does not change between batches, so build it once.
v_fetch_sql := 'INSERT INTO '|| v_dest_table ||' ('|| array_to_string(v_cols, ',') ||')
    SELECT '||array_to_string(v_cols, ',')||'
    FROM dblink_fetch('||quote_literal(v_dblink_name)||', ''mimeo_cursor'', 50000) AS ('||array_to_string(v_cols_n_types, ',')||')';
LOOP
    EXECUTE v_fetch_sql;
    GET DIAGNOSTICS v_rowcount = ROW_COUNT;
    -- An empty fetch means the cursor is exhausted
    EXIT WHEN v_rowcount = 0;
    v_total := v_total + coalesce(v_rowcount, 0);
    PERFORM gdb(p_debug,'Fetching rows in batches: '||v_total||' done so far.');
END LOOP;
PERFORM dblink_close(v_dblink_name, 'mimeo_cursor');

PERFORM dblink_disconnect(v_dblink_name);

-- Reset any sequences given in the parameter to their new value.
-- Checks all tables that use the given sequence to ensure it's the max for the entire database.
IF v_sequences IS NOT NULL THEN
    FOREACH v_seq IN ARRAY v_sequences LOOP
        SELECT sequence_max_value(c.oid) INTO v_seq_max FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid WHERE n.nspname ||'.'|| c.relname = v_seq;
        IF v_seq_max IS NOT NULL THEN
            PERFORM setval(v_seq, v_seq_max);
        END IF;
    END LOOP;
END IF;

UPDATE refresh_config_table set last_run = CURRENT_TIMESTAMP WHERE dest_table = v_dest_table;

PERFORM pg_advisory_unlock(hashtext('refresh_table'), hashtext(p_destination));

-- Ensure old search path is reset for the current session (value passed as a
-- parameter rather than concatenated into SQL; see the early-exit path above).
PERFORM set_config('search_path', v_old_search_path, false);

EXCEPTION
    WHEN QUERY_CANCELED OR OTHERS THEN
        -- dblink functions are called schema-qualified here rather than being
        -- resolved through search_path.
        EXECUTE 'SELECT '||v_dblink_schema||'.dblink_get_connections() @> ARRAY['||quote_literal(v_dblink_name)||']' INTO v_link_exists;
        IF v_link_exists THEN
            EXECUTE 'SELECT '||v_dblink_schema||'.dblink_disconnect('||quote_literal(v_dblink_name)||')';
        END IF;
        PERFORM pg_advisory_unlock(hashtext('refresh_table'), hashtext(p_destination));
        RAISE EXCEPTION '%', SQLERRM;
END
$$;