/*
 * undo_partition()
 *
 * Moves the rows of a partition set, batch by batch, out of its child tables and
 * into a separate target table, detaching (and optionally dropping) each child
 * once it is empty. When no child tables remain, the set's row in part_config is
 * deleted and its template table is dropped if no other config row references it.
 *
 * Parameters:
 *   p_parent_table    - schema-qualified parent table; must have a row in part_config.
 *   p_target_table    - schema-qualified, already existing table that receives the rows.
 *                       Must not be the parent table itself.
 *   p_loop_count      - number of batches to move before the function stops.
 *   p_batch_interval  - size of one batch, cast to interval (time/text/uuid/epoch control)
 *                       or bigint (id control). Defaults to the configured partition_interval.
 *   p_keep_table      - when false, an emptied child table is dropped after being detached.
 *   p_lock_wait       - when > 0, locks are attempted with NOWAIT and retried, sleeping
 *                       p_lock_wait / 5 seconds between attempts.
 *   p_ignored_columns - column names left out of the INSERT/SELECT column list
 *                       (e.g. GENERATED columns).
 *   p_drop_cascade    - when true, CASCADE is added to the DROP TABLE of a child.
 *
 * Output:
 *   partitions_undone - number of child tables detached by this call, or -1 when another
 *                       undo_partition call holds the advisory lock or a lock could not
 *                       be obtained within the retry limit.
 *   rows_undone       - number of rows moved by this call (left NULL on the -1 paths).
 *
 * Any error is logged to pg_jobmon (when enabled and installed) and re-raised with
 * its context, detail and hint appended to the message.
 */
CREATE FUNCTION @extschema@.undo_partition(
    p_parent_table text
    , p_target_table text
    , p_loop_count int DEFAULT 1
    , p_batch_interval text DEFAULT NULL
    , p_keep_table boolean DEFAULT true
    , p_lock_wait numeric DEFAULT 0
    , p_ignored_columns text[] DEFAULT NULL
    , p_drop_cascade boolean DEFAULT false
    , OUT partitions_undone int
    , OUT rows_undone bigint
)
    RETURNS record
    LANGUAGE plpgsql
    AS $$
DECLARE

ex_context                      text;
ex_detail                       text;
ex_hint                         text;
ex_message                      text;
v_adv_lock                      boolean;
v_batch_interval_id             bigint;
v_batch_interval_time           interval;
v_batch_loop_count              int := 0;
v_child_loop_total              bigint := 0;
v_child_table                   text;
v_column_list                   text;
v_control                       text;
v_control_type                  text;
v_time_encoder                  text;
v_time_decoder                  text;
v_child_min_id                  bigint;
v_child_min_time                timestamptz;
v_epoch                         text;
v_jobmon                        boolean;
v_jobmon_schema                 text;
v_job_id                        bigint;
v_inner_loop_count              int;
v_lock_iter                     int := 1;
v_lock_obtained                 boolean := FALSE;
v_new_search_path               text;
v_old_search_path               text;
v_parent_schema                 text;
v_parent_tablename              text;
v_partition_expression          text;
v_partition_interval            text;
v_row                           record;
v_rowcount                      bigint;
v_sql                           text;
v_step_id                       bigint;
v_sub_count                     int;
v_target_schema                 text;
v_target_tablename              text;
v_template_schema               text;
v_template_siblings             int;
v_template_table                text;
v_template_tablename            text;
v_total                         bigint := 0;
v_undo_count                    int := 0;

BEGIN
/*
 * Moves data to new, target table since data cannot be moved elsewhere in the same partition set.
 * Leaves old parent table as is and does not change name of new table.
 */

-- Only one undo_partition call may run at a time; the lock is released at transaction end.
v_adv_lock := pg_try_advisory_xact_lock(hashtext('pg_partman undo_partition'));
IF NOT v_adv_lock THEN
    RAISE NOTICE 'undo_partition already running.';
    partitions_undone = -1;
    RETURN;
END IF;

IF p_parent_table = p_target_table THEN
    RAISE EXCEPTION 'Target table cannot be the same as the parent table';
END IF;

SELECT partition_interval::text
    , control
    , time_encoder
    , time_decoder
    , jobmon
    , epoch
    , template_table
