-- -- MULTI_MX_INSERT_SELECT_REPARTITION -- -- Test behaviour of repartitioned INSERT ... SELECT in MX setup -- -- This test file has an alternative output because of the change in the -- display of SQL-standard function's arguments in INSERT/SELECT in PG15. -- The alternative output can be deleted when we drop support for PG14 -- SHOW server_version \gset SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15; server_version_ge_15 --------------------------------------------------------------------- f (1 row) CREATE SCHEMA multi_mx_insert_select_repartition; SET search_path TO multi_mx_insert_select_repartition; SET citus.next_shard_id TO 4213581; SET citus.shard_replication_factor TO 1; SET citus.shard_count TO 4; CREATE TABLE source_table(a int, b int); SELECT create_distributed_table('source_table', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source_table SELECT floor(i/4), i*i FROM generate_series(1, 20) i; SET citus.shard_count TO 3; CREATE TABLE target_table(a int, b int); SELECT create_distributed_table('target_table', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE FUNCTION square(int) RETURNS INT AS $$ SELECT $1 * $1 $$ LANGUAGE SQL; select create_distributed_function('square(int)'); NOTICE: procedure multi_mx_insert_select_repartition.square is already distributed DETAIL: Citus distributes procedures with CREATE [PROCEDURE|FUNCTION|AGGREGATE] commands create_distributed_function --------------------------------------------------------------------- (1 row) select public.colocate_proc_with_table('square', 'source_table'::regclass, 0); colocate_proc_with_table --------------------------------------------------------------------- (1 row) -- Test along with function delegation -- function delegation only happens for "SELECT f()", and we don't use -- repartitioned INSERT/SELECT when task count is 1, so the following -- should go via coordinator EXPLAIN (costs off) INSERT INTO target_table(a) SELECT square(4); QUERY PLAN --------------------------------------------------------------------- Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator -> Result (3 rows) INSERT INTO target_table(a) SELECT square(4); SELECT * FROM target_table; a | b --------------------------------------------------------------------- 16 | (1 row) TRUNCATE target_table; -- -- Test repartitioned INSERT/SELECT from MX worker -- \c - - - :worker_1_port SET search_path TO multi_mx_insert_select_repartition; EXPLAIN (costs off) INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a; QUERY PLAN --------------------------------------------------------------------- Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: repartition -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=localhost port=xxxxx dbname=regression -> HashAggregate Group Key: a -> Seq Scan on source_table_4213581 source_table (10 rows) INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a; SET citus.log_local_commands to on; -- INSERT .. SELECT via repartitioning with local execution BEGIN; select count(*) from source_table WHERE a = 1; NOTICE: executing the command locally: SELECT count(*) AS count FROM multi_mx_insert_select_repartition.source_table_4213581 source_table WHERE (a OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 4 (1 row) insert into target_table SELECT a*2 FROM source_table RETURNING a; NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_4213581_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_4213581_to','SELECT (a OPERATOR(pg_catalog.*) 2) AS a FROM multi_mx_insert_select_repartition.source_table_4213581 source_table WHERE true',0,'hash','{-2147483648,-715827883,715827882}'::text[],'{-715827884,715827881,2147483647}'::text[],true) WHERE rows_written > 0 NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_4213583_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_4213583_to','SELECT (a OPERATOR(pg_catalog.*) 2) AS a FROM multi_mx_insert_select_repartition.source_table_4213583 source_table WHERE true',0,'hash','{-2147483648,-715827883,715827882}'::text[],'{-715827884,715827881,2147483647}'::text[],true) WHERE rows_written > 0 NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartitioned_results_xxxxx_from_4213582_to_0','repartitioned_results_xxxxx_from_4213584_to_0']::text[],'localhost',57638) bytes NOTICE: executing the command locally: INSERT INTO multi_mx_insert_select_repartition.target_table_4213585 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213581_to_0,repartitioned_results_xxxxx_from_4213582_to_0,repartitioned_results_xxxxx_from_4213584_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) RETURNING citus_table_alias.a NOTICE: executing the command locally: INSERT INTO multi_mx_insert_select_repartition.target_table_4213587 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213581_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) RETURNING citus_table_alias.a a --------------------------------------------------------------------- 0 0 0 2 2 2 2 4 4 4 4 6 6 6 6 8 8 8 8 10 (20 rows) ROLLBACK; BEGIN; select count(*) from source_table WHERE a = 1; NOTICE: executing the command locally: SELECT count(*) AS count FROM multi_mx_insert_select_repartition.source_table_4213581 source_table WHERE (a OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 4 (1 row) insert into target_table SELECT a FROM source_table LIMIT 10; NOTICE: executing the command locally: SELECT a FROM multi_mx_insert_select_repartition.source_table_4213581 source_table WHERE true LIMIT '10'::bigint NOTICE: executing the command locally: SELECT a FROM multi_mx_insert_select_repartition.source_table_4213583 source_table WHERE true LIMIT '10'::bigint NOTICE: executing the copy locally for shard xxxxx ROLLBACK; \c - - - :master_port SET search_path TO multi_mx_insert_select_repartition; SELECT * FROM target_table ORDER BY a; a | b --------------------------------------------------------------------- 0 | 9 1 | 49 2 | 121 3 | 225 4 | 361 5 | 400 (6 rows) RESET client_min_messages; \set VERBOSITY terse DROP SCHEMA multi_mx_insert_select_repartition CASCADE; NOTICE: drop cascades to 3 other objects