/*
 *  !!!!!! READ THIS FIRST !!!!!!
 *  Alternate function to provide a way to use a pre-9.0 version of PostgreSQL as the source.
 *  It is not installed as part of the extension so can be safely added and removed without affecting it if you don't rename the function to its original name.
 *  You must do a find-and-replace to set the proper schema that mimeo is installed to on the destination database.
 *  I left "@extschema@" in here from the original extension code to provide an easy string to find and replace.
 *  Just search for that and replace with your installation's schema.
 */
/*
 *  Snapshot refresh: pulls the configured source table over dblink into the
 *  currently inactive snap table (<dest_table>_snap1 or <dest_table>_snap2)
 *  and then repoints the <dest_table> view at the freshly loaded table.
 *
 *  Parameters:
 *    p_destination  dest_table value identifying the row in refresh_config_snap.
 *    p_index        when true, indexes found on the source table are recreated on a
 *                   snap table that was just (re)created.
 *    p_debug        passed to gdb() for debug output; when not true,
 *                   client_min_messages is set to notice for the transaction.
 *    p_pulldata     when false, the remote query gets LIMIT 0 so no rows are pulled.
 *
 *  Raises an exception if p_destination has no row in refresh_config_snap. Any other
 *  error is logged through the job logging functions (add_job/add_step/update_step/
 *  fail_job), the dblink connection is closed, the search_path is restored, the
 *  advisory lock is released and the error is re-raised.
 *
 *  If another call for the same destination already holds the advisory lock, the
 *  job is logged as failed with a warning and the function returns without error.
 */
CREATE OR REPLACE FUNCTION @extschema@.refresh_snap_pre90(p_destination text, p_index boolean DEFAULT true, p_debug boolean DEFAULT false, p_pulldata boolean DEFAULT true) RETURNS void
    LANGUAGE plpgsql SECURITY DEFINER
    AS $$
DECLARE

v_adv_lock          boolean;
v_change_step_id    int;
v_cols_n_types      text[];
v_cols              text[];
v_condition         text;
v_create_sql        text;
v_dblink            int;
v_dblink_name       text;
v_dblink_schema     text;
v_dest_table        text;
v_ex_context        text;
v_exists            int;
v_fetch_sql         text;
v_filter            text[];
v_job_id            int;
v_jobmon_schema     text;
v_job_name          text;
v_lcols_array       text[];
v_local_sql         text;
v_l                 text;
v_match             boolean := 'f';
v_old_search_path   text;
v_parts             record;
v_post_script       text[];
v_refresh_snap      text;
v_remote_index_sql  text;
v_remote_sql        text;
v_row               record;
v_rowcount          bigint;
v_r                 text;
v_snap              text;
v_source_table      text;
v_step_id           int;
v_table_exists      int;
v_total             bigint := 0;
v_view_definition   text;

BEGIN

IF p_debug IS DISTINCT FROM true THEN
    PERFORM set_config( 'client_min_messages', 'notice', true );
END IF;

v_job_name := 'Refresh Snap: '||p_destination;
v_dblink_name := 'mimeo_snap_refresh_'||p_destination;

SELECT n.nspname INTO v_dblink_schema
FROM pg_namespace n
INNER JOIN pg_extension e ON e.extnamespace = n.oid
WHERE e.extname = 'dblink';

SELECT n.nspname INTO v_jobmon_schema
FROM pg_namespace n
INNER JOIN pg_extension e ON e.extnamespace = n.oid
WHERE e.extname = 'pg_jobmon';

-- Set custom search path to allow easier calls to other functions, especially job logging
SELECT current_setting('search_path') INTO v_old_search_path;
EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_jobmon_schema||','||v_dblink_schema||',public'',''false'')';

v_job_id := add_job(v_job_name);
PERFORM gdb(p_debug,'Job ID: '||v_job_id::text);

-- Take advisory lock to prevent multiple calls to function overlapping and causing possible deadlock
v_adv_lock := pg_try_advisory_lock(hashtext('refresh_snap'), hashtext(v_job_name));
IF v_adv_lock = 'false' THEN
    v_step_id := add_step(v_job_id,'Obtaining advisory lock for job: '||v_job_name);
    PERFORM gdb(p_debug,'Obtaining advisory lock FAILED for job: '||v_job_name);
    PERFORM update_step(v_step_id, 'WARNING','Found concurrent job. Exiting gracefully');
    PERFORM fail_job(v_job_id, 2);
    -- The saved search_path comes from the caller's session, so it is handed to
    -- set_config() as a value and never spliced into dynamic SQL (this function
    -- is SECURITY DEFINER).
    PERFORM set_config('search_path', v_old_search_path, false);
    RETURN;