INTO v_partition_interval
    , v_control
    , v_time_encoder
    , v_time_decoder
    , v_jobmon
    , v_epoch
    , v_template_table
FROM @extschema@.part_config
WHERE parent_table = p_parent_table;

IF v_control IS NULL THEN
    RAISE EXCEPTION 'No configuration found for pg_partman for given parent table: %', p_parent_table;
END IF;

IF p_target_table IS NULL THEN
    RAISE EXCEPTION 'The p_target_table option must be set when undoing a partitioned table';
END IF;

SELECT n.nspname, c.relname
INTO v_parent_schema, v_parent_tablename
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = split_part(p_parent_table, '.', 1)::name
AND c.relname = split_part(p_parent_table, '.', 2)::name;

IF v_parent_tablename IS NULL THEN
    RAISE EXCEPTION 'Given parent table not found in system catalogs: %', p_parent_table;
END IF;

SELECT general_type INTO v_control_type
FROM @extschema@.check_control_type(v_parent_schema, v_parent_tablename, v_control);

-- The batch size is an interval for anything that is handled as a timestamp
-- (including integer epoch columns) and a bigint for plain id partitioning.
IF v_control_type IN ('time', 'text', 'uuid') OR (v_control_type = 'id' AND v_epoch <> 'none') THEN
    IF p_batch_interval IS NULL THEN
        v_batch_interval_time := v_partition_interval::interval;
    ELSE
        v_batch_interval_time := p_batch_interval::interval;
    END IF;
ELSIF v_control_type = 'id' THEN
    IF p_batch_interval IS NULL THEN
        v_batch_interval_id := v_partition_interval::bigint;
    ELSE
        v_batch_interval_id := p_batch_interval::bigint;
    END IF;
ELSE
    RAISE EXCEPTION 'Data type of control column in given partition set must be either date/time or integer.';
END IF;

-- Put the extension schema (and pg_jobmon's, if used) at the front of the search_path.
-- The change is session level (is_local = false), so every normal exit below has to
-- put the saved value back. An error exit is undone by the rollback of this block.
SELECT current_setting('search_path') INTO v_old_search_path;
IF length(v_old_search_path) > 0 THEN
    v_new_search_path := '@extschema@,pg_temp,'||v_old_search_path;
ELSE
    v_new_search_path := '@extschema@,pg_temp';
END IF;
IF v_jobmon THEN
    SELECT n.nspname INTO v_jobmon_schema
    FROM pg_catalog.pg_namespace n
    INNER JOIN pg_catalog.pg_extension e ON e.extnamespace = n.oid
    WHERE e.extname = 'pg_jobmon'::name;
    IF v_jobmon_schema IS NOT NULL THEN
        v_new_search_path := format('%s,%s',v_jobmon_schema, v_new_search_path);
    END IF;
END IF;
EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_new_search_path, 'false');

-- Check if any child tables are themselves partitioned or part of an inheritance tree. Prevent undo at this level if so.
-- Need to lock child tables at all levels before multi-level undo can be performed safely.
FOR v_row IN
    SELECT partition_schemaname, partition_tablename FROM @extschema@.show_partitions(p_parent_table)
LOOP
    SELECT count(*) INTO v_sub_count
    FROM pg_catalog.pg_inherits i
    JOIN pg_catalog.pg_class c ON i.inhparent = c.oid
    JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
    WHERE c.relname = v_row.partition_tablename::name
    AND n.nspname = v_row.partition_schemaname::name;
    IF v_sub_count > 0 THEN
        RAISE EXCEPTION 'Child table for this parent has child table(s) itself (%). Run undo partitioning on this table to ensure all data is properly moved to target table', v_row.partition_schemaname||'.'||v_row.partition_tablename;
    END IF;
END LOOP;

SELECT n.nspname, c.relname
INTO v_target_schema, v_target_tablename
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = split_part(p_target_table, '.', 1)::name
AND c.relname = split_part(p_target_table, '.', 2)::name;

IF v_target_tablename IS NULL THEN
    RAISE EXCEPTION 'Given target table not found in system catalogs: %', p_target_table;
END IF;

IF v_jobmon_schema IS NOT NULL THEN
    v_job_id := add_job(format('PARTMAN UNDO PARTITIONING: %s', p_parent_table));
    v_step_id := add_step(v_job_id, format('Undoing partitioning for table %s', p_parent_table));
