CREATE FUNCTION undo_partition_time(p_parent_table text, p_batch_count int DEFAULT 1, p_batch_interval interval DEFAULT NULL, p_keep_table boolean DEFAULT true, p_lock_wait numeric DEFAULT 0, OUT partitions_undone int, OUT rows_undone bigint) RETURNS record LANGUAGE plpgsql SECURITY DEFINER AS $$ DECLARE ex_context text; ex_detail text; ex_hint text; ex_message text; v_adv_lock boolean; v_batch_loop_count int := 0; v_child_loop_total bigint := 0; v_child_min timestamptz; v_child_table text; v_control text; v_control_type text; v_epoch text; v_function_name text; v_inner_loop_count int; v_lock_iter int := 1; v_lock_obtained boolean := FALSE; v_job_id bigint; v_jobmon boolean; v_jobmon_schema text; v_new_search_path text := '@extschema@,pg_temp'; v_old_search_path text; v_parent_schema text; v_parent_tablename text; v_partition_expression text; v_partition_int_text text; v_partition_interval interval; v_row record; v_rowcount bigint; v_step_id bigint; v_sub_count int; v_total bigint := 0; v_trig_name text; v_type text; v_undo_count int := 0; BEGIN /* * Function to undo time-based partitioning created by this extension */ v_adv_lock := pg_try_advisory_xact_lock(hashtext('pg_partman undo_partition_time')); IF v_adv_lock = 'false' THEN RAISE NOTICE 'undo_partition_time already running.'; partitions_undone = -1; RETURN; END IF; SELECT partition_type , partition_interval , control , jobmon , epoch INTO v_type , v_partition_int_text , v_control , v_jobmon , v_epoch FROM @extschema@.part_config WHERE parent_table = p_parent_table AND partition_type IN ('partman', 'time-custom'); IF v_control IS NULL THEN RAISE EXCEPTION 'No non-native configuration found for pg_partman for given parent table: %', p_parent_table; END IF; SELECT schemaname, tablename INTO v_parent_schema, v_parent_tablename FROM pg_catalog.pg_tables WHERE schemaname = split_part(p_parent_table, '.', 1)::name AND tablename = split_part(p_parent_table, '.', 2)::name; SELECT general_type INTO v_control_type FROM @extschema@.check_control_type(v_parent_schema, v_parent_tablename, v_control); IF v_control_type <> 'time' THEN IF (v_control_type = 'id' AND v_epoch = 'none') OR v_control_type <> 'id' THEN RAISE EXCEPTION 'Cannot run on partition set without time based control column or epoch flag set with an id column. Found control: %, epoch: %', v_control_type, v_epoch; END IF; END IF; v_partition_interval := v_partition_int_text::interval; SELECT current_setting('search_path') INTO v_old_search_path; IF v_jobmon THEN SELECT nspname INTO v_jobmon_schema FROM pg_catalog.pg_namespace n, pg_catalog.pg_extension e WHERE e.extname = 'pg_jobmon'::name AND e.extnamespace = n.oid; IF v_jobmon_schema IS NOT NULL THEN v_new_search_path := '@extschema@,'||v_jobmon_schema||',pg_temp'; END IF; END IF; EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_new_search_path, 'false'); -- Check if any child tables are themselves partitioned or part of an inheritance tree. Prevent undo at this level if so. -- Need to either lock child tables at all levels or handle the proper removal of triggers on all child tables first -- before multi-level undo can be performed safely. FOR v_row IN SELECT partition_schemaname, partition_tablename FROM @extschema@.show_partitions(p_parent_table) LOOP SELECT count(*) INTO v_sub_count FROM pg_catalog.pg_inherits i JOIN pg_catalog.pg_class c ON i.inhparent = c.oid JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid WHERE c.relname = v_row.partition_tablename::name AND n.nspname = v_row.partition_schemaname::name; IF v_sub_count > 0 THEN RAISE EXCEPTION 'Child table for this parent has child table(s) itself (%). Run undo partitioning on this table or remove inheritance first to ensure all data is properly moved to parent', v_row.partition_schemaname||'.'||v_row.partition_tablename; END IF; END LOOP; IF v_jobmon_schema IS NOT NULL THEN v_job_id := add_job(format('PARTMAN UNDO PARTITIONING: %s', p_parent_table)); v_step_id := add_step(v_job_id, format('Undoing partitioning for table %s', p_parent_table)); END IF; IF p_batch_interval IS NULL THEN p_batch_interval := v_partition_interval; END IF; v_partition_expression := CASE WHEN v_epoch = 'seconds' THEN format('to_timestamp(%I)', v_control) WHEN v_epoch = 'milliseconds' THEN format('to_timestamp((%I/1000)::float)', v_control) ELSE format('%I', v_control) end; -- Stops new time partitons from being made as well as stopping child tables from being dropped if they were configured with a retention period. UPDATE @extschema@.part_config SET undo_in_progress = true WHERE parent_table = p_parent_table; -- Stop data going into child tables. v_trig_name := @extschema@.check_name_length(p_object_name := v_parent_tablename, p_suffix := '_part_trig'); v_function_name := @extschema@.check_name_length(v_parent_tablename, '_part_trig_func', FALSE); SELECT tgname INTO v_trig_name FROM pg_catalog.pg_trigger t JOIN pg_catalog.pg_class c ON t.tgrelid = c.oid WHERE tgname = v_trig_name::name AND c.relname = v_parent_tablename::name; SELECT proname INTO v_function_name FROM pg_catalog.pg_proc p JOIN pg_catalog.pg_namespace n ON p.pronamespace = n.oid WHERE n.nspname = v_parent_schema::name AND proname = v_function_name::name; IF v_trig_name IS NOT NULL THEN -- lockwait for trigger drop IF p_lock_wait > 0 THEN v_lock_iter := 0; WHILE v_lock_iter <= 5 LOOP v_lock_iter := v_lock_iter + 1; BEGIN EXECUTE format('LOCK TABLE ONLY %I.%I IN ACCESS EXCLUSIVE MODE NOWAIT', v_parent_schema, v_parent_tablename); v_lock_obtained := TRUE; EXCEPTION WHEN lock_not_available THEN PERFORM pg_sleep( p_lock_wait / 5.0 ); CONTINUE; END; EXIT WHEN v_lock_obtained; END LOOP; IF NOT v_lock_obtained THEN RAISE NOTICE 'Unable to obtain lock on parent table to remove trigger'; partitions_undone = -1; RETURN; END IF; END IF; -- END p_lock_wait IF EXECUTE format('DROP TRIGGER IF EXISTS %I ON %I.%I', v_trig_name, v_parent_schema, v_parent_tablename); END IF; -- END trigger IF v_lock_obtained := FALSE; -- reset for reuse later IF v_function_name IS NOT NULL THEN EXECUTE format('DROP FUNCTION IF EXISTS %I.%I()', v_parent_schema, v_function_name); END IF; IF v_jobmon_schema IS NOT NULL THEN IF (v_trig_name IS NOT NULL OR v_function_name IS NOT NULL) THEN PERFORM update_step(v_step_id, 'OK', 'Stopped partition creation process. Removed trigger & trigger function'); ELSE PERFORM update_step(v_step_id, 'OK', 'Stopped partition creation process.'); END IF; END IF; <> LOOP -- Get ordered list of child table in set. Store in variable one at a time per loop until none are left or batch count is reached. -- This easily allows it to loop over same child table until empty or move onto next child table after it's dropped SELECT partition_tablename INTO v_child_table FROM @extschema@.show_partitions(p_parent_table, 'ASC') LIMIT 1; EXIT outer_child_loop WHEN v_child_table IS NULL; IF v_jobmon_schema IS NOT NULL THEN v_step_id := add_step(v_job_id, format('Removing child partition: %s.%s', v_parent_schema, v_child_table)); END IF; EXECUTE format('SELECT min(%s) FROM %I.%I', v_partition_expression, v_parent_schema, v_child_table) INTO v_child_min; IF v_child_min IS NULL THEN -- No rows left in this child table. Remove from partition set. -- lockwait timeout for table drop IF p_lock_wait > 0 THEN v_lock_iter := 0; WHILE v_lock_iter <= 5 LOOP v_lock_iter := v_lock_iter + 1; BEGIN EXECUTE format('LOCK TABLE ONLY %I.%I IN ACCESS EXCLUSIVE MODE NOWAIT', v_parent_schema, v_child_table); v_lock_obtained := TRUE; EXCEPTION WHEN lock_not_available THEN PERFORM pg_sleep( p_lock_wait / 5.0 ); CONTINUE; END; EXIT WHEN v_lock_obtained; END LOOP; IF NOT v_lock_obtained THEN RAISE NOTICE 'Unable to obtain lock on child table for removal from partition set'; partitions_undone = -1; RETURN; END IF; END IF; -- END p_lock_wait IF v_lock_obtained := FALSE; -- reset for reuse later EXECUTE format('ALTER TABLE %I.%I NO INHERIT %I.%I' , v_parent_schema , v_child_table , v_parent_schema , v_parent_tablename); IF p_keep_table = false THEN EXECUTE format('DROP TABLE %I.%I', v_parent_schema, v_child_table); IF v_jobmon_schema IS NOT NULL THEN PERFORM update_step(v_step_id, 'OK', format('Child table DROPPED. Moved %s rows to parent', v_child_loop_total)); END IF; ELSE IF v_jobmon_schema IS NOT NULL THEN PERFORM update_step(v_step_id, 'OK', format('Child table UNINHERITED, not DROPPED. Moved %s rows to parent', v_child_loop_total)); END IF; END IF; IF v_type = 'time-custom' THEN DELETE FROM @extschema@.custom_time_partitions WHERE parent_table = p_parent_table AND child_table = v_parent_schema||'.'||v_child_table; END IF; v_undo_count := v_undo_count + 1; EXIT outer_child_loop WHEN v_batch_loop_count >= p_batch_count; -- Exit outer FOR loop if p_batch_count is reached CONTINUE outer_child_loop; -- skip data moving steps below END IF; v_inner_loop_count := 1; v_child_loop_total := 0; <> LOOP -- do some locking with timeout, if required IF p_lock_wait > 0 THEN v_lock_iter := 0; WHILE v_lock_iter <= 5 LOOP v_lock_iter := v_lock_iter + 1; BEGIN EXECUTE format('SELECT * FROM %I.%I WHERE %I <= %L FOR UPDATE NOWAIT' , v_parent_schema , v_child_table , v_control , v_child_min + (p_batch_interval * v_inner_loop_count)); v_lock_obtained := TRUE; EXCEPTION WHEN lock_not_available THEN PERFORM pg_sleep( p_lock_wait / 5.0 ); CONTINUE; END; EXIT WHEN v_lock_obtained; END LOOP; IF NOT v_lock_obtained THEN RAISE NOTICE 'Unable to obtain lock on batch of rows to move'; partitions_undone = -1; RETURN; END IF; END IF; -- Get everything from the current child minimum up to the multiples of the given interval EXECUTE format('WITH move_data AS ( DELETE FROM %I.%I WHERE %s <= %L RETURNING *) INSERT INTO %I.%I SELECT * FROM move_data' , v_parent_schema , v_child_table , v_partition_expression , v_child_min + (p_batch_interval * v_inner_loop_count) , v_parent_schema , v_parent_tablename); GET DIAGNOSTICS v_rowcount = ROW_COUNT; v_total := v_total + v_rowcount; v_child_loop_total := v_child_loop_total + v_rowcount; IF v_jobmon_schema IS NOT NULL THEN PERFORM update_step(v_step_id, 'OK', format('Moved %s rows to parent.', v_child_loop_total)); END IF; EXIT inner_child_loop WHEN v_rowcount = 0; -- exit before loop incr if table is empty v_inner_loop_count := v_inner_loop_count + 1; v_batch_loop_count := v_batch_loop_count + 1; -- Check again if table is empty and go to outer loop again to drop it if so EXECUTE format('SELECT min(%s) FROM %I.%I', v_partition_expression, v_parent_schema, v_child_table) INTO v_child_min; CONTINUE outer_child_loop WHEN v_child_min IS NULL; EXIT outer_child_loop WHEN v_batch_loop_count >= p_batch_count; -- Exit outer FOR loop if p_batch_count is reached END LOOP inner_child_loop; END LOOP outer_child_loop; SELECT partition_tablename INTO v_child_table FROM @extschema@.show_partitions(p_parent_table, 'ASC') LIMIT 1; IF v_child_table IS NULL THEN DELETE FROM @extschema@.part_config WHERE parent_table = p_parent_table; DELETE FROM @extschema@.part_config_sub WHERE sub_parent = p_parent_table; IF v_jobmon_schema IS NOT NULL THEN v_step_id := add_step(v_job_id, 'Removing config from pg_partman'); PERFORM update_step(v_step_id, 'OK', 'Done'); END IF; END IF; RAISE NOTICE 'Copied % row(s) to the parent. Removed % partitions.', v_total, v_undo_count; IF v_jobmon_schema IS NOT NULL THEN v_step_id := add_step(v_job_id, 'Final stats'); PERFORM update_step(v_step_id, 'OK', format('Copied %s row(s) to the parent. Removed %s partitions.', v_total, v_undo_count)); END IF; IF v_jobmon_schema IS NOT NULL THEN PERFORM close_job(v_job_id); END IF; EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_old_search_path, 'false'); partitions_undone := v_undo_count; rows_undone := v_total; EXCEPTION WHEN OTHERS THEN GET STACKED DIAGNOSTICS ex_message = MESSAGE_TEXT, ex_context = PG_EXCEPTION_CONTEXT, ex_detail = PG_EXCEPTION_DETAIL, ex_hint = PG_EXCEPTION_HINT; IF v_jobmon_schema IS NOT NULL THEN IF v_job_id IS NULL THEN EXECUTE format('SELECT %I.add_job(''PARTMAN UNDO PARTITIONING: %s'')', v_jobmon_schema, p_parent_table) INTO v_job_id; EXECUTE format('SELECT %I.add_step(%s, ''EXCEPTION before job logging started'')', v_jobmon_schema, v_job_id, p_parent_table) INTO v_step_id; ELSIF v_step_id IS NULL THEN EXECUTE format('SELECT %I.add_step(%s, ''EXCEPTION before first step logged'')', v_jobmon_schema, v_job_id) INTO v_step_id; END IF; EXECUTE format('SELECT %I.update_step(%s, ''CRITICAL'', %L)', v_jobmon_schema, v_step_id, 'ERROR: '||coalesce(SQLERRM,'unknown')); EXECUTE format('SELECT %I.fail_job(%s)', v_jobmon_schema, v_job_id); END IF; RAISE EXCEPTION '% CONTEXT: % DETAIL: % HINT: %', ex_message, ex_context, ex_detail, ex_hint; END $$;