END IF;

v_step_id := add_step(v_job_id,'Grabbing Mapping, Building SQL');

SELECT source_table
    , dest_table
    , dblink
    , filter
    , condition
    , post_script
INTO v_source_table
    , v_dest_table
    , v_dblink
    , v_filter
    , v_condition
    , v_post_script
FROM refresh_config_snap
WHERE dest_table = p_destination;
IF NOT FOUND THEN
    RAISE EXCEPTION 'ERROR: This table is not set up for snapshot replication: %',v_job_name;
END IF;

-- checking for current view
SELECT definition INTO v_view_definition FROM pg_views WHERE ((schemaname || '.') || viewname)=v_dest_table;

PERFORM dblink_connect(v_dblink_name, @extschema@.auth(v_dblink));

-- Column names and "name type" pairs of the source table, fetched from the remote catalog
v_remote_sql := 'SELECT array_agg(attname) as cols, array_agg(attname||'' ''||format_type(atttypid, atttypmod)::text) as cols_n_types FROM pg_attribute WHERE attrelid = '||quote_literal(v_source_table)||'::regclass AND attnum > 0 AND attisdropped is false';
-- Apply column filters if used
IF v_filter IS NOT NULL THEN
    v_remote_sql := v_remote_sql || ' AND ARRAY[attname::text] <@ '||quote_literal(v_filter);
END IF;
v_remote_sql := 'SELECT cols, cols_n_types FROM dblink('||quote_literal(v_dblink_name)||', ' || quote_literal(v_remote_sql) || ') AS t (cols text[], cols_n_types text[])';
PERFORM gdb(p_debug,'v_remote_sql: '||v_remote_sql);
EXECUTE v_remote_sql INTO v_cols, v_cols_n_types;
PERFORM gdb(p_debug,'v_cols: {'|| array_to_string(v_cols, ',') ||'}');
PERFORM gdb(p_debug,'v_cols_n_types: {'|| array_to_string(v_cols_n_types, ',') ||'}');

PERFORM update_step(v_step_id, 'OK','Done');

v_step_id := add_step(v_job_id,'Truncate non-active snap table');

-- The view currently selects from one snap table; refresh the other one
v_exists := strpos(v_view_definition, 'snap1');
IF v_exists > 0 THEN
    v_snap := '_snap2';
ELSE
    v_snap := '_snap1';
END IF;
v_refresh_snap := v_dest_table||v_snap;
PERFORM gdb(p_debug,'v_refresh_snap: '||v_refresh_snap::text);

-- Create snap table if it doesn't exist
SELECT string_to_array(v_refresh_snap, '.') AS oparts INTO v_parts;
SELECT INTO v_table_exists count(1) FROM pg_tables
    WHERE schemaname = v_parts.oparts[1]
    AND tablename = v_parts.oparts[2];
IF v_table_exists = 0 THEN
    PERFORM gdb(p_debug,'Snap table does not exist. Creating... ');
    v_create_sql := 'CREATE TABLE ' || v_refresh_snap || ' (' || array_to_string(v_cols_n_types, ',') || ')';
    PERFORM gdb(p_debug,'v_create_sql: '||v_create_sql::text);
    EXECUTE v_create_sql;
ELSE
    /* Check local column definitions against remote and recreate table if different.
       Allows automatic recreation of snap tables if columns change (add, drop type change) */
    v_local_sql := 'SELECT array_agg(attname||'' ''||format_type(atttypid, atttypmod)::text) as cols_n_types FROM pg_attribute WHERE attnum > 0 AND attisdropped is false AND attrelid = ' || quote_literal(v_refresh_snap) || '::regclass';
    PERFORM gdb(p_debug, v_local_sql);
    EXECUTE v_local_sql INTO v_lcols_array;

    -- Check to see if there's a change in the column structure on the remote.
    -- Stop at the first remote column with no identical local definition; without
    -- the early exit a later matching column would reset v_match to true and hide
    -- the difference.
    FOREACH v_r IN ARRAY v_cols_n_types LOOP
        v_match := coalesce(v_r = ANY (v_lcols_array), false);
        EXIT WHEN NOT v_match;
    END LOOP;
    -- Every remote column exists locally; a differing column count then means the
    -- local table still carries a column that no longer comes from the source.
    IF v_match AND array_length(v_lcols_array, 1) IS DISTINCT FROM array_length(v_cols_n_types, 1) THEN
        v_match := 'f';
    END IF;

    IF v_match = 'f' THEN
        EXECUTE 'DROP TABLE ' || v_refresh_snap;
        EXECUTE 'DROP VIEW ' || v_dest_table;
        v_create_sql := 'CREATE TABLE ' || v_refresh_snap || ' (' || array_to_string(v_cols_n_types, ',') || ')';
        PERFORM gdb(p_debug,'v_create_sql: '||v_create_sql::text);
        EXECUTE v_create_sql;
        -- Logged under its own step id so the truncate step opened above can still
        -- be closed and this message is not overwritten afterwards.
        v_change_step_id := add_step(v_job_id,'Source table structure changed.');
        PERFORM update_step(v_change_step_id, 'OK','Tables and view dropped and recreated. Please double-check snap table attributes (permissions, indexes, etc');
        PERFORM gdb(p_debug,'Source table structure changed. Tables and view dropped and recreated. Please double-check snap table attributes (permissions, indexes, etc)');
    END IF;

    -- truncate non-active snap table
    EXECUTE 'TRUNCATE TABLE ' || v_refresh_snap;
