-- Converts an existing, non-partitioned pgmq archive table into a
-- pg_partman-managed range-partitioned table.
--
-- Parameters:
--   table_name          queue name; the archive table name is derived from it
--                       via pgmq.format_table_name(table_name, 'a').
--   partition_interval  passed to pgmq._get_partition_col() to pick the
--                       partition column and to pg_partman as p_interval.
--   retention_interval  stored as part_config.retention for the new parent.
--   leading_partition   accepted for interface compatibility; currently not
--                       used by this function.
--
-- Behavior:
--   * If the archive table is already partitioned, or no ordinary table with
--     that name exists in schema pgmq, a NOTICE is raised and nothing changes.
--   * Otherwise the existing table (and its archived_at index) is renamed with
--     an "_old" suffix and a new, empty partitioned table takes its place.
--     Rows are NOT copied from the "_old" table by this function.
CREATE OR REPLACE FUNCTION pgmq.convert_archive_partitioned(
    table_name TEXT,
    partition_interval TEXT DEFAULT '10000',
    retention_interval TEXT DEFAULT '100000',
    leading_partition INT DEFAULT 10
)
RETURNS void AS $$
DECLARE
    a_table_name TEXT := pgmq.format_table_name(table_name, 'a');
    a_table_name_old TEXT := pgmq.format_table_name(table_name, 'a') || '_old';
    qualified_a_table_name TEXT := format('pgmq.%I', a_table_name);
    partition_col TEXT;
    a_partition_col TEXT;
BEGIN
    -- Nothing to do when the archive is already a partitioned table ('p').
    -- The lookup is restricted to schema pgmq so that a same-named relation
    -- in another schema cannot produce a false positive.
    IF EXISTS (
        SELECT 1
        FROM pg_class AS c
        INNER JOIN pg_namespace AS n
            ON n.oid = c.relnamespace
        WHERE n.nspname = 'pgmq'
          AND c.relname = a_table_name
          AND c.relkind = 'p'
    ) THEN
        -- RAISE uses a bare % as its placeholder (not printf-style %s).
        RAISE NOTICE 'Table % is already partitioned', a_table_name;
        RETURN;
    END IF;

    -- The archive must exist as an ordinary table ('r') in schema pgmq.
    IF NOT EXISTS (
        SELECT 1
        FROM pg_class AS c
        INNER JOIN pg_namespace AS n
            ON n.oid = c.relnamespace
        WHERE n.nspname = 'pgmq'
          AND c.relname = a_table_name
          AND c.relkind = 'r'
    ) THEN
        RAISE NOTICE 'Table % does not exists', a_table_name;
        RETURN;
    END IF;

    partition_col := pgmq._get_partition_col(partition_interval);

    -- For archive tables, use archived_at for time-based partitioning
    IF partition_col = 'enqueued_at' THEN
        a_partition_col := 'archived_at';
    ELSE
        a_partition_col := partition_col;
    END IF;

    -- Move the current table out of the way. The new name is quoted with %I
    -- so it matches the pgmq.%I reference used in CREATE TABLE ... LIKE below.
    EXECUTE format(
        'ALTER TABLE %s RENAME TO %I',
        qualified_a_table_name,
        a_table_name_old
    );

    -- When partitioning by time (archived_at), we need to exclude constraints and indexes
    -- because the existing PRIMARY KEY on msg_id alone is incompatible with partitioning by archived_at.
    -- When partitioning by msg_id, we can keep all constraints including PRIMARY KEY.
    IF a_partition_col = 'archived_at' THEN
        EXECUTE format(
            'CREATE TABLE pgmq.%I (LIKE pgmq.%I including defaults including generated including storage including comments) PARTITION BY RANGE (%I)',
            a_table_name,
            a_table_name_old,
            a_partition_col
        );
    ELSE
        EXECUTE format(
            'CREATE TABLE pgmq.%I (LIKE pgmq.%I including all) PARTITION BY RANGE (%I)',
            a_table_name,
            a_table_name_old,
            a_partition_col
        );
    END IF;

    -- Free up the archived_at index name for the new table, then recreate the
    -- index on the partitioned parent.
    -- NOTE(review): the index name is built from the raw table_name without
    -- identifier quoting, so it is case-folded by the parser. Confirm this
    -- matches how the index was named when the queue was created before
    -- switching these two statements to format(... %I ...).
    EXECUTE 'ALTER INDEX pgmq.archived_at_idx_' || table_name
        || ' RENAME TO archived_at_idx_' || table_name || '_old';
    EXECUTE 'CREATE INDEX archived_at_idx_' || table_name
        || ' ON ' || qualified_a_table_name || '(archived_at)';

    -- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md
    -- p_parent_table - the existing parent table. MUST be schema qualified, even if in public schema.
    -- p_type is 'range' when the pg_partman major version is 5, 'native' otherwise.
    EXECUTE format(
        $QUERY$
        SELECT %I.create_parent(
            p_parent_table := %L,
            p_control := %L,
            p_interval := %L,
            p_type := case
                when pgmq._get_pg_partman_major_version() = 5 then 'range'
                else 'native'
            end
        )
        $QUERY$,
        pgmq._get_pg_partman_schema(),
        qualified_a_table_name,
        a_partition_col,
        partition_interval
    );

    -- Configure retention for the new parent: expired partitions are dropped
    -- together with their indexes rather than merely detached.
    EXECUTE format(
        $QUERY$
        UPDATE %I.part_config
        SET
            retention = %L,
            retention_keep_table = false,
            retention_keep_index = false,
            infinite_time_partitions = true
        WHERE parent_table = %L;
        $QUERY$,
        pgmq._get_pg_partman_schema(),
        retention_interval,
        qualified_a_table_name
    );
END;
$$ LANGUAGE plpgsql;