END IF;

-- Expression that yields the partitioning value of a row; integer epoch columns are
-- converted to a timestamp so they can be compared against timestamptz boundaries.
v_partition_expression := CASE
    WHEN v_epoch = 'seconds' THEN format('to_timestamp(%I)', v_control)
    WHEN v_epoch = 'milliseconds' THEN format('to_timestamp((%I/1000)::float)', v_control)
    WHEN v_epoch = 'microseconds' THEN format('to_timestamp((%I/1000000)::float)', v_control)
    WHEN v_epoch = 'nanoseconds' THEN format('to_timestamp((%I/1000000000)::float)', v_control)
    ELSE format('%I', v_control)
END;

-- Stops new time partitions from being made as well as stopping child tables from being dropped if they were configured with a retention period.
UPDATE @extschema@.part_config SET undo_in_progress = true WHERE parent_table = p_parent_table;

IF v_jobmon_schema IS NOT NULL THEN
    PERFORM update_step(v_step_id, 'OK', 'Stopped partition creation process.');
END IF;

-- Generate column list to use in SELECT/INSERT statements below. Allows for exclusion of GENERATED (or any other desired) columns.
SELECT string_agg(quote_ident(attname), ',')
INTO v_column_list
FROM pg_catalog.pg_attribute a
JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = v_target_schema
AND c.relname = v_target_tablename
AND a.attnum > 0
AND a.attisdropped = false
AND attname <> ALL(COALESCE(p_ignored_columns, ARRAY[]::text[]));