END IF;
-- Close the 'Truncate non-active snap table' step on every path
PERFORM update_step(v_step_id, 'OK','Done');

v_remote_sql := 'SELECT '|| array_to_string(v_cols, ',') ||' FROM '||v_source_table;
-- Used by p_pull options in all maker functions to be able to create a replication job but pull no data
IF p_pulldata = false THEN
    v_remote_sql := v_remote_sql || ' LIMIT 0';
ELSIF v_condition IS NOT NULL THEN
    v_remote_sql := v_remote_sql || ' ' || v_condition;
END IF;

v_step_id := add_step(v_job_id,'Inserting records into local table');
PERFORM dblink_open(v_dblink_name, 'mimeo_cursor', v_remote_sql);
v_rowcount := 0;
-- Pull the remote cursor in batches of 50000 rows until a fetch returns nothing
LOOP
    v_fetch_sql := 'INSERT INTO '|| v_refresh_snap ||' ('|| array_to_string(v_cols, ',') ||')
        SELECT '||array_to_string(v_cols, ',')||'
        FROM dblink_fetch('||quote_literal(v_dblink_name)||', ''mimeo_cursor'', 50000) AS ('||array_to_string(v_cols_n_types, ',')||')';
    EXECUTE v_fetch_sql;
    GET DIAGNOSTICS v_rowcount = ROW_COUNT;
    EXIT WHEN v_rowcount = 0;
    v_total := v_total + coalesce(v_rowcount, 0);
    PERFORM gdb(p_debug,'Fetching rows in batches: '||v_total||' done so far.');
    PERFORM update_step(v_step_id, 'PENDING', 'Fetching rows in batches: '||v_total||' done so far.');
END LOOP;
PERFORM dblink_close(v_dblink_name, 'mimeo_cursor');
PERFORM update_step(v_step_id, 'OK','Inserted '||v_total||' rows');

-- Create indexes if new table was created
IF (v_table_exists = 0 OR v_match = 'f') AND p_index = true THEN
    -- Remote query: one row per index on the source table with its kind and its
    -- column names ordered by their position in indkey
    v_remote_index_sql := 'SELECT
        CASE
            WHEN i.indisprimary IS true THEN ''primary''
            WHEN i.indisunique IS true THEN ''unique''
            ELSE ''index''
        END AS key_type,
        array(
            SELECT a.attname
            FROM pg_attribute a
            WHERE a.attrelid = i.indrelid';
    IF v_filter IS NOT NULL THEN
        v_remote_index_sql := v_remote_index_sql || ' AND ARRAY[a.attname::text] <@ '||quote_literal(v_filter);
    END IF;
    v_remote_index_sql := v_remote_index_sql || ' AND '',''||array_to_string(i.indkey::int2[], '','')||'','' like ''%,'' || a.attnum || '',%''
            ORDER BY strpos( '',''||array_to_string(i.indkey::int2[], '','')||'','', '',''|| a.attnum || '','')
        ) AS indkey_names
        FROM pg_index i
        WHERE i.indrelid = '||quote_literal(v_source_table)||'::regclass;';

    FOR v_row IN EXECUTE 'SELECT key_type, indkey_names FROM dblink('||quote_literal(v_dblink_name)||', '||quote_literal(v_remote_index_sql)||') t (key_type text, indkey_names text[])' LOOP
        IF (array_upper(v_row.indkey_names, 1) IS NOT NULL) THEN   -- If column filter is used, indkey_name column may be null
            IF v_row.key_type = 'primary' THEN
                RAISE NOTICE 'Creating primary key...';
                EXECUTE 'ALTER TABLE '||v_refresh_snap||' ADD CONSTRAINT '||v_parts.oparts[2]||'_'||array_to_string(v_row.indkey_names, '_')||'_idx PRIMARY KEY ('||array_to_string(v_row.indkey_names, ',')||')';
            ELSIF v_row.key_type = 'unique' THEN
                RAISE NOTICE 'Creating unique index...';
                EXECUTE 'CREATE UNIQUE INDEX '||v_parts.oparts[2]||'_'||array_to_string(v_row.indkey_names, '_')||'_idx ON '||v_refresh_snap|| '('||array_to_string(v_row.indkey_names, ',')||')';
            ELSE
                RAISE NOTICE 'Creating index...';
                EXECUTE 'CREATE INDEX '||v_parts.oparts[2]||'_'||array_to_string(v_row.indkey_names, '_')||'_idx ON '||v_refresh_snap|| '('||array_to_string(v_row.indkey_names, ',')||')';
            END IF;
        END IF;
    END LOOP;
