-- Test queries on a distributed table with shards on the coordinator CREATE SCHEMA coordinator_shouldhaveshards; SET search_path TO coordinator_shouldhaveshards; SET citus.next_shard_id TO 1503000; -- idempotently add node to allow this test to run without add_coordinator SET client_min_messages TO WARNING; SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); ?column? --------------------------------------------------------------------- 1 (1 row) RESET client_min_messages; SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); ?column? --------------------------------------------------------------------- 1 (1 row) SET citus.shard_replication_factor TO 1; CREATE TABLE test (x int, y int); SELECT create_distributed_table('test','x', colocate_with := 'none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT count(*) FROM pg_dist_shard JOIN pg_dist_placement USING (shardid) WHERE logicalrelid = 'test'::regclass AND groupid = 0; count --------------------------------------------------------------------- 2 (1 row) --- enable logging to see which tasks are executed locally SET client_min_messages TO LOG; SET citus.log_local_commands TO ON; -- INSERT..SELECT with COPY under the covers INSERT INTO test SELECT s,s FROM generate_series(2,100) s; NOTICE: executing the copy locally for shard xxxxx NOTICE: executing the copy locally for shard xxxxx -- router queries execute locally INSERT INTO test VALUES (1, 1); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) SELECT y FROM test WHERE x = 1; NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) y --------------------------------------------------------------------- 1 (1 row) -- multi-shard queries connect to localhost SELECT count(*) FROM test; count --------------------------------------------------------------------- 100 (1 row) WITH a AS (SELECT * FROM test) SELECT count(*) FROM test; count --------------------------------------------------------------------- 100 (1 row) -- multi-shard queries in transaction blocks execute locally BEGIN; SELECT y FROM test WHERE x = 1; NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) y --------------------------------------------------------------------- 1 (1 row) SELECT count(*) FROM test; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true count --------------------------------------------------------------------- 100 (1 row) END; BEGIN; SELECT y FROM test WHERE x = 1; NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) y --------------------------------------------------------------------- 1 (1 row) SELECT count(*) FROM test; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true count --------------------------------------------------------------------- 100 (1 row) END; -- INSERT..SELECT with re-partitioning after local execution BEGIN; INSERT INTO test VALUES (0,1000); CREATE TABLE repart_test (x int primary key, y int); SELECT create_distributed_table('repart_test','x', colocate_with := 'none'); NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503004, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.repart_test (x integer NOT NULL, y integer) ');SELECT worker_apply_shard_ddl_command (1503004, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.repart_test OWNER TO postgres');SELECT worker_apply_shard_ddl_command (1503004, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.repart_test ADD CONSTRAINT repart_test_pkey PRIMARY KEY (x)') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503007, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.repart_test (x integer NOT NULL, y integer) ');SELECT worker_apply_shard_ddl_command (1503007, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.repart_test OWNER TO postgres');SELECT worker_apply_shard_ddl_command (1503007, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.repart_test ADD CONSTRAINT repart_test_pkey PRIMARY KEY (x)') create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO repart_test (x, y) SELECT y, x FROM test; NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503000_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503000_to','SELECT y AS x, x AS y FROM coordinator_shouldhaveshards.test_1503000 test WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503003_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503003_to','SELECT y AS x, x AS y FROM coordinator_shouldhaveshards.test_1503003 test WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.repart_test_1503004 AS citus_table_alias (x, y) SELECT x, y FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1503000_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(x integer, y integer) NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.repart_test_1503007 AS citus_table_alias (x, y) SELECT x, y FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1503003_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(x integer, y integer) SELECT y FROM repart_test WHERE x = 1000; y --------------------------------------------------------------------- 0 (1 row) INSERT INTO repart_test (x, y) SELECT y, x FROM test ON CONFLICT (x) DO UPDATE SET y = -1; NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503000_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503000_to','SELECT y AS x, x AS y FROM coordinator_shouldhaveshards.test_1503000 test WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503003_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503003_to','SELECT y AS x, x AS y FROM coordinator_shouldhaveshards.test_1503003 test WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.repart_test_1503004 AS citus_table_alias (x, y) SELECT x, y FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1503000_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(x integer, y integer) ON CONFLICT(x) DO UPDATE SET y = '-1'::integer NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.repart_test_1503007 AS citus_table_alias (x, y) SELECT x, y FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1503003_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(x integer, y integer) ON CONFLICT(x) DO UPDATE SET y = '-1'::integer SELECT y FROM repart_test WHERE x = 1000; y --------------------------------------------------------------------- -1 (1 row) ROLLBACK; -- INSERT..SELECT with re-partitioning in EXPLAIN ANALYZE after local execution BEGIN; INSERT INTO test VALUES (0,1000); EXPLAIN (COSTS FALSE, ANALYZE TRUE, TIMING FALSE, SUMMARY FALSE) INSERT INTO test (x, y) SELECT y, x FROM test; ERROR: EXPLAIN ANALYZE is currently not supported for INSERT ... SELECT commands with repartitioning ROLLBACK; -- DDL connects to locahost ALTER TABLE test ADD COLUMN z int; -- DDL after local execution BEGIN; SELECT y FROM test WHERE x = 1; NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) y --------------------------------------------------------------------- 1 (1 row) ALTER TABLE test DROP COLUMN z; NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503000, 'coordinator_shouldhaveshards', 'ALTER TABLE test DROP COLUMN z;') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503003, 'coordinator_shouldhaveshards', 'ALTER TABLE test DROP COLUMN z;') ROLLBACK; BEGIN; ALTER TABLE test DROP COLUMN z; NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503000, 'coordinator_shouldhaveshards', 'ALTER TABLE test DROP COLUMN z;') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503003, 'coordinator_shouldhaveshards', 'ALTER TABLE test DROP COLUMN z;') SELECT y FROM test WHERE x = 1; NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) y --------------------------------------------------------------------- 1 (1 row) END; SET citus.shard_count TO 6; SET citus.log_remote_commands TO OFF; BEGIN; SET citus.log_local_commands TO ON; CREATE TABLE dist_table (a int); INSERT INTO dist_table SELECT * FROM generate_series(1, 100); -- trigger local execution SELECT y FROM test WHERE x = 1; NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) y --------------------------------------------------------------------- 1 (1 row) -- this should be run locally SELECT create_distributed_table('dist_table', 'a', colocate_with := 'none'); NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503008, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer) ');SELECT worker_apply_shard_ddl_command (1503008, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503011, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer) ');SELECT worker_apply_shard_ddl_command (1503011, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres') NOTICE: executing the copy locally for shard xxxxx NOTICE: Copying data from local table... NOTICE: executing the copy locally for shard xxxxx NOTICE: copying the data has completed DETAIL: The local data in the table is no longer visible, but is still on disk. HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$coordinator_shouldhaveshards.dist_table$$) create_distributed_table --------------------------------------------------------------------- (1 row) SELECT count(*) FROM dist_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.dist_table_1503008 dist_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.dist_table_1503011 dist_table WHERE true count --------------------------------------------------------------------- 100 (1 row) ROLLBACK; CREATE TABLE dist_table (a int); INSERT INTO dist_table SELECT * FROM generate_series(1, 100); BEGIN; SET citus.log_local_commands TO ON; -- trigger local execution SELECT y FROM test WHERE x = 1; NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) y --------------------------------------------------------------------- 1 (1 row) -- this should be run locally SELECT create_distributed_table('dist_table', 'a', colocate_with := 'none'); NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503014, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer) ');SELECT worker_apply_shard_ddl_command (1503014, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503017, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer) ');SELECT worker_apply_shard_ddl_command (1503017, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres') NOTICE: executing the copy locally for shard xxxxx NOTICE: Copying data from local table... NOTICE: executing the copy locally for shard xxxxx NOTICE: copying the data has completed DETAIL: The local data in the table is no longer visible, but is still on disk. HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$coordinator_shouldhaveshards.dist_table$$) create_distributed_table --------------------------------------------------------------------- (1 row) SELECT count(*) FROM dist_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.dist_table_1503014 dist_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.dist_table_1503017 dist_table WHERE true count --------------------------------------------------------------------- 100 (1 row) ROLLBACK; -- repartition queries should work fine SET citus.enable_repartition_joins TO ON; SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; count --------------------------------------------------------------------- 100 (1 row) BEGIN; SET citus.enable_repartition_joins TO ON; SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; count --------------------------------------------------------------------- 100 (1 row) END; BEGIN; SET citus.enable_repartition_joins TO ON; -- trigger local execution SELECT y FROM test WHERE x = 1; NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) y --------------------------------------------------------------------- 1 (1 row) SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; ERROR: cannot execute command because a local execution has accessed a placement in the transaction DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" ROLLBACK; CREATE TABLE ref (a int, b int); SELECT create_reference_table('ref'); create_reference_table --------------------------------------------------------------------- (1 row) CREATE TABLE local (x int, y int); BEGIN; SELECT count(*) FROM test; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true count --------------------------------------------------------------------- 100 (1 row) SELECT * FROM ref JOIN local ON (a = x); NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) a | b | x | y --------------------------------------------------------------------- (0 rows) TRUNCATE ref; NOTICE: executing the command locally: TRUNCATE TABLE coordinator_shouldhaveshards.ref_xxxxx CASCADE ROLLBACK; BEGIN; SELECT count(*) FROM test; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true count --------------------------------------------------------------------- 100 (1 row) TRUNCATE ref; NOTICE: executing the command locally: TRUNCATE TABLE coordinator_shouldhaveshards.ref_xxxxx CASCADE SELECT * FROM ref JOIN local ON (a = x); NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) a | b | x | y --------------------------------------------------------------------- (0 rows) ROLLBACK; BEGIN; SELECT count(*) FROM test; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true count --------------------------------------------------------------------- 100 (1 row) INSERT INTO ref VALUES (1,2); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 (a, b) VALUES (1, 2) INSERT INTO local VALUES (1,2); SELECT * FROM ref JOIN local ON (a = x); NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) a | b | x | y --------------------------------------------------------------------- 1 | 2 | 1 | 2 (1 row) ROLLBACK; set citus.enable_cte_inlining to off; BEGIN; SELECT count(*) FROM test; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true count --------------------------------------------------------------------- 100 (1 row) -- we wont see the modifying cte in this query because we will use local execution and -- in postgres we wouldn't see this modifying cte, so it is consistent with postgres. WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 (a, b) VALUES (3, 2) RETURNING a, b NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b count | x | y | a | b | count --------------------------------------------------------------------- 100 | 3 | 2 | 3 | 2 | 0 (1 row) TRUNCATE ref; NOTICE: executing the command locally: TRUNCATE TABLE coordinator_shouldhaveshards.ref_xxxxx CASCADE SELECT * FROM ref JOIN local ON (a = x); NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) a | b | x | y --------------------------------------------------------------------- (0 rows) -- we wont see the modifying cte in this query because we will use local execution and -- in postgres we wouldn't see this modifying cte, so it is consistent with postgres. WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 (a, b) VALUES (3, 2) RETURNING a, b NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b count | x | y | a | b | count --------------------------------------------------------------------- 100 | 3 | 2 | 3 | 2 | 0 (1 row) ROLLBACK; BEGIN; -- we wont see the modifying cte in this query because we will use local execution and -- in postgres we wouldn't see this modifying cte, so it is consistent with postgres. WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 (a, b) VALUES (3, 2) RETURNING a, b NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b count | x | y | a | b | count --------------------------------------------------------------------- 100 | 3 | 2 | 3 | 2 | 0 (1 row) ROLLBACK; BEGIN; -- we wont see the modifying cte in this query because we will use local execution and -- in postgres we wouldn't see this modifying cte, so it is consistent with postgres. WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref SELECT *,* FROM generate_series(1,10) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_1503020'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b count | x | y | a | b | count --------------------------------------------------------------------- 100 | 3 | 2 | 1 | 1 | 0 100 | 3 | 2 | 2 | 2 | 0 100 | 3 | 2 | 3 | 3 | 0 100 | 3 | 2 | 4 | 4 | 0 100 | 3 | 2 | 5 | 5 | 0 100 | 3 | 2 | 6 | 6 | 0 100 | 3 | 2 | 7 | 7 | 0 100 | 3 | 2 | 8 | 8 | 0 100 | 3 | 2 | 9 | 9 | 0 100 | 3 | 2 | 10 | 10 | 0 (10 rows) ROLLBACK; -- same local table reference table tests, but outside a transaction block INSERT INTO ref VALUES (1,2); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 (a, b) VALUES (1, 2) INSERT INTO local VALUES (1,2); SELECT * FROM ref JOIN local ON (a = x); NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) a | b | x | y --------------------------------------------------------------------- 1 | 2 | 1 | 2 (1 row) -- we wont see the modifying cte in this query because we will use local execution and -- in postgres we wouldn't see this modifying cte, so it is consistent with postgres. WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 (a, b) VALUES (3, 2) RETURNING a, b NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b count | x | y | a | b | count --------------------------------------------------------------------- 100 | 3 | 2 | 3 | 2 | 1 (1 row) -- joins between local tables and distributed tables are disallowed CREATE TABLE dist_table(a int); ERROR: relation "dist_table" already exists SELECT create_distributed_table('dist_table', 'a'); NOTICE: Copying data from local table... NOTICE: copying the data has completed DETAIL: The local data in the table is no longer visible, but is still on disk. HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$coordinator_shouldhaveshards.dist_table$$) create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO dist_table VALUES(1); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.dist_table_1503021 (a) VALUES (1) SELECT * FROM local JOIN dist_table ON (a = x) ORDER BY 1,2,3; x | y | a --------------------------------------------------------------------- 1 | 2 | 1 1 | 2 | 1 3 | 2 | 3 (3 rows) SELECT * FROM local JOIN dist_table ON (a = x) WHERE a = 1 ORDER BY 1,2,3; NOTICE: executing the command locally: SELECT local.x, local.y, dist_table.a FROM ((SELECT local_1.x, local_1.y FROM (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) local_1) local JOIN coordinator_shouldhaveshards.dist_table_1503021 dist_table ON ((dist_table.a OPERATOR(pg_catalog.=) local.x))) WHERE (dist_table.a OPERATOR(pg_catalog.=) 1) ORDER BY local.x, local.y, dist_table.a x | y | a --------------------------------------------------------------------- 1 | 2 | 1 1 | 2 | 1 (2 rows) -- intermediate results are allowed WITH cte_1 AS (SELECT * FROM dist_table ORDER BY 1 LIMIT 1) SELECT * FROM ref JOIN local ON (a = x) JOIN cte_1 ON (local.x = cte_1.a); NOTICE: executing the command locally: SELECT a FROM coordinator_shouldhaveshards.dist_table_1503021 dist_table WHERE true ORDER BY a LIMIT '1'::bigint NOTICE: executing the command locally: SELECT a FROM coordinator_shouldhaveshards.dist_table_1503024 dist_table WHERE true ORDER BY a LIMIT '1'::bigint NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y, cte_1.a FROM ((coordinator_shouldhaveshards.ref_1503020 ref JOIN (SELECT local_1.x, local_1.y FROM (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) local_1) local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) cte_1 ON ((local.x OPERATOR(pg_catalog.=) cte_1.a))) a | b | x | y | a --------------------------------------------------------------------- 1 | 2 | 1 | 2 | 1 (1 row) -- full router query with CTE and local WITH cte_1 AS (SELECT * FROM ref LIMIT 1) SELECT * FROM ref JOIN local ON (a = x) JOIN cte_1 ON (local.x = cte_1.a); NOTICE: executing the command locally: WITH cte_1 AS (SELECT ref_1.a, ref_1.b FROM coordinator_shouldhaveshards.ref_1503020 ref_1 LIMIT 1) SELECT ref.a, ref.b, local.x, local.y, cte_1.a, cte_1.b FROM ((coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) JOIN cte_1 ON ((local.x OPERATOR(pg_catalog.=) cte_1.a))) a | b | x | y | a | b --------------------------------------------------------------------- 1 | 2 | 1 | 2 | 1 | 2 (1 row) DROP TABLE dist_table; -- issue #3801 SET citus.shard_replication_factor TO 2; CREATE TABLE dist_table(a int); SELECT create_distributed_table('dist_table', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) BEGIN; -- this will use perPlacementQueryStrings, make sure it works correctly with -- copying task INSERT INTO dist_table SELECT a + 1 FROM dist_table; NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503027_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503027_to','SELECT (a OPERATOR(pg_catalog.+) 1) AS a FROM coordinator_shouldhaveshards.dist_table_1503027 dist_table WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true) WHERE rows_written > 0 NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503029_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503029_to','SELECT (a OPERATOR(pg_catalog.+) 1) AS a FROM coordinator_shouldhaveshards.dist_table_1503029 dist_table WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true) WHERE rows_written > 0 NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503030_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503030_to','SELECT (a OPERATOR(pg_catalog.+) 1) AS a FROM coordinator_shouldhaveshards.dist_table_1503030 dist_table WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true) WHERE rows_written > 0 NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503032_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503032_to','SELECT (a OPERATOR(pg_catalog.+) 1) AS a FROM coordinator_shouldhaveshards.dist_table_1503032 dist_table WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true) WHERE rows_written > 0 ROLLBACK; SET citus.shard_replication_factor TO 1; BEGIN; SET citus.shard_replication_factor TO 2; CREATE TABLE dist_table1(a int); -- this will use queryStringList, make sure it works correctly with -- copying task SELECT create_distributed_table('dist_table1', 'a'); NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503033, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer) ');SELECT worker_apply_shard_ddl_command (1503033, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503035, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer) ');SELECT worker_apply_shard_ddl_command (1503035, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503036, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer) ');SELECT worker_apply_shard_ddl_command (1503036, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503038, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer) ');SELECT worker_apply_shard_ddl_command (1503038, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') create_distributed_table --------------------------------------------------------------------- (1 row) ROLLBACK; RESET citus.enable_cte_inlining; CREATE table ref_table(x int PRIMARY KEY, y int); -- this will be replicated to the coordinator because of add_coordinator test SELECT create_reference_table('ref_table'); create_reference_table --------------------------------------------------------------------- (1 row) TRUNCATE TABLE test; BEGIN; INSERT INTO test SELECT *, * FROM generate_series(1, 100); NOTICE: executing the copy locally for shard xxxxx NOTICE: executing the copy locally for shard xxxxx INSERT INTO ref_table SELECT *, * FROM generate_series(1, 100); NOTICE: executing the copy locally for shard xxxxx SELECT COUNT(*) FROM test JOIN ref_table USING(x); NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.test_1503000 test JOIN coordinator_shouldhaveshards.ref_table_1503039 ref_table ON ((test.x OPERATOR(pg_catalog.=) ref_table.x))) WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.test_1503003 test JOIN coordinator_shouldhaveshards.ref_table_1503039 ref_table ON ((test.x OPERATOR(pg_catalog.=) ref_table.x))) WHERE true count --------------------------------------------------------------------- 100 (1 row) ROLLBACK; -- writing to local file and remote intermediate files -- at the same time INSERT INTO ref_table SELECT *, * FROM generate_series(1, 100); NOTICE: executing the copy locally for shard xxxxx WITH cte_1 AS ( INSERT INTO ref_table SELECT * FROM ref_table LIMIT 10000 ON CONFLICT (x) DO UPDATE SET y = EXCLUDED.y + 1 RETURNING *) SELECT count(*) FROM cte_1; NOTICE: executing the command locally: SELECT x, y FROM coordinator_shouldhaveshards.ref_table_1503039 ref_table LIMIT 10000 NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_table_1503039 AS citus_table_alias (x, y) SELECT x, y FROM read_intermediate_result('insert_select_XXX_1503039'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer) ON CONFLICT(x) DO UPDATE SET y = (excluded.y OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.x, citus_table_alias.y NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) cte_1 count --------------------------------------------------------------------- 100 (1 row) -- issue #4237: preventing empty placement creation on coordinator CREATE TABLE test_append_table(a int); SELECT create_distributed_table('test_append_table', 'a', 'append'); create_distributed_table --------------------------------------------------------------------- (1 row) -- this will fail since it will try to create an empty placement in the -- coordinator as well SET citus.shard_replication_factor TO 3; SELECT master_create_empty_shard('test_append_table'); NOTICE: Creating placements for the append partitioned tables on the coordinator is not supported, skipping coordinator ... ERROR: could only create 2 of 3 of required shard replicas -- this will create an empty shard with replicas in the two worker nodes SET citus.shard_replication_factor TO 2; SELECT 1 FROM master_create_empty_shard('test_append_table'); NOTICE: Creating placements for the append partitioned tables on the coordinator is not supported, skipping coordinator ... ?column? --------------------------------------------------------------------- 1 (1 row) -- verify groupid is not 0 for each placement SELECT COUNT(*) FROM pg_dist_placement p, pg_dist_shard s WHERE p.shardid = s.shardid AND s.logicalrelid = 'test_append_table'::regclass AND p.groupid > 0; count --------------------------------------------------------------------- 2 (1 row) SET citus.shard_replication_factor TO 1; -- test partitioned index creation with long name CREATE TABLE test_index_creation1 ( tenant_id integer NOT NULL, timeperiod timestamp without time zone NOT NULL, field1 integer NOT NULL, inserted_utc timestamp without time zone NOT NULL DEFAULT now(), PRIMARY KEY(tenant_id, timeperiod) ) PARTITION BY RANGE (timeperiod); CREATE TABLE test_index_creation1_p2020_09_26 PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-26 00:00:00') TO ('2020-09-27 00:00:00'); CREATE TABLE test_index_creation1_p2020_09_27 PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-27 00:00:00') TO ('2020-09-28 00:00:00'); select create_distributed_table('test_index_creation1', 'tenant_id'); create_distributed_table --------------------------------------------------------------------- (1 row) -- should be able to create indexes with INCLUDE/WHERE CREATE INDEX ix_test_index_creation5 ON test_index_creation1 USING btree(tenant_id, timeperiod) INCLUDE (field1) WHERE (tenant_id = 100); NOTICE: executing the command locally: CREATE INDEX ix_test_index_creation5_1503042 ON coordinator_shouldhaveshards.test_index_creation1_1503042 USING btree (tenant_id ,timeperiod ) INCLUDE (field1 )WHERE (tenant_id = 100) NOTICE: executing the command locally: CREATE INDEX ix_test_index_creation5_1503045 ON coordinator_shouldhaveshards.test_index_creation1_1503045 USING btree (tenant_id ,timeperiod ) INCLUDE (field1 )WHERE (tenant_id = 100) -- test if indexes are created SELECT 1 AS created WHERE EXISTS(SELECT * FROM pg_indexes WHERE indexname LIKE '%test_index_creation%'); created --------------------------------------------------------------------- 1 (1 row) -- test alter_distributed_table UDF SET citus.shard_count TO 4; CREATE TABLE adt_table (a INT, b INT); CREATE TABLE adt_col (a INT UNIQUE, b INT); CREATE TABLE adt_ref (a INT REFERENCES adt_col(a)); SELECT create_distributed_table('adt_table', 'a', colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('adt_col', 'a', colocate_with:='adt_table'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('adt_ref', 'a', colocate_with:='adt_table'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO adt_table VALUES (1, 2), (3, 4), (5, 6); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.adt_table_1503060 AS citus_table_alias (a, b) VALUES (1,2), (5,6) INSERT INTO adt_col VALUES (3, 4), (5, 6), (7, 8); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.adt_col_1503064 AS citus_table_alias (a, b) VALUES (5,6) INSERT INTO adt_ref VALUES (3), (5); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.adt_ref_1503068 AS citus_table_alias (a) VALUES (5) SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text LIKE 'adt%'; table_name | citus_table_type | distribution_column | shard_count --------------------------------------------------------------------- adt_col | distributed | a | 4 adt_ref | distributed | a | 4 adt_table | distributed | a | 4 (3 rows) SELECT STRING_AGG(table_name::text, ', ' ORDER BY 1) AS "Colocation Groups" FROM public.citus_tables WHERE table_name::text LIKE 'adt%' GROUP BY colocation_id ORDER BY 1; Colocation Groups --------------------------------------------------------------------- adt_col, adt_ref, adt_table (1 row) SELECT conrelid::regclass::text AS "Referencing Table", pg_get_constraintdef(oid, true) AS "Definition" FROM pg_constraint WHERE (conrelid::regclass::text = 'adt_col' OR confrelid::regclass::text = 'adt_col') ORDER BY 1; Referencing Table | Definition --------------------------------------------------------------------- adt_col | UNIQUE (a) adt_ref | FOREIGN KEY (a) REFERENCES adt_col(a) (2 rows) SET client_min_messages TO WARNING; SELECT alter_distributed_table('adt_table', shard_count:=6, cascade_to_colocated:=true); alter_distributed_table --------------------------------------------------------------------- (1 row) SET client_min_messages TO DEFAULT; SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text LIKE 'adt%'; table_name | citus_table_type | distribution_column | shard_count --------------------------------------------------------------------- adt_col | distributed | a | 6 adt_ref | distributed | a | 6 adt_table | distributed | a | 6 (3 rows) SELECT STRING_AGG(table_name::text, ', ' ORDER BY 1) AS "Colocation Groups" FROM public.citus_tables WHERE table_name::text LIKE 'adt%' GROUP BY colocation_id ORDER BY 1; Colocation Groups --------------------------------------------------------------------- adt_col, adt_ref, adt_table (1 row) SELECT conrelid::regclass::text AS "Referencing Table", pg_get_constraintdef(oid, true) AS "Definition" FROM pg_constraint WHERE (conrelid::regclass::text = 'adt_col' OR confrelid::regclass::text = 'adt_col') ORDER BY 1; Referencing Table | Definition --------------------------------------------------------------------- adt_col | UNIQUE (a) adt_ref | FOREIGN KEY (a) REFERENCES adt_col(a) (2 rows) SELECT alter_distributed_table('adt_table', distribution_column:='b', colocate_with:='none'); NOTICE: creating a new table for coordinator_shouldhaveshards.adt_table NOTICE: moving the data of coordinator_shouldhaveshards.adt_table NOTICE: dropping the old coordinator_shouldhaveshards.adt_table NOTICE: renaming the new table to coordinator_shouldhaveshards.adt_table alter_distributed_table --------------------------------------------------------------------- (1 row) SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text LIKE 'adt%'; table_name | citus_table_type | distribution_column | shard_count --------------------------------------------------------------------- adt_col | distributed | a | 6 adt_ref | distributed | a | 6 adt_table | distributed | b | 6 (3 rows) SELECT STRING_AGG(table_name::text, ', ' ORDER BY 1) AS "Colocation Groups" FROM public.citus_tables WHERE table_name::text LIKE 'adt%' GROUP BY colocation_id ORDER BY 1; Colocation Groups --------------------------------------------------------------------- adt_col, adt_ref adt_table (2 rows) SELECT conrelid::regclass::text AS "Referencing Table", pg_get_constraintdef(oid, true) AS "Definition" FROM pg_constraint WHERE (conrelid::regclass::text = 'adt_col' OR confrelid::regclass::text = 'adt_col') ORDER BY 1; Referencing Table | Definition --------------------------------------------------------------------- adt_col | UNIQUE (a) adt_ref | FOREIGN KEY (a) REFERENCES adt_col(a) (2 rows) SELECT * FROM adt_table ORDER BY 1; a | b --------------------------------------------------------------------- 1 | 2 3 | 4 5 | 6 (3 rows) SELECT * FROM adt_col ORDER BY 1; a | b --------------------------------------------------------------------- 3 | 4 5 | 6 7 | 8 (3 rows) SELECT * FROM adt_ref ORDER BY 1; a --------------------------------------------------------------------- 3 5 (2 rows) SET client_min_messages TO WARNING; BEGIN; INSERT INTO adt_table SELECT x, x+1 FROM generate_series(1, 1000) x; SELECT alter_distributed_table('adt_table', distribution_column:='a'); alter_distributed_table --------------------------------------------------------------------- (1 row) SELECT COUNT(*) FROM adt_table; count --------------------------------------------------------------------- 1003 (1 row) END; SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text = 'adt_table'; table_name | citus_table_type | distribution_column | shard_count --------------------------------------------------------------------- adt_table | distributed | a | 6 (1 row) SET client_min_messages TO DEFAULT; -- issue 4508 table_1 and table_2 are used to test -- some edge cases around intermediate result pruning CREATE TABLE table_1 (key int, value text); SELECT create_distributed_table('table_1', 'key'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE table_2 (key int, value text); SELECT create_distributed_table('table_2', 'key'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_1_1503102 AS citus_table_alias (key, value) VALUES (1,'1'::text) NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_1_1503105 AS citus_table_alias (key, value) VALUES (2,'2'::text) INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_2_1503106 AS citus_table_alias (key, value) VALUES (1,'1'::text), (5,'5'::text) NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_2_1503109 AS citus_table_alias (key, value) VALUES (2,'2'::text) SET citus.log_intermediate_results TO ON; SET client_min_messages to debug1; WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) SELECT count(*), key FROM a JOIN table_2 USING (key) GROUP BY key HAVING (max(table_2.value) >= (SELECT value FROM a)); DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 DEBUG: push down of limit count: 1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 count | key --------------------------------------------------------------------- 1 | 1 (1 row) WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) INSERT INTO table_1 SELECT count(*), key FROM a JOIN table_2 USING (key) GROUP BY key HAVING (max(table_2.value) >= (SELECT value FROM a)); DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 DEBUG: push down of limit count: 1 DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery NOTICE: executing the copy locally for shard xxxxx WITH stats AS ( SELECT count(key) m FROM table_1 ), inserts AS ( INSERT INTO table_2 SELECT key, count(*) FROM table_1 WHERE key >= (SELECT m FROM stats) GROUP BY key HAVING count(*) <= (SELECT m FROM stats) LIMIT 1 RETURNING * ) SELECT count(*) FROM inserts; DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM coordinator_shouldhaveshards.table_1 DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO coordinator_shouldhaveshards.table_2 (key, value) SELECT key, count(*) AS count FROM coordinator_shouldhaveshards.table_1 WHERE (key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2.key, table_2.value DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries DEBUG: push down of limit count: 1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT count(key) AS m FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true NOTICE: executing the command locally: SELECT count(key) AS m FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true DEBUG: Subplan XXX_2 will be written to local file DEBUG: Collecting INSERT ... SELECT results on coordinator NOTICE: executing the command locally: SELECT worker_column_1 AS key, (count(*))::text AS value FROM (SELECT table_1.key AS worker_column_1 FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE (table_1.key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats))) worker_subquery GROUP BY worker_column_1 HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT '1'::bigint NOTICE: executing the command locally: SELECT worker_column_1 AS key, (count(*))::text AS value FROM (SELECT table_1.key AS worker_column_1 FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE (table_1.key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats))) worker_subquery GROUP BY worker_column_1 HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT '1'::bigint NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts count --------------------------------------------------------------------- 0 (1 row) -- a helper function which return true if the coordinated -- trannsaction uses 2PC CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC() RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', $$coordinated_transaction_should_use_2PC$$; -- a local SELECT followed by remote SELECTs -- does not trigger 2PC BEGIN; SELECT y FROM test WHERE x = 1; NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) y --------------------------------------------------------------------- (0 rows) WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true count --------------------------------------------------------------------- 0 (1 row) SELECT count(*) FROM test; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true count --------------------------------------------------------------------- 0 (1 row) WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true count --------------------------------------------------------------------- 0 (1 row) SELECT coordinated_transaction_should_use_2PC(); coordinated_transaction_should_use_2pc --------------------------------------------------------------------- f (1 row) COMMIT; -- remote SELECTs followed by local SELECTs -- does not trigger 2PC BEGIN; SELECT count(*) FROM test; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true count --------------------------------------------------------------------- 0 (1 row) WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true count --------------------------------------------------------------------- 0 (1 row) SELECT y FROM test WHERE x = 1; NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) y --------------------------------------------------------------------- (0 rows) WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true count --------------------------------------------------------------------- 0 (1 row) SELECT coordinated_transaction_should_use_2PC(); coordinated_transaction_should_use_2pc --------------------------------------------------------------------- f (1 row) COMMIT; -- a local SELECT followed by a remote Modify -- triggers 2PC BEGIN; SELECT y FROM test WHERE x = 1; NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) y --------------------------------------------------------------------- (0 rows) UPDATE test SET y = y +1; NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503000 test SET y = (y OPERATOR(pg_catalog.+) 1) NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503003 test SET y = (y OPERATOR(pg_catalog.+) 1) SELECT coordinated_transaction_should_use_2PC(); coordinated_transaction_should_use_2pc --------------------------------------------------------------------- t (1 row) COMMIT; -- a local modify followed by a remote SELECT -- triggers 2PC BEGIN; INSERT INTO test VALUES (1,1); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) SELECT count(*) FROM test; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true count --------------------------------------------------------------------- 1 (1 row) SELECT coordinated_transaction_should_use_2PC(); coordinated_transaction_should_use_2pc --------------------------------------------------------------------- t (1 row) COMMIT; -- a local modify followed by a remote MODIFY -- triggers 2PC BEGIN; INSERT INTO test VALUES (1,1); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) UPDATE test SET y = y +1; NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503000 test SET y = (y OPERATOR(pg_catalog.+) 1) NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503003 test SET y = (y OPERATOR(pg_catalog.+) 1) SELECT coordinated_transaction_should_use_2PC(); coordinated_transaction_should_use_2pc --------------------------------------------------------------------- t (1 row) COMMIT; -- a local modify followed by a remote single shard MODIFY -- triggers 2PC BEGIN; INSERT INTO test VALUES (1,1); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) INSERT INTO test VALUES (3,3); SELECT coordinated_transaction_should_use_2PC(); coordinated_transaction_should_use_2pc --------------------------------------------------------------------- t (1 row) COMMIT; -- a remote single shard modify followed by a local single -- shard MODIFY triggers 2PC BEGIN; INSERT INTO test VALUES (3,3); INSERT INTO test VALUES (1,1); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) SELECT coordinated_transaction_should_use_2PC(); coordinated_transaction_should_use_2pc --------------------------------------------------------------------- t (1 row) COMMIT; -- a remote single shard select followed by a local single -- shard MODIFY triggers 2PC. But, the transaction manager -- is smart enough to skip sending 2PC as the remote -- command is read only BEGIN; SELECT count(*) FROM test WHERE x = 3; count --------------------------------------------------------------------- 2 (1 row) INSERT INTO test VALUES (1,1); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) SELECT coordinated_transaction_should_use_2PC(); coordinated_transaction_should_use_2pc --------------------------------------------------------------------- t (1 row) SET LOCAL citus.log_remote_commands TO ON; COMMIT; NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -- a local single shard select followed by a remote single -- shard modify does not trigger 2PC BEGIN; SELECT count(*) FROM test WHERE x = 1; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 5 (1 row) INSERT INTO test VALUES (3,3); SELECT coordinated_transaction_should_use_2PC(); coordinated_transaction_should_use_2pc --------------------------------------------------------------------- f (1 row) SET LOCAL citus.log_remote_commands TO ON; COMMIT; NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx RESET client_min_messages; \set VERBOSITY terse DROP TABLE ref_table; NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_table_xxxxx CASCADE DELETE FROM test; DROP TABLE test; DROP TABLE dist_table; DROP TABLE ref; NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_xxxxx CASCADE DROP TABLE test_append_table; DROP SCHEMA coordinator_shouldhaveshards CASCADE; NOTICE: drop cascades to 20 other objects SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); ?column? --------------------------------------------------------------------- 1 (1 row)