<<outer_child_loop>>
LOOP
    -- Get ordered list of child table in set. Store in variable one at a time per loop until none are left or batch count is reached.
    -- This easily allows it to loop over same child table until empty or move onto next child table after it's dropped
    -- Include the default table to ensure all data there is removed as well
    SELECT partition_tablename INTO v_child_table
    FROM @extschema@.show_partitions(p_parent_table, 'ASC', p_include_default := TRUE)
    LIMIT 1;

    EXIT outer_child_loop WHEN v_child_table IS NULL;

    IF v_jobmon_schema IS NOT NULL THEN
        v_step_id := add_step(v_job_id, format('Removing child partition: %s.%s', v_parent_schema, v_child_table));
    END IF;

    IF v_control_type = 'time' OR (v_control_type = 'id' AND v_epoch <> 'none') THEN
        EXECUTE format('SELECT min(%s) FROM %I.%I', v_partition_expression, v_parent_schema, v_child_table) INTO v_child_min_time;
    ELSIF (v_control_type IN ('text', 'uuid')) THEN
        -- This can pass NULL to decoder function
        EXECUTE format('SELECT %s((SELECT min(%s::text) FROM %I.%I))', v_time_decoder, v_partition_expression, v_parent_schema, v_child_table) INTO v_child_min_time;
    ELSIF v_control_type = 'id' THEN
        EXECUTE format('SELECT min(%s) FROM %I.%I', v_partition_expression, v_parent_schema, v_child_table) INTO v_child_min_id;
    END IF;

    IF v_child_min_time IS NULL AND v_child_min_id IS NULL THEN
        -- No rows left in this child table. Remove from partition set.

        -- lockwait timeout for table drop
        IF p_lock_wait > 0 THEN
            -- Start from a clean flag so a lock obtained earlier in this call cannot mask a failure here.
            v_lock_obtained := FALSE;
            v_lock_iter := 0;
            -- NOTE: the counter runs 1..6, so up to six attempts are made.
            WHILE v_lock_iter <= 5 LOOP
                v_lock_iter := v_lock_iter + 1;
                BEGIN
                    EXECUTE format('LOCK TABLE ONLY %I.%I IN ACCESS EXCLUSIVE MODE NOWAIT', v_parent_schema, v_child_table);
                    v_lock_obtained := TRUE;
                EXCEPTION
                    WHEN lock_not_available THEN
                        PERFORM pg_sleep( p_lock_wait / 5.0 );
                        CONTINUE;
                END;
                EXIT WHEN v_lock_obtained;
            END LOOP;
            IF NOT v_lock_obtained THEN
                RAISE NOTICE 'Unable to obtain lock on child table for removal from partition set';
                -- Restore the caller's session search_path before leaving early.
                EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_old_search_path, 'false');
                partitions_undone = -1;
                RETURN;
            END IF;
        END IF; -- END p_lock_wait IF

        v_sql := format('ALTER TABLE %I.%I DETACH PARTITION %I.%I'
                        , v_parent_schema
                        , v_parent_tablename
                        , v_parent_schema
                        , v_child_table);
        EXECUTE v_sql;

        IF p_keep_table = false THEN
            v_sql := 'DROP TABLE %I.%I';
            IF p_drop_cascade THEN
                v_sql := v_sql || ' CASCADE';
            END IF;
            EXECUTE format(v_sql, v_parent_schema, v_child_table);
            IF v_jobmon_schema IS NOT NULL THEN
                PERFORM update_step(v_step_id, 'OK', format('Child table DROPPED. Moved %s rows to target table', v_child_loop_total));
            END IF;
        ELSE
            IF v_jobmon_schema IS NOT NULL THEN
                PERFORM update_step(v_step_id, 'OK', format('Child table DETACHED/UNINHERITED from parent, not DROPPED. Moved %s rows to target table', v_child_loop_total));
            END IF;
        END IF;

        v_undo_count := v_undo_count + 1;
        EXIT outer_child_loop WHEN v_batch_loop_count >= p_loop_count; -- Exit outer loop if p_loop_count is reached
        CONTINUE outer_child_loop; -- skip data moving steps below
    END IF;

    v_inner_loop_count := 1;
    v_child_loop_total := 0;

    <<inner_child_loop>>
    LOOP
        IF v_control_type IN ('time', 'text', 'uuid') OR (v_control_type = 'id' AND v_epoch <> 'none') THEN
            -- do some locking with timeout, if required
            IF p_lock_wait > 0 THEN
                -- Start from a clean flag so a lock obtained for an earlier batch cannot mask a failure here.
                v_lock_obtained := FALSE;
                v_lock_iter := 0;
                WHILE v_lock_iter <= 5 LOOP
                    v_lock_iter := v_lock_iter + 1;
                    BEGIN
                        IF v_control_type = 'time' OR (v_control_type = 'id' AND v_epoch <> 'none') THEN
                            -- Uses the partition expression (not the raw column) so that integer epoch
                            -- columns are compared as timestamps, exactly like the DELETE below.
                            EXECUTE format('SELECT * FROM %I.%I WHERE %s <= %L FOR UPDATE NOWAIT'
                                , v_parent_schema
                                , v_child_table
                                , v_partition_expression
                                , v_child_min_time + (v_batch_interval_time * v_inner_loop_count));
                        ELSIF (v_control_type IN ('text', 'uuid')) THEN
                            EXECUTE format('SELECT * FROM %I.%I WHERE %I <= %s(%L) FOR UPDATE NOWAIT'
                                , v_parent_schema
                                , v_child_table
                                , v_control
                                , v_time_encoder
                                , v_child_min_time + (v_batch_interval_time * v_inner_loop_count));
                        END IF;
                        v_lock_obtained := TRUE;
                    EXCEPTION
                        WHEN lock_not_available THEN
                            PERFORM pg_sleep( p_lock_wait / 5.0 );
                            CONTINUE;
                    END;
                    EXIT WHEN v_lock_obtained;
                END LOOP;
                IF NOT v_lock_obtained THEN
                    RAISE NOTICE 'Unable to obtain lock on batch of rows to move';
                    -- Restore the caller's session search_path before leaving early.
                    EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_old_search_path, 'false');
                    partitions_undone = -1;
                    RETURN;
                END IF;
            END IF;

            -- Get everything from the current child minimum up to the multiples of the given interval
            IF v_control_type = 'time' OR (v_control_type = 'id' AND v_epoch <> 'none') THEN
                EXECUTE format('WITH move_data AS ( DELETE FROM %I.%I WHERE %s <= %L RETURNING %s ) INSERT INTO %I.%I (%5$s) SELECT %5$s FROM move_data'
                    , v_parent_schema
                    , v_child_table
                    , v_partition_expression
                    , v_child_min_time + (v_batch_interval_time * v_inner_loop_count)
                    , v_column_list
                    , v_target_schema
                    , v_target_tablename);
            ELSIF (v_control_type IN ('text', 'uuid')) THEN
                EXECUTE format('WITH move_data AS ( DELETE FROM %I.%I WHERE %s <= %s(%L) RETURNING %s ) INSERT INTO %I.%I (%6$s) SELECT %6$s FROM move_data'
                    , v_parent_schema
                    , v_child_table
                    , v_partition_expression
                    , v_time_encoder
                    , v_child_min_time + (v_batch_interval_time * v_inner_loop_count)
                    , v_column_list
                    , v_target_schema
                    , v_target_tablename);
            END IF;
            GET DIAGNOSTICS v_rowcount = ROW_COUNT;
            v_total := v_total + v_rowcount;
            v_child_loop_total := v_child_loop_total + v_rowcount;
            IF v_jobmon_schema IS NOT NULL THEN
                PERFORM update_step(v_step_id, 'OK', format('Moved %s rows to target table.', v_child_loop_total));
            END IF;
            EXIT inner_child_loop WHEN v_rowcount = 0; -- exit before loop incr if table is empty
            v_inner_loop_count := v_inner_loop_count + 1;
            v_batch_loop_count := v_batch_loop_count + 1;

            -- Check again if table is empty and go to outer loop again to drop it if so
            IF v_control_type = 'time' OR (v_control_type = 'id' AND v_epoch <> 'none') THEN
                EXECUTE format('SELECT min(%s) FROM %I.%I', v_partition_expression, v_parent_schema, v_child_table) INTO v_child_min_time;
            ELSIF (v_control_type IN ('text', 'uuid')) THEN
                EXECUTE format('SELECT %s((SELECT min(%s::text) FROM %I.%I))', v_time_decoder, v_partition_expression, v_parent_schema, v_child_table) INTO v_child_min_time;
            END IF;
            CONTINUE outer_child_loop WHEN v_child_min_time IS NULL;

        ELSIF v_control_type = 'id' THEN

            IF p_lock_wait > 0 THEN
                -- Start from a clean flag so a lock obtained for an earlier batch cannot mask a failure here.
                v_lock_obtained := FALSE;
                v_lock_iter := 0;
                WHILE v_lock_iter <= 5 LOOP
                    v_lock_iter := v_lock_iter + 1;
                    BEGIN
                        EXECUTE format('SELECT * FROM %I.%I WHERE %I <= %L FOR UPDATE NOWAIT'
                            , v_parent_schema
                            , v_child_table
                            , v_control
                            , v_child_min_id + (v_batch_interval_id * v_inner_loop_count));
                        v_lock_obtained := TRUE;
                    EXCEPTION
                        WHEN lock_not_available THEN
                            PERFORM pg_sleep( p_lock_wait / 5.0 );
                            CONTINUE;
                    END;
                    EXIT WHEN v_lock_obtained;
                END LOOP;
                IF NOT v_lock_obtained THEN
                    RAISE NOTICE 'Unable to obtain lock on batch of rows to move';
                    -- Restore the caller's session search_path before leaving early.
                    EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_old_search_path, 'false');
                    partitions_undone = -1;
                    RETURN;
                END IF;
            END IF;

            -- Get everything from the current child minimum up to the multiples of the given interval
            EXECUTE format('WITH move_data AS ( DELETE FROM %I.%I WHERE %s <= %L RETURNING %s) INSERT INTO %I.%I (%5$s) SELECT %5$s FROM move_data'
                , v_parent_schema
                , v_child_table
                , v_partition_expression
                , v_child_min_id + (v_batch_interval_id * v_inner_loop_count)
                , v_column_list
                , v_target_schema
                , v_target_tablename);
            GET DIAGNOSTICS v_rowcount = ROW_COUNT;
            v_total := v_total + v_rowcount;
            v_child_loop_total := v_child_loop_total + v_rowcount;
            IF v_jobmon_schema IS NOT NULL THEN
                PERFORM update_step(v_step_id, 'OK', format('Moved %s rows to target table.', v_child_loop_total));
            END IF;
            EXIT inner_child_loop WHEN v_rowcount = 0; -- exit before loop incr if table is empty
            v_inner_loop_count := v_inner_loop_count + 1;
            v_batch_loop_count := v_batch_loop_count + 1;

            -- Check again if table is empty and go to outer loop again to drop it if so
            EXECUTE format('SELECT min(%s) FROM %I.%I', v_partition_expression, v_parent_schema, v_child_table) INTO v_child_min_id;
            CONTINUE outer_child_loop WHEN v_child_min_id IS NULL;

        END IF; -- end v_control_type check

        EXIT outer_child_loop WHEN v_batch_loop_count >= p_loop_count; -- Exit outer loop if p_loop_count is reached

    END LOOP inner_child_loop;