END IF;

-- NOTE(review): v_rowcount is always assigned by GET DIAGNOSTICS in the fetch loop,
-- so the ELSE branch below looks unreachable; kept to preserve existing behaviour.
IF v_rowcount IS NOT NULL THEN
    EXECUTE 'ANALYZE ' ||v_refresh_snap;

    --SET statement_timeout='30 min';

    -- swap view
    v_step_id := add_step(v_job_id,'Swap view to '||v_refresh_snap);
    PERFORM gdb(p_debug,'Swapping view to '||v_refresh_snap);
    EXECUTE 'CREATE OR REPLACE VIEW '||v_dest_table||' AS SELECT * FROM '||v_refresh_snap;
    PERFORM update_step(v_step_id, 'OK','View Swapped');

    v_step_id := add_step(v_job_id,'Updating last value');
    UPDATE refresh_config_snap SET last_run = CURRENT_TIMESTAMP WHERE dest_table = p_destination;
    PERFORM update_step(v_step_id, 'OK','Done');

    -- Runs special sql to fix indexes, permissions, etc on recreated objects
    IF v_match = 'f' AND v_post_script IS NOT NULL THEN
        v_step_id := add_step(v_job_id,'Applying post_script sql commands due to schema change');
        PERFORM @extschema@.post_script(v_dest_table);
        PERFORM update_step(v_step_id, 'OK','Done');
    END IF;

    PERFORM close_job(v_job_id);
ELSE
    RAISE EXCEPTION 'No rows found in source table';
END IF;

PERFORM dblink_disconnect(v_dblink_name);

-- Ensure old search path is reset for the current session
PERFORM set_config('search_path', v_old_search_path, false);

PERFORM pg_advisory_unlock(hashtext('refresh_snap'), hashtext(v_job_name));

EXCEPTION
    WHEN QUERY_CANCELED THEN
        -- Make dblink functions resolvable again before cleaning up
        EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_jobmon_schema||','||v_dblink_schema||''',''false'')';
        IF dblink_get_connections() @> ARRAY[v_dblink_name] THEN
            PERFORM dblink_disconnect(v_dblink_name);
        END IF;
        PERFORM set_config('search_path', v_old_search_path, false);
        PERFORM pg_advisory_unlock(hashtext('refresh_snap'), hashtext(v_job_name));
        RAISE EXCEPTION '%', SQLERRM;
    WHEN OTHERS THEN
        -- Make job logging and dblink functions resolvable again before logging the failure
        EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_jobmon_schema||','||v_dblink_schema||''',''false'')';
        GET STACKED DIAGNOSTICS v_ex_context = PG_EXCEPTION_CONTEXT;
        IF v_job_id IS NULL THEN
            v_job_id := add_job('Refresh Snap: '||p_destination);
            v_step_id := add_step(v_job_id, 'EXCEPTION before job logging started');
        END IF;
        IF v_step_id IS NULL THEN
            v_step_id := add_step(v_job_id, 'EXCEPTION before first step logged');
        END IF;
        PERFORM update_step(v_step_id, 'CRITICAL', 'ERROR: '||coalesce(SQLERRM,'unknown'));
        PERFORM fail_job(v_job_id);
        IF dblink_get_connections() @> ARRAY[v_dblink_name] THEN
            PERFORM dblink_disconnect(v_dblink_name);
        END IF;
        PERFORM set_config('search_path', v_old_search_path, false);
        PERFORM pg_advisory_unlock(hashtext('refresh_snap'), hashtext(v_job_name));
        RAISE EXCEPTION '%
CONTEXT: %', SQLERRM, v_ex_context;
END
$$;