-- For snapshot replication, if the source's columns change and the destination tables & view are recreated, the original permissions are now automatically restored. Prior to this, the post_script array field had to be used to replay the grants. This is no longer required and you can remove any grant commands from the post_script columns. -- Snapshot refreshes can now detect whether the source table has had any DML changes (inserts, updates & deletes) and if there have been no changes, skip the data pull step completely. Does not work if source is a view. This can be a massive savings on system load for large tables that don't change often, but you still want to have a way to ensure you get any changes to the destination as soon as possible. The "track_counts" PostgreSQL setting must be turned on for this to work (which is the default). -- Truncate the old snapshot table as well after the view swap to avoid storing data twice. Thanks to Raghavendra for the suggestion. -- Snapshot & incremental maker functions now ensure that the given table names are schema qualified. -- Updated the extras functions for older source versions of postgresql. There's new dml functions for 8.1 as well as custom array_agg() functions that mimeo requires. ALTER TABLE @extschema@.refresh_config_snap ADD n_tup_ins bigint; ALTER TABLE @extschema@.refresh_config_snap ADD n_tup_upd bigint; ALTER TABLE @extschema@.refresh_config_snap ADD n_tup_del bigint; /* * Snap refresh to repull all table data */ CREATE OR REPLACE FUNCTION refresh_snap(p_destination text, p_index boolean DEFAULT true, p_debug boolean DEFAULT false, p_pulldata boolean DEFAULT true) RETURNS void LANGUAGE plpgsql SECURITY DEFINER AS $$ DECLARE v_adv_lock boolean; v_cols_n_types text[]; v_cols text[]; v_condition text; v_create_sql text; v_dblink int; v_dblink_name text; v_dblink_schema text; v_dest_table text; v_exists int; v_fetch_sql text; v_filter text[]; v_insert_sql text; v_job_id int; v_jobmon_schema text; v_job_name text; v_lcols_array text[]; v_local_sql text; v_l text; v_match boolean = true; v_old_grant record; v_old_owner text; v_old_search_path text; v_old_snap text; v_old_snap_table text; v_parts record; v_post_script text[]; v_refresh_snap text; v_remote_index_sql text; v_remote_sql text; v_row record; v_rowcount bigint; v_r text; v_snap text; v_source_table text; v_step_id int; v_table_exists int; v_total bigint := 0; v_tup_del bigint; v_tup_ins bigint; v_tup_upd bigint; v_tup_del_new bigint; v_tup_ins_new bigint; v_tup_upd_new bigint; v_view_definition text; BEGIN IF p_debug IS DISTINCT FROM true THEN PERFORM set_config( 'client_min_messages', 'notice', true ); END IF; v_job_name := 'Refresh Snap: '||p_destination; v_dblink_name := 'mimeo_snap_refresh_'||p_destination; SELECT nspname INTO v_dblink_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'dblink' AND e.extnamespace = n.oid; SELECT nspname INTO v_jobmon_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'pg_jobmon' AND e.extnamespace = n.oid; -- Set custom search path to allow easier calls to other functions, especially job logging SELECT current_setting('search_path') INTO v_old_search_path; EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_jobmon_schema||','||v_dblink_schema||',public'',''false'')'; v_job_id := add_job(v_job_name); PERFORM gdb(p_debug,'Job ID: '||v_job_id::text); -- Take advisory lock to prevent multiple calls to function overlapping and causing possible deadlock v_adv_lock := pg_try_advisory_lock(hashtext('refresh_snap'), hashtext(v_job_name)); IF v_adv_lock = 'false' THEN v_step_id := add_step(v_job_id,'Obtaining advisory lock for job: '||v_job_name); PERFORM gdb(p_debug,'Obtaining advisory lock FAILED for job: '||v_job_name); PERFORM update_step(v_step_id, 'WARNING','Found concurrent job. Exiting gracefully'); PERFORM fail_job(v_job_id, 2); EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')'; RETURN; END IF; v_step_id := add_step(v_job_id,'Grabbing Mapping, Building SQL'); SELECT source_table , dest_table , dblink , filter , condition , n_tup_ins , n_tup_upd , n_tup_del , post_script INTO v_source_table , v_dest_table , v_dblink , v_filter , v_condition , v_tup_ins , v_tup_upd , v_tup_del , v_post_script FROM refresh_config_snap WHERE dest_table = p_destination; IF NOT FOUND THEN RAISE EXCEPTION 'ERROR: This table is not set up for snapshot replication: %',v_job_name; END IF; -- checking for current view SELECT definition INTO v_view_definition FROM pg_views where ((schemaname || '.') || viewname)=v_dest_table; PERFORM dblink_connect(v_dblink_name, @extschema@.auth(v_dblink)); v_remote_sql := 'SELECT array_agg(attname) as cols, array_agg(attname||'' ''||format_type(atttypid, atttypmod)::text) as cols_n_types FROM pg_attribute WHERE attrelid = '||quote_literal(v_source_table)||'::regclass AND attnum > 0 AND attisdropped is false'; -- Apply column filters if used IF v_filter IS NOT NULL THEN v_remote_sql := v_remote_sql || ' AND ARRAY[attname::text] <@ '||quote_literal(v_filter); END IF; v_remote_sql := 'SELECT cols, cols_n_types FROM dblink('||quote_literal(v_dblink_name)||', ' || quote_literal(v_remote_sql) || ') t (cols text[], cols_n_types text[])'; PERFORM gdb(p_debug,'v_remote_sql: '||v_remote_sql); EXECUTE v_remote_sql INTO v_cols, v_cols_n_types; PERFORM gdb(p_debug,'v_cols: {'|| array_to_string(v_cols, ',') ||'}'); PERFORM gdb(p_debug,'v_cols_n_types: {'|| array_to_string(v_cols_n_types, ',') ||'}'); PERFORM update_step(v_step_id, 'OK','Done'); v_step_id := add_step(v_job_id,'Truncate non-active snap table'); v_exists := strpos(v_view_definition, 'snap1'); IF v_exists > 0 THEN v_snap := '_snap2'; v_old_snap := '_snap1'; ELSE v_snap := '_snap1'; v_old_snap := '_snap2'; END IF; v_refresh_snap := v_dest_table||v_snap; v_old_snap_table := v_dest_table||v_old_snap; PERFORM gdb(p_debug,'v_refresh_snap: '||v_refresh_snap::text); SELECT string_to_array(v_refresh_snap, '.') AS oparts INTO v_parts; -- Create snap table if it doesn't exist SELECT INTO v_table_exists count(1) FROM pg_tables WHERE schemaname ||'.'|| tablename = v_refresh_snap; IF v_table_exists = 0 THEN PERFORM gdb(p_debug,'Snap table does not exist. Creating... '); v_create_sql := 'CREATE TABLE ' || v_refresh_snap || ' (' || array_to_string(v_cols_n_types, ',') || ')'; perform gdb(p_debug,'v_create_sql: '||v_create_sql::text); EXECUTE v_create_sql; ELSE /* Check local column definitions against remote and recreate table if different. Allows automatic recreation of snap tables if columns change (add, drop type change) */ v_local_sql := 'SELECT array_agg(attname||'' ''||format_type(atttypid, atttypmod)::text) as cols_n_types FROM pg_attribute WHERE attnum > 0 AND attisdropped is false AND attrelid = ' || quote_literal(v_refresh_snap) || '::regclass'; PERFORM gdb(p_debug, v_local_sql); EXECUTE v_local_sql INTO v_lcols_array; -- Check to see if there's a change in the column structure on the remote FOREACH v_r IN ARRAY v_cols_n_types LOOP v_match := false; FOREACH v_l IN ARRAY v_lcols_array LOOP IF v_r = v_l THEN v_match := true; EXIT; END IF; END LOOP; END LOOP; IF v_match = false THEN -- Grab old table & view privileges. They are applied later after the view is recreated/swapped CREATE TEMP TABLE mimeo_snapshot_grants_tmp (statement text); FOR v_old_grant IN SELECT table_schema ||'.'|| table_name AS tablename , array_agg(privilege_type::text) AS types , grantee FROM information_schema.table_privileges WHERE table_schema ||'.'|| table_name IN (v_refresh_snap, v_dest_table) GROUP BY grantee, table_schema, table_name LOOP INSERT INTO mimeo_snapshot_grants_tmp VALUES ( 'GRANT '||array_to_string(v_old_grant.types, ',')||' ON '||v_old_grant.tablename||' TO '||v_old_grant.grantee ); END LOOP; SELECT viewowner INTO v_old_owner FROM pg_views WHERE schemaname ||'.'|| viewname = v_dest_table; EXECUTE 'DROP TABLE ' || v_refresh_snap; EXECUTE 'DROP VIEW ' || v_dest_table; v_create_sql := 'CREATE TABLE ' || v_refresh_snap || ' (' || array_to_string(v_cols_n_types, ',') || ')'; PERFORM gdb(p_debug,'v_create_sql: '||v_create_sql::text); EXECUTE v_create_sql; v_step_id := add_step(v_job_id,'Source table structure changed.'); PERFORM update_step(v_step_id, 'OK','Tables and view dropped and recreated. Please double-check snap table attributes (permissions, indexes, etc'); PERFORM gdb(p_debug,'Source table structure changed. Tables and view dropped and recreated. Please double-check snap table attributes (permissions, indexes, etc)'); END IF; -- truncate non-active snap table EXECUTE 'TRUNCATE TABLE ' || v_refresh_snap; PERFORM update_step(v_step_id, 'OK','Done'); END IF; -- Only check the remote data if there have been no column changes and snap table actually exists. -- Otherwise maker functions won't work if source is empty & view switch won't happen properly. IF v_table_exists > 0 AND v_match THEN v_remote_sql := 'SELECT n_tup_ins, n_tup_upd, n_tup_del FROM pg_catalog.pg_stat_all_tables WHERE relid::regclass = '||quote_literal(v_source_table)||'::regclass'; v_remote_sql := 'SELECT n_tup_ins, n_tup_upd, n_tup_del FROM dblink('||quote_literal(v_dblink_name)||', ' || quote_literal(v_remote_sql) || ') t (n_tup_ins bigint, n_tup_upd bigint, n_tup_del bigint)'; perform gdb(p_debug,'v_remote_sql: '||v_remote_sql); EXECUTE v_remote_sql INTO v_tup_ins_new, v_tup_upd_new, v_tup_del_new; IF v_tup_ins_new = v_tup_ins AND v_tup_upd_new = v_tup_upd AND v_tup_del_new = v_tup_del THEN PERFORM gdb(p_debug,'Remote table has not had any writes. Skipping data pull'); PERFORM update_step(v_step_id, 'OK', 'Remote table has not had any writes. Skipping data pull'); PERFORM dblink_disconnect(v_dblink_name); PERFORM close_job(v_job_id); EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')'; PERFORM pg_advisory_unlock(hashtext('refresh_snap'), hashtext(v_job_name)); RETURN; END IF; END IF; v_remote_sql := 'SELECT '|| array_to_string(v_cols, ',') ||' FROM '||v_source_table; -- Used by p_pull options in all maker functions to be able to create a replication job but pull no data IF p_pulldata = false THEN v_remote_sql := v_remote_sql || ' LIMIT 0'; ELSIF v_condition IS NOT NULL THEN v_remote_sql := v_remote_sql || ' ' || v_condition; END IF; v_step_id := add_step(v_job_id,'Inserting records into local table'); PERFORM dblink_open(v_dblink_name, 'mimeo_cursor', v_remote_sql); v_rowcount := 0; LOOP v_fetch_sql := 'INSERT INTO '|| v_refresh_snap ||' ('|| array_to_string(v_cols, ',') ||') SELECT '||array_to_string(v_cols, ',')||' FROM dblink_fetch('||quote_literal(v_dblink_name)||', ''mimeo_cursor'', 50000) AS ('||array_to_string(v_cols_n_types, ',')||')'; EXECUTE v_fetch_sql; GET DIAGNOSTICS v_rowcount = ROW_COUNT; EXIT WHEN v_rowcount = 0; v_total := v_total + coalesce(v_rowcount, 0); PERFORM gdb(p_debug,'Fetching rows in batches: '||v_total||' done so far.'); PERFORM update_step(v_step_id, 'PENDING', 'Fetching rows in batches: '||v_total||' done so far.'); END LOOP; PERFORM dblink_close(v_dblink_name, 'mimeo_cursor'); PERFORM update_step(v_step_id, 'OK','Inserted '||v_total||' rows'); -- Create indexes if new table was created IF (v_table_exists = 0 OR v_match = 'f') AND p_index = true THEN v_remote_index_sql := 'SELECT CASE WHEN i.indisprimary IS true THEN ''primary'' WHEN i.indisunique IS true THEN ''unique'' ELSE ''index'' END AS key_type, ( SELECT array_agg( a.attname ORDER by x.r ) FROM pg_attribute a JOIN ( SELECT k, row_number() over () as r FROM unnest(i.indkey) k ) as x ON a.attnum = x.k AND a.attrelid = i.indrelid '; IF v_filter IS NOT NULL THEN v_remote_index_sql := v_remote_index_sql || ' WHERE ARRAY[a.attname::text] <@ '||quote_literal(v_filter); END IF; v_remote_index_sql := v_remote_index_sql || ') AS indkey_names FROM pg_index i WHERE i.indrelid = '||quote_literal(v_source_table)||'::regclass'; FOR v_row IN EXECUTE 'SELECT key_type, indkey_names FROM dblink('||quote_literal(v_dblink_name)||', '||quote_literal(v_remote_index_sql)||') t (key_type text, indkey_names text[])' LOOP IF v_row.indkey_names IS NOT NULL THEN -- If column filter is used, indkey_name column may be null IF v_row.key_type = 'primary' THEN RAISE NOTICE 'Creating primary key...'; EXECUTE 'ALTER TABLE '||v_refresh_snap||' ADD CONSTRAINT '||v_parts.oparts[2]||'_'||array_to_string(v_row.indkey_names, '_')||'_idx PRIMARY KEY ('||array_to_string(v_row.indkey_names, ',')||')'; ELSIF v_row.key_type = 'unique' THEN RAISE NOTICE 'Creating unique index...'; EXECUTE 'CREATE UNIQUE INDEX '||v_parts.oparts[2]||'_'||array_to_string(v_row.indkey_names, '_')||'_idx ON '||v_refresh_snap|| '('||array_to_string(v_row.indkey_names, ',')||')'; ELSE RAISE NOTICE 'Creating index...'; EXECUTE 'CREATE INDEX '||v_parts.oparts[2]||'_'||array_to_string(v_row.indkey_names, '_')||'_idx ON '||v_refresh_snap|| '('||array_to_string(v_row.indkey_names, ',')||')'; END IF; END IF; END LOOP; END IF; EXECUTE 'ANALYZE ' ||v_refresh_snap; --SET statement_timeout='30 min'; -- swap view v_step_id := add_step(v_job_id,'Swap view to '||v_refresh_snap); PERFORM gdb(p_debug,'Swapping view to '||v_refresh_snap); EXECUTE 'CREATE OR REPLACE VIEW '||v_dest_table||' AS SELECT * FROM '||v_refresh_snap; PERFORM update_step(v_step_id, 'OK','View Swapped'); IF v_match = false THEN -- Actually apply the original privileges if the table was recreated FOR v_old_grant IN SELECT statement FROM mimeo_snapshot_grants_tmp LOOP EXECUTE v_old_grant.statement; END LOOP; DROP TABLE IF EXISTS mimeo_snapshot_grants_tmp; EXECUTE 'ALTER VIEW '||v_dest_table||' OWNER TO '||v_old_owner; EXECUTE 'ALTER TABLE '||v_refresh_snap||' OWNER TO '||v_old_owner; -- Run any special sql to fix anything that was done to destination tables (extra indexes, etc) IF v_post_script IS NOT NULL THEN v_step_id := add_step(v_job_id,'Applying post_script sql commands due to schema change'); PERFORM @extschema@.post_script(v_dest_table); PERFORM update_step(v_step_id, 'OK','Done'); END IF; END IF; SELECT INTO v_table_exists count(1) FROM pg_tables WHERE schemaname ||'.'|| tablename = v_old_snap_table; IF v_table_exists > 0 THEN v_step_id := add_step(v_job_id,'Truncating old snap table'); EXECUTE 'TRUNCATE TABLE '||v_old_snap_table; PERFORM update_step(v_step_id, 'OK','Done'); END IF; v_step_id := add_step(v_job_id,'Updating last value'); UPDATE refresh_config_snap SET last_run = CURRENT_TIMESTAMP , n_tup_ins = v_tup_ins_new , n_tup_upd = v_tup_upd_new , n_tup_del = v_tup_del_new WHERE dest_table = p_destination; PERFORM update_step(v_step_id, 'OK','Done'); PERFORM dblink_disconnect(v_dblink_name); PERFORM close_job(v_job_id); -- Ensure old search path is reset for the current session EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')'; PERFORM pg_advisory_unlock(hashtext('refresh_snap'), hashtext(v_job_name)); EXCEPTION WHEN QUERY_CANCELED THEN EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_jobmon_schema||','||v_dblink_schema||''',''false'')'; IF dblink_get_connections() @> ARRAY[v_dblink_name] THEN PERFORM dblink_disconnect(v_dblink_name); END IF; EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')'; PERFORM pg_advisory_unlock(hashtext('refresh_snap'), hashtext(v_job_name)); RAISE EXCEPTION '%', SQLERRM; WHEN OTHERS THEN EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_jobmon_schema||','||v_dblink_schema||''',''false'')'; IF v_job_id IS NULL THEN v_job_id := add_job('Refresh Snap: '||p_destination); v_step_id := add_step(v_job_id, 'EXCEPTION before job logging started'); END IF; IF v_step_id IS NULL THEN v_step_id := add_step(v_job_id, 'EXCEPTION before first step logged'); END IF; PERFORM update_step(v_step_id, 'CRITICAL', 'ERROR: '||coalesce(SQLERRM,'unknown')); PERFORM fail_job(v_job_id); IF dblink_get_connections() @> ARRAY[v_dblink_name] THEN PERFORM dblink_disconnect(v_dblink_name); END IF; EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')'; PERFORM pg_advisory_unlock(hashtext('refresh_snap'), hashtext(v_job_name)); RAISE EXCEPTION '%', SQLERRM; END $$; /* * Snapshot maker function. Optional custom destination table name. */ CREATE OR REPLACE FUNCTION snapshot_maker( p_src_table text , p_dblink_id int , p_dest_table text DEFAULT NULL , p_index boolean DEFAULT true , p_filter text[] DEFAULT NULL , p_condition text DEFAULT NULL , p_pulldata boolean DEFAULT true) RETURNS void LANGUAGE plpgsql AS $$ DECLARE v_data_source text; v_insert_refresh_config text; BEGIN SELECT data_source INTO v_data_source FROM @extschema@.dblink_mapping WHERE data_source_id = p_dblink_id; IF NOT FOUND THEN RAISE EXCEPTION 'ERROR: Database link ID does not exist in @extschema@.dblink_mapping: %', p_dblink_id; END IF; IF p_dest_table IS NULL THEN p_dest_table := p_src_table; END IF; IF position('.' in p_dest_table) = 0 AND position('.' in p_src_table) = 0 THEN RAISE EXCEPTION 'Source (and destination) table must be schema qualified'; END IF; v_insert_refresh_config := 'INSERT INTO @extschema@.refresh_config_snap(source_table, dest_table, dblink, filter, condition) VALUES('||quote_literal(p_src_table)||', '||quote_literal(p_dest_table)||','||p_dblink_id||','||COALESCE(quote_literal(p_filter), 'NULL')||','||COALESCE(quote_literal(p_condition), 'NULL')||')'; RAISE NOTICE 'Inserting record in @extschema@.refresh_config'; EXECUTE v_insert_refresh_config; RAISE NOTICE 'Insert successful'; RAISE NOTICE 'attempting first snapshot'; EXECUTE 'SELECT @extschema@.refresh_snap('||quote_literal(p_dest_table)||', p_index := '||p_index||', p_pulldata := '||p_pulldata||')'; RAISE NOTICE 'attempting second snapshot'; EXECUTE 'SELECT @extschema@.refresh_snap('||quote_literal(p_dest_table)||', p_index := '||p_index||', p_pulldata := '||p_pulldata||')'; RAISE NOTICE 'all done'; RETURN; END $$; /* * Inserter maker function. */ CREATE OR REPLACE FUNCTION inserter_maker( p_src_table text , p_control_field text , p_dblink_id int , p_boundary interval DEFAULT '00:10:00' , p_dest_table text DEFAULT NULL , p_index boolean DEFAULT true , p_filter text[] DEFAULT NULL , p_condition text DEFAULT NULL , p_pulldata boolean DEFAULT true) RETURNS void LANGUAGE plpgsql AS $$ DECLARE v_data_source text; v_dest_check text; v_dest_schema_name text; v_dest_table_name text; v_dst_active boolean; v_insert_refresh_config text; v_max_timestamp timestamptz; BEGIN SELECT data_source INTO v_data_source FROM @extschema@.dblink_mapping WHERE data_source_id = p_dblink_id; IF NOT FOUND THEN RAISE EXCEPTION 'ERROR: database link ID is incorrect %', p_dblink_id; END IF; IF p_dest_table IS NULL THEN p_dest_table := p_src_table; END IF; IF position('.' in p_dest_table) > 0 AND position('.' in p_src_table) > 0 THEN v_dest_schema_name := split_part(p_dest_table, '.', 1); v_dest_table_name := split_part(p_dest_table, '.', 2); ELSE RAISE EXCEPTION 'Source (and destination) table must be schema qualified'; END IF; -- Only create destination table if it doesn't already exist SELECT schemaname||'.'||tablename INTO v_dest_check FROM pg_tables WHERE schemaname = v_dest_schema_name AND tablename = v_dest_table_name; IF v_dest_check IS NULL THEN v_insert_refresh_config := 'INSERT INTO @extschema@.refresh_config_snap(source_table, dest_table, dblink, filter, condition) VALUES(' ||quote_literal(p_src_table)||', '||quote_literal(p_dest_table)||', '||p_dblink_id||',' ||COALESCE(quote_literal(p_filter), 'NULL')||','||COALESCE(quote_literal(p_condition), 'NULL')||')'; RAISE NOTICE 'Snapshotting source table to pull all current source data...'; EXECUTE v_insert_refresh_config; EXECUTE 'SELECT @extschema@.refresh_snap('||quote_literal(p_dest_table)||', p_index := '||p_index||', p_pulldata := '||p_pulldata||')'; PERFORM @extschema@.snapshot_destroyer(p_dest_table, 'ARCHIVE'); RAISE NOTICE 'Snapshot complete.'; ELSE RAISE NOTICE 'Destination table % already exists. No data or indexes were pulled from source', p_dest_table; END IF; RAISE NOTICE 'Getting the maximum destination timestamp...'; EXECUTE 'SELECT max('||p_control_field||') FROM '||p_dest_table||';' INTO v_max_timestamp; v_dst_active := @extschema@.dst_utc_check(); v_insert_refresh_config := 'INSERT INTO @extschema@.refresh_config_inserter(source_table, dest_table, dblink, control, boundary, last_value, last_run, dst_active, filter, condition) VALUES(' ||quote_literal(p_src_table)||','||quote_literal(p_dest_table)||','|| p_dblink_id||',' ||quote_literal(p_control_field)||','||quote_literal(p_boundary)||','||quote_literal(COALESCE(v_max_timestamp, CURRENT_TIMESTAMP))||',' ||quote_literal(CURRENT_TIMESTAMP)||','||v_dst_active||','||COALESCE(quote_literal(p_filter), 'NULL')||','||COALESCE(quote_literal(p_condition), 'NULL')||');'; RAISE NOTICE 'Inserting data into config table'; EXECUTE v_insert_refresh_config; -- Remove temp snap from config EXECUTE 'DELETE FROM @extschema@.refresh_config_snap WHERE source_table = '||quote_literal(p_src_table)||' AND dest_table = '||quote_literal(p_dest_table); RAISE NOTICE 'Done'; END $$; /* * Updater maker function. */ CREATE OR REPLACE FUNCTION updater_maker( p_src_table text , p_control_field text , p_dblink_id int , p_boundary interval DEFAULT '00:10:00' , p_dest_table text DEFAULT NULL , p_index boolean DEFAULT true , p_filter text[] DEFAULT NULL , p_condition text DEFAULT NULL , p_pulldata boolean DEFAULT true , p_pk_name text[] DEFAULT NULL , p_pk_type text[] DEFAULT NULL) RETURNS void LANGUAGE plpgsql AS $$ DECLARE v_data_source text; v_dblink_schema text; v_dest_check text; v_dest_schema_name text; v_dest_table_name text; v_dst_active boolean; v_field text; v_insert_refresh_config text; v_key_type text; v_max_timestamp timestamptz; v_old_search_path text; v_pk_name text[] := p_pk_name; v_pk_type text[] := p_pk_type; v_remote_key_sql text; v_update_refresh_config text; BEGIN SELECT nspname INTO v_dblink_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'dblink' AND e.extnamespace = n.oid; SELECT current_setting('search_path') INTO v_old_search_path; EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_dblink_schema||',public'',''false'')'; IF (p_pk_name IS NULL AND p_pk_type IS NOT NULL) OR (p_pk_name IS NOT NULL AND p_pk_type IS NULL) THEN RAISE EXCEPTION 'Cannot manually set primary/unique key field(s) without defining type(s) or vice versa'; END IF; SELECT data_source INTO v_data_source FROM @extschema@.dblink_mapping WHERE data_source_id = p_dblink_id; IF NOT FOUND THEN RAISE EXCEPTION 'ERROR: database link ID is incorrect %', p_dblink_id; END IF; IF p_dest_table IS NULL THEN p_dest_table := p_src_table; END IF; IF position('.' in p_dest_table) > 0 AND position('.' in p_src_table) > 0 THEN v_dest_schema_name := split_part(p_dest_table, '.', 1); v_dest_table_name := split_part(p_dest_table, '.', 2); ELSE RAISE EXCEPTION 'Source (and destination) table must be schema qualified'; END IF; PERFORM dblink_connect('mimeo_updater', @extschema@.auth(p_dblink_id)); IF p_pk_name IS NULL AND p_pk_type IS NULL THEN -- Either gets the primary key or it gets the first unique index in alphabetical order by index name v_remote_key_sql := 'SELECT CASE WHEN i.indisprimary IS true THEN ''primary'' WHEN i.indisunique IS true THEN ''unique'' END AS key_type, ( SELECT array_agg( a.attname ORDER by x.r ) FROM pg_attribute a JOIN ( SELECT k, row_number() over () as r FROM unnest(i.indkey) k ) as x ON a.attnum = x.k AND a.attrelid = i.indrelid WHERE a.attnotnull ) AS indkey_names, ( SELECT array_agg( a.atttypid::regtype::text ORDER by x.r ) FROM pg_attribute a JOIN ( SELECT k, row_number() over () as r FROM unnest(i.indkey) k ) as x ON a.attnum = x.k AND a.attrelid = i.indrelid WHERE a.attnotnull ) AS indkey_types FROM pg_index i WHERE i.indrelid = '||quote_literal(p_src_table)||'::regclass AND (i.indisprimary OR i.indisunique) ORDER BY key_type LIMIT 1'; EXECUTE 'SELECT key_type, indkey_names, indkey_types FROM dblink(''mimeo_updater'', '||quote_literal(v_remote_key_sql)||') t (key_type text, indkey_names text[], indkey_types text[])' INTO v_key_type, v_pk_name, v_pk_type; END IF; IF v_pk_name IS NULL OR v_pk_type IS NULL THEN RAISE EXCEPTION 'Source table has no valid primary key or unique index'; END IF; IF p_filter IS NOT NULL THEN FOREACH v_field IN ARRAY v_pk_name LOOP IF v_field = ANY(p_filter) THEN CONTINUE; ELSE RAISE EXCEPTION 'ERROR: filter list did not contain all columns that compose primary/unique key for source table %', p_src_table; END IF; END LOOP; END IF; -- Only create destination table if it doesn't already exist SELECT schemaname||'.'||tablename INTO v_dest_check FROM pg_tables WHERE schemaname = v_dest_schema_name AND tablename = v_dest_table_name; IF v_dest_check IS NULL THEN v_insert_refresh_config := 'INSERT INTO @extschema@.refresh_config_snap(source_table, dest_table, dblink, filter, condition) VALUES(' ||quote_literal(p_src_table)||', '||quote_literal(p_dest_table)||', '||p_dblink_id||',' ||COALESCE(quote_literal(p_filter), 'NULL')||','||COALESCE(quote_literal(p_condition), 'NULL')||')'; RAISE NOTICE 'Snapshotting source table to pull all current source data...'; EXECUTE v_insert_refresh_config; EXECUTE 'SELECT @extschema@.refresh_snap('||quote_literal(p_dest_table)||', p_index := '||p_index||', p_pulldata := '||p_pulldata||')'; PERFORM @extschema@.snapshot_destroyer(p_dest_table, 'ARCHIVE'); -- Ensure destination indexes that are needed for efficient replication are created even if p_index is set false IF p_index = false THEN RAISE NOTICE 'Adding primary/unique key to table...'; IF v_key_type = 'primary' THEN EXECUTE 'ALTER TABLE '||p_dest_table||' ADD PRIMARY KEY('||array_to_string(v_pk_name, ',')||')'; ELSE EXECUTE 'CREATE UNIQUE INDEX ON '||p_dest_table||' ('||array_to_string(v_pk_name, ',')||')'; END IF; END IF; ELSE RAISE NOTICE 'Destination table % already exists. No data or indexes was pulled from source', p_dest_table; END IF; PERFORM dblink_disconnect('mimeo_updater'); RAISE NOTICE 'Getting the maximum destination timestamp...'; EXECUTE 'SELECT max('||p_control_field||') FROM '||p_dest_table||';' INTO v_max_timestamp; v_dst_active := @extschema@.dst_utc_check(); v_insert_refresh_config := 'INSERT INTO @extschema@.refresh_config_updater(source_table, dest_table, dblink, control, boundary, pk_name, pk_type, last_value, last_run, dst_active, filter, condition) VALUES(' ||quote_literal(p_src_table)||', '||quote_literal(p_dest_table)||', '|| p_dblink_id||', '||quote_literal(p_control_field)||', ''' ||p_boundary||'''::interval, '||quote_literal(v_pk_name)||', '||quote_literal(v_pk_type)||', ' ||quote_literal(COALESCE(v_max_timestamp, CURRENT_TIMESTAMP))||','||quote_literal(CURRENT_TIMESTAMP)||','||v_dst_active||',' ||COALESCE(quote_literal(p_filter), 'NULL')||','||COALESCE(quote_literal(p_condition), 'NULL')||')'; RAISE NOTICE 'Inserting data into config table'; EXECUTE v_insert_refresh_config; -- Remove temp snap from config EXECUTE 'DELETE FROM @extschema@.refresh_config_snap WHERE source_table = '||quote_literal(p_src_table)||' AND dest_table = '||quote_literal(p_dest_table); EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')'; RAISE NOTICE 'Done'; RETURN; EXCEPTION WHEN OTHERS THEN EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_dblink_schema||''',''false'')'; IF dblink_get_connections() @> '{mimeo_updater}' THEN PERFORM dblink_disconnect('mimeo_updater'); END IF; EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')'; RAISE EXCEPTION '%', SQLERRM; END $$;