END LOOP outer_child_loop;

-- If no child tables (including the default) remain, the partition set is fully undone.
SELECT partition_tablename INTO v_child_table
FROM @extschema@.show_partitions(p_parent_table, 'ASC', TRUE)
LIMIT 1;

IF v_child_table IS NULL THEN
    DELETE FROM @extschema@.part_config WHERE parent_table = p_parent_table;

    -- Check if any other config entries still have this template table and don't remove if so
    -- Allows other sibling/parent tables to still keep using in case entire partition set isn't being undone
    SELECT count(*) INTO v_template_siblings FROM @extschema@.part_config WHERE template_table = v_template_table;

    SELECT n.nspname, c.relname
    INTO v_template_schema, v_template_tablename
    FROM pg_catalog.pg_class c
    JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
    WHERE n.nspname = split_part(v_template_table, '.', 1)::name
    AND c.relname = split_part(v_template_table, '.', 2)::name;

    IF v_template_siblings = 0 AND v_template_tablename IS NOT NULL THEN
        EXECUTE format('DROP TABLE IF EXISTS %I.%I', v_template_schema, v_template_tablename);
    END IF;

    IF v_jobmon_schema IS NOT NULL THEN
        v_step_id := add_step(v_job_id, 'Removing config from pg_partman');
        PERFORM update_step(v_step_id, 'OK', 'Done');
    END IF;
