CREATE SCHEMA mx_alter_distributed_table; SET search_path TO mx_alter_distributed_table; SET citus.shard_replication_factor TO 1; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1410000; -- test alter_distributed_table UDF CREATE TABLE adt_table (a INT, b INT); CREATE TABLE adt_col (a INT UNIQUE, b INT); CREATE TABLE adt_ref (a INT REFERENCES adt_col(a)); SELECT create_distributed_table('adt_table', 'a', colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('adt_col', 'a', colocate_with:='adt_table'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('adt_ref', 'a', colocate_with:='adt_table'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO adt_table VALUES (1, 2), (3, 4), (5, 6); INSERT INTO adt_col VALUES (3, 4), (5, 6), (7, 8); INSERT INTO adt_ref VALUES (3), (5); SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text LIKE 'adt%'; table_name | citus_table_type | distribution_column | shard_count --------------------------------------------------------------------- adt_col | distributed | a | 4 adt_ref | distributed | a | 4 adt_table | distributed | a | 4 (3 rows) SELECT STRING_AGG(table_name::text, ', ' ORDER BY 1) AS "Colocation Groups" FROM public.citus_tables WHERE table_name::text LIKE 'adt%' GROUP BY colocation_id ORDER BY 1; Colocation Groups --------------------------------------------------------------------- adt_col, adt_ref, adt_table (1 row) SELECT conrelid::regclass::text AS "Referencing Table", pg_get_constraintdef(oid, true) AS "Definition" FROM pg_constraint WHERE (conrelid::regclass::text = 'adt_col' OR confrelid::regclass::text = 'adt_col') ORDER BY 1; Referencing Table | Definition --------------------------------------------------------------------- adt_col | UNIQUE (a) adt_ref | FOREIGN KEY (a) REFERENCES adt_col(a) (2 rows) SET client_min_messages TO WARNING; SELECT alter_distributed_table('adt_table', shard_count:=6, cascade_to_colocated:=true); alter_distributed_table --------------------------------------------------------------------- (1 row) SET client_min_messages TO DEFAULT; SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text LIKE 'adt%'; table_name | citus_table_type | distribution_column | shard_count --------------------------------------------------------------------- adt_col | distributed | a | 6 adt_ref | distributed | a | 6 adt_table | distributed | a | 6 (3 rows) SELECT STRING_AGG(table_name::text, ', ' ORDER BY 1) AS "Colocation Groups" FROM public.citus_tables WHERE table_name::text LIKE 'adt%' GROUP BY colocation_id ORDER BY 1; Colocation Groups --------------------------------------------------------------------- adt_col, adt_ref, adt_table (1 row) SELECT conrelid::regclass::text AS "Referencing Table", pg_get_constraintdef(oid, true) AS "Definition" FROM pg_constraint WHERE (conrelid::regclass::text = 'adt_col' OR confrelid::regclass::text = 'adt_col') ORDER BY 1; Referencing Table | Definition --------------------------------------------------------------------- adt_col | UNIQUE (a) adt_ref | FOREIGN KEY (a) REFERENCES adt_col(a) (2 rows) SELECT alter_distributed_table('adt_table', distribution_column:='b', colocate_with:='none'); NOTICE: creating a new table for mx_alter_distributed_table.adt_table NOTICE: moving the data of mx_alter_distributed_table.adt_table NOTICE: dropping the old mx_alter_distributed_table.adt_table NOTICE: renaming the new table to mx_alter_distributed_table.adt_table alter_distributed_table --------------------------------------------------------------------- (1 row) SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text LIKE 'adt%'; table_name | citus_table_type | distribution_column | shard_count --------------------------------------------------------------------- adt_col | distributed | a | 6 adt_ref | distributed | a | 6 adt_table | distributed | b | 6 (3 rows) SELECT STRING_AGG(table_name::text, ', ' ORDER BY 1) AS "Colocation Groups" FROM public.citus_tables WHERE table_name::text LIKE 'adt%' GROUP BY colocation_id ORDER BY 1; Colocation Groups --------------------------------------------------------------------- adt_col, adt_ref adt_table (2 rows) SELECT conrelid::regclass::text AS "Referencing Table", pg_get_constraintdef(oid, true) AS "Definition" FROM pg_constraint WHERE (conrelid::regclass::text = 'adt_col' OR confrelid::regclass::text = 'adt_col') ORDER BY 1; Referencing Table | Definition --------------------------------------------------------------------- adt_col | UNIQUE (a) adt_ref | FOREIGN KEY (a) REFERENCES adt_col(a) (2 rows) SELECT * FROM adt_table ORDER BY 1; a | b --------------------------------------------------------------------- 1 | 2 3 | 4 5 | 6 (3 rows) SELECT * FROM adt_col ORDER BY 1; a | b --------------------------------------------------------------------- 3 | 4 5 | 6 7 | 8 (3 rows) SELECT * FROM adt_ref ORDER BY 1; a --------------------------------------------------------------------- 3 5 (2 rows) BEGIN; INSERT INTO adt_table SELECT x, x+1 FROM generate_series(1, 1000) x; SELECT alter_distributed_table('adt_table', distribution_column:='a'); NOTICE: creating a new table for mx_alter_distributed_table.adt_table NOTICE: moving the data of mx_alter_distributed_table.adt_table NOTICE: dropping the old mx_alter_distributed_table.adt_table NOTICE: renaming the new table to mx_alter_distributed_table.adt_table alter_distributed_table --------------------------------------------------------------------- (1 row) SELECT COUNT(*) FROM adt_table; count --------------------------------------------------------------------- 1003 (1 row) END; SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text = 'adt_table'; table_name | citus_table_type | distribution_column | shard_count --------------------------------------------------------------------- adt_table | distributed | a | 6 (1 row) -- test procedure colocation is preserved with alter_distributed_table CREATE TABLE test_proc_colocation_0 (a float8); SELECT create_distributed_table('test_proc_colocation_0', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE OR REPLACE procedure proc_0(dist_key float8) LANGUAGE plpgsql AS $$ DECLARE res INT := 0; BEGIN INSERT INTO mx_alter_distributed_table.test_proc_colocation_0 VALUES (dist_key); SELECT count(*) INTO res FROM mx_alter_distributed_table.test_proc_colocation_0; RAISE NOTICE 'Res: %', res; COMMIT; END;$$; SELECT create_distributed_function('proc_0(float8)', 'dist_key', 'test_proc_colocation_0' ); create_distributed_function --------------------------------------------------------------------- (1 row) SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); logicalrelid | colocationid --------------------------------------------------------------------- test_proc_colocation_0 | 1410002 (1 row) SELECT proname, colocationid FROM pg_proc JOIN pg_catalog.pg_dist_object ON pg_proc.oid = pg_catalog.pg_dist_object.objid WHERE proname IN ('proc_0'); proname | colocationid --------------------------------------------------------------------- proc_0 | 1410002 (1 row) SET client_min_messages TO DEBUG1; CALL proc_0(1.0); DEBUG: pushing down the procedure NOTICE: Res: 1 DETAIL: from localhost:xxxxx RESET client_min_messages; -- shardCount is not null && list_length(colocatedTableList) = 1 SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 8); NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0 NOTICE: moving the data of mx_alter_distributed_table.test_proc_colocation_0 NOTICE: dropping the old mx_alter_distributed_table.test_proc_colocation_0 NOTICE: renaming the new table to mx_alter_distributed_table.test_proc_colocation_0 alter_distributed_table --------------------------------------------------------------------- (1 row) SET client_min_messages TO DEBUG1; CALL proc_0(1.0); DEBUG: pushing down the procedure NOTICE: Res: 2 DETAIL: from localhost:xxxxx RESET client_min_messages; SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); logicalrelid | colocationid --------------------------------------------------------------------- test_proc_colocation_0 | 1410003 (1 row) SELECT proname, colocationid FROM pg_proc JOIN pg_catalog.pg_dist_object ON pg_proc.oid = pg_catalog.pg_dist_object.objid WHERE proname IN ('proc_0'); proname | colocationid --------------------------------------------------------------------- proc_0 | 1410003 (1 row) -- colocatewith is not null && list_length(colocatedTableList) = 1 SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 4); NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0 NOTICE: moving the data of mx_alter_distributed_table.test_proc_colocation_0 NOTICE: dropping the old mx_alter_distributed_table.test_proc_colocation_0 NOTICE: renaming the new table to mx_alter_distributed_table.test_proc_colocation_0 alter_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE test_proc_colocation_1 (a float8); SELECT create_distributed_table('test_proc_colocation_1', 'a', colocate_with := 'none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT alter_distributed_table('test_proc_colocation_0', colocate_with := 'test_proc_colocation_1'); NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0 NOTICE: moving the data of mx_alter_distributed_table.test_proc_colocation_0 NOTICE: dropping the old mx_alter_distributed_table.test_proc_colocation_0 NOTICE: renaming the new table to mx_alter_distributed_table.test_proc_colocation_0 alter_distributed_table --------------------------------------------------------------------- (1 row) SET client_min_messages TO DEBUG1; CALL proc_0(1.0); DEBUG: pushing down the procedure NOTICE: Res: 3 DETAIL: from localhost:xxxxx RESET client_min_messages; SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); logicalrelid | colocationid --------------------------------------------------------------------- test_proc_colocation_0 | 1410005 (1 row) SELECT proname, colocationid FROM pg_proc JOIN pg_catalog.pg_dist_object ON pg_proc.oid = pg_catalog.pg_dist_object.objid WHERE proname IN ('proc_0'); proname | colocationid --------------------------------------------------------------------- proc_0 | 1410005 (1 row) -- shardCount is not null && cascade_to_colocated is true SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 8, cascade_to_colocated := true); NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0 NOTICE: moving the data of mx_alter_distributed_table.test_proc_colocation_0 NOTICE: dropping the old mx_alter_distributed_table.test_proc_colocation_0 NOTICE: renaming the new table to mx_alter_distributed_table.test_proc_colocation_0 NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_1 NOTICE: moving the data of mx_alter_distributed_table.test_proc_colocation_1 NOTICE: dropping the old mx_alter_distributed_table.test_proc_colocation_1 NOTICE: renaming the new table to mx_alter_distributed_table.test_proc_colocation_1 alter_distributed_table --------------------------------------------------------------------- (1 row) SET client_min_messages TO DEBUG1; CALL proc_0(1.0); DEBUG: pushing down the procedure NOTICE: Res: 4 DETAIL: from localhost:xxxxx RESET client_min_messages; SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); logicalrelid | colocationid --------------------------------------------------------------------- test_proc_colocation_0 | 1410006 (1 row) SELECT proname, colocationid FROM pg_proc JOIN pg_catalog.pg_dist_object ON pg_proc.oid = pg_catalog.pg_dist_object.objid WHERE proname IN ('proc_0'); proname | colocationid --------------------------------------------------------------------- proc_0 | 1410006 (1 row) -- colocatewith is not null && cascade_to_colocated is true SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 4, cascade_to_colocated := true); NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0 NOTICE: moving the data of mx_alter_distributed_table.test_proc_colocation_0 NOTICE: dropping the old mx_alter_distributed_table.test_proc_colocation_0 NOTICE: renaming the new table to mx_alter_distributed_table.test_proc_colocation_0 NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_1 NOTICE: moving the data of mx_alter_distributed_table.test_proc_colocation_1 NOTICE: dropping the old mx_alter_distributed_table.test_proc_colocation_1 NOTICE: renaming the new table to mx_alter_distributed_table.test_proc_colocation_1 alter_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE test_proc_colocation_2 (a float8); SELECT create_distributed_table('test_proc_colocation_2', 'a', colocate_with := 'none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT alter_distributed_table('test_proc_colocation_0', colocate_with := 'test_proc_colocation_2', cascade_to_colocated := true); NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0 NOTICE: moving the data of mx_alter_distributed_table.test_proc_colocation_0 NOTICE: dropping the old mx_alter_distributed_table.test_proc_colocation_0 NOTICE: renaming the new table to mx_alter_distributed_table.test_proc_colocation_0 NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_1 NOTICE: moving the data of mx_alter_distributed_table.test_proc_colocation_1 NOTICE: dropping the old mx_alter_distributed_table.test_proc_colocation_1 NOTICE: renaming the new table to mx_alter_distributed_table.test_proc_colocation_1 alter_distributed_table --------------------------------------------------------------------- (1 row) SET client_min_messages TO DEBUG1; CALL proc_0(1.0); DEBUG: pushing down the procedure NOTICE: Res: 5 DETAIL: from localhost:xxxxx RESET client_min_messages; SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); logicalrelid | colocationid --------------------------------------------------------------------- test_proc_colocation_0 | 1410008 (1 row) SELECT proname, colocationid FROM pg_proc JOIN pg_catalog.pg_dist_object ON pg_proc.oid = pg_catalog.pg_dist_object.objid WHERE proname IN ('proc_0'); proname | colocationid --------------------------------------------------------------------- proc_0 | 1410008 (1 row) -- try a case with more than one procedure CREATE OR REPLACE procedure proc_1(dist_key float8) LANGUAGE plpgsql AS $$ DECLARE res INT := 0; BEGIN INSERT INTO mx_alter_distributed_table.test_proc_colocation_0 VALUES (dist_key); SELECT count(*) INTO res FROM mx_alter_distributed_table.test_proc_colocation_0; RAISE NOTICE 'Res: %', res; COMMIT; END;$$; SELECT create_distributed_function('proc_1(float8)', 'dist_key', 'test_proc_colocation_0' ); create_distributed_function --------------------------------------------------------------------- (1 row) SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); logicalrelid | colocationid --------------------------------------------------------------------- test_proc_colocation_0 | 1410008 (1 row) SELECT proname, colocationid FROM pg_proc JOIN pg_catalog.pg_dist_object ON pg_proc.oid = pg_catalog.pg_dist_object.objid WHERE proname IN ('proc_0', 'proc_1') ORDER BY proname; proname | colocationid --------------------------------------------------------------------- proc_0 | 1410008 proc_1 | 1410008 (2 rows) SET client_min_messages TO DEBUG1; CALL proc_0(1.0); DEBUG: pushing down the procedure NOTICE: Res: 6 DETAIL: from localhost:xxxxx CALL proc_1(2.0); DEBUG: pushing down the procedure NOTICE: Res: 7 DETAIL: from localhost:xxxxx RESET client_min_messages; SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 8, cascade_to_colocated := true); NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0 NOTICE: moving the data of mx_alter_distributed_table.test_proc_colocation_0 NOTICE: dropping the old mx_alter_distributed_table.test_proc_colocation_0 NOTICE: renaming the new table to mx_alter_distributed_table.test_proc_colocation_0 NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_2 NOTICE: moving the data of mx_alter_distributed_table.test_proc_colocation_2 NOTICE: dropping the old mx_alter_distributed_table.test_proc_colocation_2 NOTICE: renaming the new table to mx_alter_distributed_table.test_proc_colocation_2 NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_1 NOTICE: moving the data of mx_alter_distributed_table.test_proc_colocation_1 NOTICE: dropping the old mx_alter_distributed_table.test_proc_colocation_1 NOTICE: renaming the new table to mx_alter_distributed_table.test_proc_colocation_1 alter_distributed_table --------------------------------------------------------------------- (1 row) SET client_min_messages TO DEBUG1; CALL proc_0(1.0); DEBUG: pushing down the procedure NOTICE: Res: 8 DETAIL: from localhost:xxxxx CALL proc_1(2.0); DEBUG: pushing down the procedure NOTICE: Res: 9 DETAIL: from localhost:xxxxx RESET client_min_messages; SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); logicalrelid | colocationid --------------------------------------------------------------------- test_proc_colocation_0 | 1410009 (1 row) SELECT proname, colocationid FROM pg_proc JOIN pg_catalog.pg_dist_object ON pg_proc.oid = pg_catalog.pg_dist_object.objid WHERE proname IN ('proc_0', 'proc_1') ORDER BY proname; proname | colocationid --------------------------------------------------------------------- proc_0 | 1410009 proc_1 | 1410009 (2 rows) -- case which shouldn't preserve colocation for now -- shardCount is not null && cascade_to_colocated is false SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 18, cascade_to_colocated := false); NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0 NOTICE: moving the data of mx_alter_distributed_table.test_proc_colocation_0 NOTICE: dropping the old mx_alter_distributed_table.test_proc_colocation_0 NOTICE: renaming the new table to mx_alter_distributed_table.test_proc_colocation_0 alter_distributed_table --------------------------------------------------------------------- (1 row) SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); logicalrelid | colocationid --------------------------------------------------------------------- test_proc_colocation_0 | 1410010 (1 row) SELECT proname, colocationid FROM pg_proc JOIN pg_catalog.pg_dist_object ON pg_proc.oid = pg_catalog.pg_dist_object.objid WHERE proname IN ('proc_0', 'proc_1') ORDER BY proname; proname | colocationid --------------------------------------------------------------------- proc_0 | 1410009 proc_1 | 1410009 (2 rows) SET client_min_messages TO WARNING; DROP SCHEMA mx_alter_distributed_table CASCADE;