END IF;

RAISE NOTICE 'Moved % row(s) to the target table. Removed % partitions.', v_total, v_undo_count;
IF v_jobmon_schema IS NOT NULL THEN
    v_step_id := add_step(v_job_id, 'Final stats');
    PERFORM update_step(v_step_id, 'OK', format('Moved %s row(s) to the target table. Removed %s partitions.', v_total, v_undo_count));
END IF;

IF v_jobmon_schema IS NOT NULL THEN
    PERFORM close_job(v_job_id);
END IF;

EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_old_search_path, 'false');

partitions_undone := v_undo_count;
rows_undone := v_total;

EXCEPTION
    WHEN OTHERS THEN
        GET STACKED DIAGNOSTICS ex_message = MESSAGE_TEXT,
                                ex_context = PG_EXCEPTION_CONTEXT,
                                ex_detail = PG_EXCEPTION_DETAIL,
                                ex_hint = PG_EXCEPTION_HINT;
        IF v_jobmon_schema IS NOT NULL THEN
            -- The jobmon functions are schema-qualified here because the search_path
            -- change made above has been rolled back along with the failed block.
            IF v_job_id IS NULL THEN
                -- %L quotes the job name safely even if the table name contains a quote character.
                EXECUTE format('SELECT %I.add_job(%L)', v_jobmon_schema, format('PARTMAN UNDO PARTITIONING: %s', p_parent_table)) INTO v_job_id;
                EXECUTE format('SELECT %I.add_step(%s, ''EXCEPTION before job logging started'')', v_jobmon_schema, v_job_id) INTO v_step_id;
            ELSIF v_step_id IS NULL THEN
                EXECUTE format('SELECT %I.add_step(%s, ''EXCEPTION before first step logged'')', v_jobmon_schema, v_job_id) INTO v_step_id;
            END IF;
            EXECUTE format('SELECT %I.update_step(%s, ''CRITICAL'', %L)', v_jobmon_schema, v_step_id, 'ERROR: '||coalesce(SQLERRM,'unknown'));
            EXECUTE format('SELECT %I.fail_job(%s)', v_jobmon_schema, v_job_id);
        END IF;
        RAISE EXCEPTION '% CONTEXT: % DETAIL: % HINT: %', ex_message, ex_context, ex_detail, ex_hint;
END
$$;