-- -- INTERMEDIATE_RESULT_PRUNING -- -- This test file has an alternative output because of the change in the -- display of SQL-standard function's arguments in INSERT/SELECT in PG15. -- The alternative output can be deleted when we drop support for PG14 -- SHOW server_version \gset SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15; server_version_ge_15 --------------------------------------------------------------------- f (1 row) CREATE SCHEMA intermediate_result_pruning; SET search_path TO intermediate_result_pruning; SET citus.log_intermediate_results TO TRUE; SET citus.shard_count TO 4; SET citus.next_shard_id TO 1480000; SET citus.shard_replication_factor = 1; CREATE TABLE table_1 (key int, value text); SELECT create_distributed_table('table_1', 'key'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE table_2 (key int, value text); SELECT create_distributed_table('table_2', 'key'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE table_3 (key int, value text); SELECT create_distributed_table('table_3', 'key'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE ref_table (key int, value text); SELECT create_reference_table('ref_table'); create_reference_table --------------------------------------------------------------------- (1 row) -- load some data INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); INSERT INTO table_2 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6'); INSERT INTO table_3 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6'); INSERT INTO ref_table VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); -- see which workers are hit for intermediate results SET client_min_messages TO DEBUG1; -- a very basic case, where the intermediate result -- should go to both workers WITH some_values_1 AS 
MATERIALIZED (SELECT key FROM table_1 WHERE value IN ('3', '4')) SELECT count(*) FROM some_values_1 JOIN table_2 USING (key); DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx count --------------------------------------------------------------------- 2 (1 row) -- a very basic case, where the intermediate result -- should only go to one worker because the final query is a router -- we use random() to prevent postgres inline the CTE(s) WITH some_values_1 AS MATERIALIZED (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')) SELECT count(*) FROM some_values_1 JOIN table_2 USING (key) WHERE table_2.key = 1; DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 1) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx count --------------------------------------------------------------------- 0 (1 row) -- a similar query, but with a reference table now -- given that reference tables are replicated to all nodes -- we 
have to broadcast to all nodes WITH some_values_1 AS MATERIALIZED (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')) SELECT count(*) FROM some_values_1 JOIN ref_table USING (key); DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.ref_table USING (key)) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx count --------------------------------------------------------------------- 2 (1 row) -- a similar query as above, but this time use the CTE inside -- another CTE WITH some_values_1 AS MATERIALIZED (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), some_values_2 AS MATERIALIZED (SELECT key, random() FROM some_values_1) SELECT count(*) FROM some_values_2 JOIN table_2 USING (key) WHERE table_2.key = 1; DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: generating subplan XXX_2 for CTE some_values_2: SELECT key, random() AS random FROM (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) 
intermediate_result(key integer, random double precision)) some_values_2 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 1) DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx count --------------------------------------------------------------------- 0 (1 row) -- the second CTE does a join with a distributed table -- and the final query is a router query WITH some_values_1 AS MATERIALIZED (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), some_values_2 AS MATERIALIZED (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key)) SELECT count(*) FROM some_values_2 JOIN table_2 USING (key) WHERE table_2.key = 3; DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: generating subplan XXX_2 for CTE some_values_2: SELECT some_values_1.key, random() AS random FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx count --------------------------------------------------------------------- 1 (1 row) -- the first CTE is used both within second CTE and the 
final query -- the second CTE does a join with a distributed table -- and the final query is a router query WITH some_values_1 AS MATERIALIZED (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), some_values_2 AS MATERIALIZED (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key)) SELECT count(*) FROM (some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key = 3; DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: generating subplan XXX_2 for CTE some_values_2: SELECT some_values_1.key, random() AS random FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN intermediate_result_pruning.table_2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx count --------------------------------------------------------------------- 1 (1 row) -- the first CTE is used both within second CTE and the final query -- the second CTE does a join with a distributed table but a router query on a 
worker -- and the final query is another router query on another worker WITH some_values_1 AS MATERIALIZED (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), some_values_2 AS MATERIALIZED (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE table_2.key = 1) SELECT count(*) FROM (some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key = 3; DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: generating subplan XXX_2 for CTE some_values_2: SELECT some_values_1.key, random() AS random FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 1) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN intermediate_result_pruning.table_2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx count --------------------------------------------------------------------- 0 (1 row) -- the first CTE is used both within second CTE and the final query -- the second CTE does a join with a distributed 
table but a router query on a worker -- and the final query is a router query on the same worker, so the first result is only -- broadcasted to a single node WITH some_values_1 AS MATERIALIZED (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), some_values_2 AS MATERIALIZED (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE table_2.key = 1) SELECT count(*) FROM (some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key = 1; DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: generating subplan XXX_2 for CTE some_values_2: SELECT some_values_1.key, random() AS random FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 1) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN intermediate_result_pruning.table_2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 1) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx count --------------------------------------------------------------------- 0 (1 row) -- the same query with the above, but the final query is hitting all shards WITH 
some_values_1 AS MATERIALIZED (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), some_values_2 AS MATERIALIZED (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key)) SELECT count(*) FROM (some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key != 3; DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: generating subplan XXX_2 for CTE some_values_2: SELECT some_values_1.key, random() AS random FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN intermediate_result_pruning.table_2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.<>) 3) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx count --------------------------------------------------------------------- 1 (1 row) -- even if we add a filter on the first query and make it a router query, -- the first intermediate result still hits all workers because of the final -- join is hitting all workers WITH some_values_1 AS 
MATERIALIZED (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), some_values_2 AS MATERIALIZED (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE table_2.key = 3) SELECT count(*) FROM (some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key != 3; DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: generating subplan XXX_2 for CTE some_values_2: SELECT some_values_1.key, random() AS random FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN intermediate_result_pruning.table_2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.<>) 3) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx count --------------------------------------------------------------------- 0 (1 row) -- the reference table is joined with a distributed table and an intermediate -- result, but the distributed table hits all shards, so the intermediate -- 
result is sent to all nodes WITH some_values_1 AS MATERIALIZED (SELECT key, random() FROM ref_table WHERE value IN ('3', '4')) SELECT count(*) FROM (some_values_1 JOIN ref_table USING (key)) JOIN table_2 USING (key); DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.ref_table WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.ref_table USING (key)) JOIN intermediate_result_pruning.table_2 USING (key)) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx count --------------------------------------------------------------------- 2 (1 row) -- similar query as above, but this time the whole query is a router -- query, so no intermediate results WITH some_values_1 AS MATERIALIZED (SELECT key, random() FROM ref_table WHERE value IN ('3', '4')) SELECT count(*) FROM (some_values_1 JOIN ref_table USING (key)) JOIN table_2 USING (key) WHERE table_2.key = 1; count --------------------------------------------------------------------- 0 (1 row) -- now, the second CTE has a single shard join with a distributed table -- so the first CTE should only be broadcasted to that node -- since the final query doesn't have a join, it should simply be broadcasted -- to one node WITH some_values_1 AS MATERIALIZED (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), some_values_2 AS MATERIALIZED (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE key = 1) SELECT count(*) FROM some_values_2; DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, random() AS random FROM 
intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: generating subplan XXX_2 for CTE some_values_2: SELECT some_values_1.key, random() AS random FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (some_values_1.key OPERATOR(pg_catalog.=) 1) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be written to local file count --------------------------------------------------------------------- 0 (1 row) -- the same query inlined inside a CTE, and the final query has a -- join with a distributed table WITH top_cte as MATERIALIZED ( WITH some_values_1 AS MATERIALIZED (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), some_values_2 AS MATERIALIZED (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE key = 1) SELECT DISTINCT key FROM some_values_2 ) SELECT count(*) FROM top_cte JOIN table_2 USING (key); DEBUG: generating subplan XXX_1 for CTE top_cte: WITH some_values_1 AS MATERIALIZED (SELECT table_1.key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (table_1.value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text]))), some_values_2 AS MATERIALIZED (SELECT some_values_1.key, random() AS random FROM (some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (some_values_1.key OPERATOR(pg_catalog.=) 1)) SELECT DISTINCT key FROM some_values_2 DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, 
random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: generating subplan XXX_2 for CTE some_values_2: SELECT some_values_1.key, random() AS random FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (some_values_1.key OPERATOR(pg_catalog.=) 1) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT DISTINCT key FROM (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) top_cte JOIN intermediate_result_pruning.table_2 USING (key)) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be written to local file count --------------------------------------------------------------------- 0 (1 row) -- very much the same query, but this time the top query is also a router query -- on a single worker, so all intermediate results only hit a single node WITH top_cte as MATERIALIZED ( WITH some_values_1 AS MATERIALIZED (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), some_values_2 AS MATERIALIZED (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE key = 1) SELECT DISTINCT key FROM some_values_2 ) SELECT count(*) FROM top_cte JOIN table_2 USING (key) WHERE table_2.key = 2; DEBUG: generating subplan XXX_1 for CTE top_cte: WITH 
some_values_1 AS MATERIALIZED (SELECT table_1.key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (table_1.value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text]))), some_values_2 AS MATERIALIZED (SELECT some_values_1.key, random() AS random FROM (some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (some_values_1.key OPERATOR(pg_catalog.=) 1)) SELECT DISTINCT key FROM some_values_2 DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: generating subplan XXX_2 for CTE some_values_2: SELECT some_values_1.key, random() AS random FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (some_values_1.key OPERATOR(pg_catalog.=) 1) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT DISTINCT key FROM (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) top_cte JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 2) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be written to local file count --------------------------------------------------------------------- 0 (1 row) -- some_values_1 is first used by a single shard-query, and than with a 
multi-shard -- CTE, finally a cartesian product join WITH some_values_1 AS MATERIALIZED (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), some_values_2 AS MATERIALIZED (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE key = 1), some_values_3 AS MATERIALIZED (SELECT key FROM (some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key)) SELECT * FROM some_values_3 JOIN ref_table ON (true); DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: generating subplan XXX_2 for CTE some_values_2: SELECT some_values_1.key, random() AS random FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (some_values_1.key OPERATOR(pg_catalog.=) 1) DEBUG: generating subplan XXX_3 for CTE some_values_3: SELECT some_values_2.key FROM (((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN intermediate_result_pruning.table_2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 USING (key)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT some_values_3.key, ref_table.key, ref_table.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) some_values_3 JOIN intermediate_result_pruning.ref_table ON (true)) DEBUG: Subplan XXX_1 will be sent to 
localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx key | key | value --------------------------------------------------------------------- (0 rows) -- join on intermediate results, so should only -- go to a single node WITH some_values_1 AS MATERIALIZED (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), some_values_2 AS MATERIALIZED (SELECT key, random() FROM table_2 WHERE value IN ('3', '4')) SELECT count(*) FROM some_values_2 JOIN some_values_1 USING (key); DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: generating subplan XXX_2 for CTE some_values_2: SELECT key, random() AS random FROM intermediate_result_pruning.table_2 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 USING (key)) DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file count --------------------------------------------------------------------- 2 (1 row) -- same query with WHERE false make sure that we're not broken -- for such edge cases WITH some_values_1 AS MATERIALIZED (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), 
some_values_2 AS MATERIALIZED (SELECT key, random() FROM table_2 WHERE value IN ('3', '4')) SELECT count(*) FROM some_values_2 JOIN some_values_1 USING (key) WHERE false; DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: generating subplan XXX_2 for CTE some_values_2: SELECT key, random() AS random FROM intermediate_result_pruning.table_2 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 USING (key)) WHERE false DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file count --------------------------------------------------------------------- 0 (1 row) -- do not use some_values_2 at all, so only 2 intermediate results are -- broadcasted WITH some_values_1 AS MATERIALIZED (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), some_values_2 AS MATERIALIZED (SELECT key, random() FROM some_values_1), some_values_3 AS MATERIALIZED (SELECT key, random() FROM some_values_1) SELECT count(*) FROM some_values_3; DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: generating subplan XXX_2 for CTE some_values_3: SELECT key, random() AS random FROM (SELECT intermediate_result.key, 
intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_3 DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file count --------------------------------------------------------------------- 2 (1 row) -- lets have some deeper intermediate results -- the inner most two results and the final query (which contains only intermediate results) -- hitting single worker, others hitting all workers -- (see below query where all intermediate results hit a single node) SELECT count(*) FROM ( SELECT avg(min::int) FROM ( SELECT min(table_1.value) FROM ( SELECT avg(value::int) as avg_ev_type FROM ( SELECT max(value) as mx_val_1 FROM ( SELECT avg(value::int) as avg FROM ( SELECT cnt FROM ( SELECT count(*) as cnt, value FROM table_1 WHERE key = 1 GROUP BY value ) as level_1, table_1 WHERE table_1.key = level_1.cnt AND key = 3 ) as level_2, table_2 WHERE table_2.key = level_2.cnt AND key = 5 GROUP BY level_2.cnt ) as level_3, table_1 WHERE value::numeric = level_3.avg AND key = 6 GROUP BY level_3.avg ) as level_4, table_2 WHERE level_4.mx_val_1::int = table_2.key GROUP BY level_4.mx_val_1 ) as level_5, table_1 WHERE level_5.avg_ev_type = table_1.key AND key > 111 GROUP BY level_5.avg_ev_type ) as level_6, table_1 WHERE table_1.key::int = level_6.min::int GROUP BY table_1.value ) as bar; DEBUG: generating subplan XXX_1 for subquery SELECT count(*) AS cnt, value FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 1) GROUP BY value DEBUG: generating subplan XXX_2 for subquery SELECT avg((table_2.value)::integer) 
AS avg FROM (SELECT level_1.cnt FROM (SELECT intermediate_result.cnt, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(cnt bigint, value text)) level_1, intermediate_result_pruning.table_1 WHERE ((table_1.key OPERATOR(pg_catalog.=) level_1.cnt) AND (table_1.key OPERATOR(pg_catalog.=) 3))) level_2, intermediate_result_pruning.table_2 WHERE ((table_2.key OPERATOR(pg_catalog.=) level_2.cnt) AND (table_2.key OPERATOR(pg_catalog.=) 5)) GROUP BY level_2.cnt DEBUG: generating subplan XXX_3 for subquery SELECT max(table_1.value) AS mx_val_1 FROM (SELECT intermediate_result.avg FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(avg numeric)) level_3, intermediate_result_pruning.table_1 WHERE (((table_1.value)::numeric OPERATOR(pg_catalog.=) level_3.avg) AND (table_1.key OPERATOR(pg_catalog.=) 6)) GROUP BY level_3.avg DEBUG: generating subplan XXX_4 for subquery SELECT avg((table_2.value)::integer) AS avg_ev_type FROM (SELECT intermediate_result.mx_val_1 FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(mx_val_1 text)) level_4, intermediate_result_pruning.table_2 WHERE ((level_4.mx_val_1)::integer OPERATOR(pg_catalog.=) table_2.key) GROUP BY level_4.mx_val_1 DEBUG: generating subplan XXX_5 for subquery SELECT min(table_1.value) AS min FROM (SELECT intermediate_result.avg_ev_type FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(avg_ev_type numeric)) level_5, intermediate_result_pruning.table_1 WHERE ((level_5.avg_ev_type OPERATOR(pg_catalog.=) (table_1.key)::numeric) AND (table_1.key OPERATOR(pg_catalog.>) 111)) GROUP BY level_5.avg_ev_type DEBUG: generating subplan XXX_6 for subquery SELECT avg((level_6.min)::integer) AS avg FROM (SELECT intermediate_result.min FROM read_intermediate_result('XXX_5'::text, 'binary'::citus_copy_format) intermediate_result(min text)) level_6, 
intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) (level_6.min)::integer) GROUP BY table_1.value DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.avg FROM read_intermediate_result('XXX_6'::text, 'binary'::citus_copy_format) intermediate_result(avg numeric)) bar DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx DEBUG: Subplan XXX_5 will be sent to localhost:xxxxx DEBUG: Subplan XXX_5 will be sent to localhost:xxxxx DEBUG: Subplan XXX_6 will be written to local file count --------------------------------------------------------------------- 0 (1 row) -- the same query where all intermediate results hits one -- worker because each and every query is a router query -- but on different nodes SELECT count(*) FROM ( SELECT avg(min::int) FROM ( SELECT min(table_1.value) FROM ( SELECT avg(value::int) as avg_ev_type FROM ( SELECT max(value) as mx_val_1 FROM ( SELECT avg(value::int) as avg FROM ( SELECT cnt FROM ( SELECT count(*) as cnt, value FROM table_1 WHERE key = 1 GROUP BY value ) as level_1, table_1 WHERE table_1.key = level_1.cnt AND key = 3 ) as level_2, table_2 WHERE table_2.key = level_2.cnt AND key = 5 GROUP BY level_2.cnt ) as level_3, table_1 WHERE value::numeric = level_3.avg AND key = 6 GROUP BY level_3.avg ) as level_4, table_2 WHERE level_4.mx_val_1::int = table_2.key AND table_2.key = 1 GROUP BY level_4.mx_val_1 ) as level_5, table_1 WHERE level_5.avg_ev_type = table_1.key AND key = 111 GROUP BY level_5.avg_ev_type ) as level_6, table_1 WHERE table_1.key::int = level_6.min::int AND table_1.key = 4 GROUP BY table_1.value ) as bar; DEBUG: generating subplan XXX_1 for subquery SELECT count(*) AS cnt, 
value FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 1) GROUP BY value DEBUG: generating subplan XXX_2 for subquery SELECT avg((table_2.value)::integer) AS avg FROM (SELECT level_1.cnt FROM (SELECT intermediate_result.cnt, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(cnt bigint, value text)) level_1, intermediate_result_pruning.table_1 WHERE ((table_1.key OPERATOR(pg_catalog.=) level_1.cnt) AND (table_1.key OPERATOR(pg_catalog.=) 3))) level_2, intermediate_result_pruning.table_2 WHERE ((table_2.key OPERATOR(pg_catalog.=) level_2.cnt) AND (table_2.key OPERATOR(pg_catalog.=) 5)) GROUP BY level_2.cnt DEBUG: generating subplan XXX_3 for subquery SELECT max(table_1.value) AS mx_val_1 FROM (SELECT intermediate_result.avg FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(avg numeric)) level_3, intermediate_result_pruning.table_1 WHERE (((table_1.value)::numeric OPERATOR(pg_catalog.=) level_3.avg) AND (table_1.key OPERATOR(pg_catalog.=) 6)) GROUP BY level_3.avg DEBUG: generating subplan XXX_4 for subquery SELECT avg((table_2.value)::integer) AS avg_ev_type FROM (SELECT intermediate_result.mx_val_1 FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(mx_val_1 text)) level_4, intermediate_result_pruning.table_2 WHERE (((level_4.mx_val_1)::integer OPERATOR(pg_catalog.=) table_2.key) AND (table_2.key OPERATOR(pg_catalog.=) 1)) GROUP BY level_4.mx_val_1 DEBUG: generating subplan XXX_5 for subquery SELECT min(table_1.value) AS min FROM (SELECT intermediate_result.avg_ev_type FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(avg_ev_type numeric)) level_5, intermediate_result_pruning.table_1 WHERE ((level_5.avg_ev_type OPERATOR(pg_catalog.=) (table_1.key)::numeric) AND (table_1.key OPERATOR(pg_catalog.=) 111)) GROUP BY level_5.avg_ev_type DEBUG: 
generating subplan XXX_6 for subquery SELECT avg((level_6.min)::integer) AS avg FROM (SELECT intermediate_result.min FROM read_intermediate_result('XXX_5'::text, 'binary'::citus_copy_format) intermediate_result(min text)) level_6, intermediate_result_pruning.table_1 WHERE ((table_1.key OPERATOR(pg_catalog.=) (level_6.min)::integer) AND (table_1.key OPERATOR(pg_catalog.=) 4)) GROUP BY table_1.value DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.avg FROM read_intermediate_result('XXX_6'::text, 'binary'::citus_copy_format) intermediate_result(avg numeric)) bar DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx DEBUG: Subplan XXX_5 will be sent to localhost:xxxxx DEBUG: Subplan XXX_6 will be written to local file count --------------------------------------------------------------------- 0 (1 row) -- sanity checks for set operations -- the intermediate results should just hit a single worker (SELECT key FROM table_1 WHERE key = 1) INTERSECT (SELECT key FROM table_1 WHERE key = 2); DEBUG: generating subplan XXX_1 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 1) DEBUG: generating subplan XXX_2 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 2) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer) INTERSECT SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer) DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file key 
--------------------------------------------------------------------- (0 rows) -- the intermediate results should just hit a single worker WITH cte_1 AS MATERIALIZED ( (SELECT key FROM table_1 WHERE key = 1) INTERSECT (SELECT key FROM table_1 WHERE key = 2) ), cte_2 AS MATERIALIZED ( (SELECT key FROM table_1 WHERE key = 3) INTERSECT (SELECT key FROM table_1 WHERE key = 4) ) SELECT * FROM cte_1 UNION SELECT * FROM cte_2; DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 1) INTERSECT SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 2) DEBUG: generating subplan XXX_1 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 1) DEBUG: generating subplan XXX_2 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 2) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer) INTERSECT SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer) DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 3) INTERSECT SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 4) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_1.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_1 UNION SELECT cte_2.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_2 DEBUG: Subplan XXX_1 will 
be written to local file DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_2 will be written to local file key --------------------------------------------------------------------- (0 rows) -- one final test with SET operations, where -- we join the results with distributed tables -- so cte_1 should hit all workers, but still the -- others should hit single worker each WITH cte_1 AS MATERIALIZED ( (SELECT key FROM table_1 WHERE key = 1) INTERSECT (SELECT key FROM table_1 WHERE key = 2) ), cte_2 AS MATERIALIZED ( SELECT count(*) FROM table_1 JOIN cte_1 USING (key) ) SELECT * FROM cte_2; DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 1) INTERSECT SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 2) DEBUG: generating subplan XXX_1 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 1) DEBUG: generating subplan XXX_2 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 2) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer) INTERSECT SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer) DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT count(*) AS count FROM (intermediate_result_pruning.table_1 JOIN (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_1 USING (key)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_2'::text, 
'binary'::citus_copy_format) intermediate_result(count bigint)) cte_2 DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_2 will be written to local file count --------------------------------------------------------------------- 0 (1 row) -- sanity checks for non-colocated subquery joins -- the recursively planned subquery (bar) should hit all -- nodes SELECT count(*) FROM (SELECT key, random() FROM table_1) as foo, (SELECT key, random() FROM table_2) as bar WHERE foo.key != bar.key; DEBUG: generating subplan XXX_1 for subquery SELECT key, random() AS random FROM intermediate_result_pruning.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT table_1.key, random() AS random FROM intermediate_result_pruning.table_1) foo, (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx count --------------------------------------------------------------------- 14 (1 row) -- the recursively planned subquery (bar) should hit one -- node because foo goes to a single node SELECT count(*) FROM (SELECT key, random() FROM table_1 WHERE key = 1) as foo, (SELECT key, random() FROM table_2) as bar WHERE foo.key != bar.key; DEBUG: generating subplan XXX_1 for subquery SELECT key, random() AS random FROM intermediate_result_pruning.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT table_1.key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 1)) foo, (SELECT 
intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx count --------------------------------------------------------------------- 4 (1 row) -- sanity checks for modification queries -- select_data goes to a single node, because it is used in another subquery -- raw_data is also the final router query, so hits a single shard -- however, the subquery in WHERE clause of the DELETE query is broadcasted to all -- nodes BEGIN; WITH select_data AS MATERIALIZED ( SELECT * FROM table_1 ), raw_data AS MATERIALIZED ( DELETE FROM table_2 WHERE key >= (SELECT min(key) FROM select_data WHERE key > 1) RETURNING * ) SELECT * FROM raw_data; DEBUG: generating subplan XXX_1 for CTE select_data: SELECT key, value FROM intermediate_result_pruning.table_1 DEBUG: generating subplan XXX_2 for CTE raw_data: DELETE FROM intermediate_result_pruning.table_2 WHERE (key OPERATOR(pg_catalog.>=) (SELECT min(select_data.key) AS min FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) select_data WHERE (select_data.key OPERATOR(pg_catalog.>) 1))) RETURNING key, value DEBUG: generating subplan XXX_1 for subquery SELECT min(key) AS min FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) select_data WHERE (key OPERATOR(pg_catalog.>) 1) DEBUG: Plan XXX query after replacing subqueries and CTEs: DELETE FROM intermediate_result_pruning.table_2 WHERE (key OPERATOR(pg_catalog.>=) (SELECT intermediate_result.min FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(min 
integer))) RETURNING key, value DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) raw_data DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx key | value --------------------------------------------------------------------- 3 | 3 4 | 4 5 | 5 6 | 6 (4 rows) ROLLBACK; -- select_data goes to a single node, because it is used in another subquery -- raw_data is also the final router query, so hits a single shard -- however, the subquery in WHERE clause of the DELETE query is broadcasted to all -- nodes BEGIN; WITH select_data AS MATERIALIZED ( SELECT * FROM table_1 ), raw_data AS MATERIALIZED ( DELETE FROM table_2 WHERE value::int >= (SELECT min(key) FROM select_data WHERE key > 1 + random()) RETURNING * ) SELECT * FROM raw_data; DEBUG: generating subplan XXX_1 for CTE select_data: SELECT key, value FROM intermediate_result_pruning.table_1 DEBUG: generating subplan XXX_2 for CTE raw_data: DELETE FROM intermediate_result_pruning.table_2 WHERE ((value)::integer OPERATOR(pg_catalog.>=) (SELECT min(select_data.key) AS min FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) select_data WHERE ((select_data.key)::double precision OPERATOR(pg_catalog.>) ((1)::double precision OPERATOR(pg_catalog.+) random())))) RETURNING key, value DEBUG: generating subplan XXX_1 for subquery SELECT min(key) AS min FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) 
select_data WHERE ((key)::double precision OPERATOR(pg_catalog.>) ((1)::double precision OPERATOR(pg_catalog.+) random())) DEBUG: Plan XXX query after replacing subqueries and CTEs: DELETE FROM intermediate_result_pruning.table_2 WHERE ((value)::integer OPERATOR(pg_catalog.>=) (SELECT intermediate_result.min FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(min integer))) RETURNING key, value DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) raw_data DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx key | value --------------------------------------------------------------------- 3 | 3 4 | 4 5 | 5 6 | 6 (4 rows) ROLLBACK; -- now, we need only two intermediate results as the subquery in WHERE clause is -- router plannable BEGIN; WITH select_data AS MATERIALIZED ( SELECT * FROM table_1 ), raw_data AS MATERIALIZED ( DELETE FROM table_2 WHERE value::int >= (SELECT min(key) FROM table_1 WHERE key > random()) AND key = 6 RETURNING * ) SELECT * FROM raw_data; DEBUG: generating subplan XXX_1 for CTE raw_data: DELETE FROM intermediate_result_pruning.table_2 WHERE (((value)::integer OPERATOR(pg_catalog.>=) (SELECT min(table_1.key) AS min FROM intermediate_result_pruning.table_1 WHERE ((table_1.key)::double precision OPERATOR(pg_catalog.>) random()))) AND (key OPERATOR(pg_catalog.=) 6)) RETURNING key, value DEBUG: generating subplan XXX_1 for subquery SELECT min(key) AS min FROM intermediate_result_pruning.table_1 WHERE ((key)::double precision OPERATOR(pg_catalog.>) random()) DEBUG: Plan XXX query after replacing subqueries and CTEs: DELETE FROM 
intermediate_result_pruning.table_2 WHERE (((value)::integer OPERATOR(pg_catalog.>=) (SELECT intermediate_result.min FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(min integer))) AND (key OPERATOR(pg_catalog.=) 6)) RETURNING key, value DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) raw_data DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx key | value --------------------------------------------------------------------- 6 | 6 (1 row) ROLLBACK; -- test with INSERT SELECT via coordinator -- INSERT .. SELECT via coordinator that doesn't have any intermediate results -- We use offset 1 to make sure the result needs to be pulled to the coordinator, offset 0 would be optimized away INSERT INTO table_1 SELECT * FROM table_2 OFFSET 1; DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries DEBUG: Collecting INSERT ... SELECT results on coordinator -- INSERT .. SELECT via coordinator which has intermediate result, -- and can be pruned to a single worker because the final query is on -- single shard via filter in key INSERT INTO table_1 SELECT * FROM table_2 where value IN (SELECT value FROM table_1 WHERE random() > 1) AND key = 1; DEBUG: volatile functions are not allowed in distributed INSERT ... 
SELECT queries DEBUG: generating subplan XXX_1 for subquery SELECT value FROM intermediate_result_pruning.table_1 WHERE (random() OPERATOR(pg_catalog.>) (1)::double precision) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT key, value FROM intermediate_result_pruning.table_2 WHERE ((value OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value text))) AND (key OPERATOR(pg_catalog.=) 1)) DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -- a similar query, with more complex subquery INSERT INTO table_1 SELECT * FROM table_2 where key = 1 AND value::int IN (WITH cte_1 AS MATERIALIZED ( (SELECT key FROM table_1 WHERE key = 1) INTERSECT (SELECT key FROM table_1 WHERE key = 2) ), cte_2 AS MATERIALIZED ( (SELECT key FROM table_1 WHERE key = 3) INTERSECT (SELECT key FROM table_1 WHERE key = 4) ) SELECT * FROM cte_1 UNION SELECT * FROM cte_2); DEBUG: Set operations are not allowed in distributed INSERT ... 
SELECT queries DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 1) INTERSECT SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 2) DEBUG: generating subplan XXX_1 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 1) DEBUG: generating subplan XXX_2 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 2) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer) INTERSECT SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer) DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 3) INTERSECT SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 4) DEBUG: generating subplan XXX_3 for subquery SELECT cte_1.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_1 UNION SELECT cte_2.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT key, value FROM intermediate_result_pruning.table_2 WHERE ((key OPERATOR(pg_catalog.=) 1) AND ((value)::integer OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)))) DEBUG: Collecting INSERT ... 
SELECT results on coordinator DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx -- same query, cte is on the FROM clause -- and this time the final query (and top-level intermediate result) -- hits all the shards because table_2.key != 1 INSERT INTO table_1 SELECT table_2.* FROM table_2, (WITH cte_1 AS MATERIALIZED ( (SELECT key FROM table_1 WHERE key = 1) INTERSECT (SELECT key FROM table_1 WHERE key = 2) ), cte_2 AS MATERIALIZED ( (SELECT key FROM table_1 WHERE key = 3) INTERSECT (SELECT key FROM table_1 WHERE key = 4) ) SELECT * FROM cte_1 UNION SELECT * FROM cte_2 ) foo where table_2.key != 1 AND foo.key = table_2.value::int; DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 1) INTERSECT SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 2) DEBUG: generating subplan XXX_1 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 1) DEBUG: generating subplan XXX_2 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 2) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer) INTERSECT SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer) DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 3) INTERSECT SELECT table_1.key FROM 
intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 4) DEBUG: generating subplan XXX_3 for subquery SELECT cte_1.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_1 UNION SELECT cte_2.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table_2.key, table_2.value FROM intermediate_result_pruning.table_2, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) foo WHERE ((table_2.key OPERATOR(pg_catalog.<>) 1) AND (foo.key OPERATOR(pg_catalog.=) (table_2.value)::integer)) DEBUG: performing repartitioned INSERT ... SELECT DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx -- append partitioned/heap-type -- do not print out 'building index pg_toast_xxxxx_index' messages SET client_min_messages TO DEFAULT; CREATE TABLE range_partitioned(range_column text, data int); SET client_min_messages TO DEBUG1; SELECT create_distributed_table('range_partitioned', 'range_column', 'range'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT master_create_empty_shard('range_partitioned'); master_create_empty_shard --------------------------------------------------------------------- 1480013 (1 row) SELECT master_create_empty_shard('range_partitioned'); master_create_empty_shard --------------------------------------------------------------------- 1480014 (1 row) SELECT 
master_create_empty_shard('range_partitioned');
 master_create_empty_shard
---------------------------------------------------------------------
 1480015
(1 row)

SELECT master_create_empty_shard('range_partitioned');
 master_create_empty_shard
---------------------------------------------------------------------
 1480016
(1 row)

SELECT master_create_empty_shard('range_partitioned');
 master_create_empty_shard
---------------------------------------------------------------------
 1480017
(1 row)

-- Give shards 1480013..1480017 text ranges by editing pg_dist_shard directly.
-- Adjacent shards share their boundary value (e.g. 'D' is the max of 1480013
-- and the min of 1480014).
UPDATE pg_dist_shard SET shardminvalue = 'A', shardmaxvalue = 'D' WHERE shardid = 1480013;
UPDATE pg_dist_shard SET shardminvalue = 'D', shardmaxvalue = 'G' WHERE shardid = 1480014;
UPDATE pg_dist_shard SET shardminvalue = 'G', shardmaxvalue = 'K' WHERE shardid = 1480015;
UPDATE pg_dist_shard SET shardminvalue = 'K', shardmaxvalue = 'O' WHERE shardid = 1480016;
UPDATE pg_dist_shard SET shardminvalue = 'O', shardmaxvalue = 'Z' WHERE shardid = 1480017;
-- final query goes to a single shard
-- (the recorded output shows the subplan being sent exactly once)
SELECT
    count(*)
FROM range_partitioned
WHERE range_column = 'A'
  AND data IN (SELECT data FROM range_partitioned);
DEBUG: generating subplan XXX_1 for subquery SELECT data FROM intermediate_result_pruning.range_partitioned
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM intermediate_result_pruning.range_partitioned WHERE ((range_column OPERATOR(pg_catalog.=) 'A'::text) AND (data OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.data FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(data integer))))
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
 count
---------------------------------------------------------------------
 0
(1 row)

-- final query goes to three shards, so multiple workers
-- NOTE(review): 'K' is both the shardmaxvalue of 1480015 and the shardminvalue
-- of 1480016 per the UPDATEs above, so `range_column <= 'K'` may touch four
-- shards rather than three -- confirm the shard count stated here.
SELECT
    count(*)
FROM range_partitioned
WHERE range_column >= 'A'
  AND range_column <= 'K'
  AND data IN (SELECT data FROM range_partitioned);
DEBUG: generating subplan XXX_1 for subquery SELECT data FROM intermediate_result_pruning.range_partitioned
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM intermediate_result_pruning.range_partitioned WHERE ((range_column OPERATOR(pg_catalog.>=) 'A'::text) AND (range_column OPERATOR(pg_catalog.<=) 'K'::text) AND (data OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.data FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(data integer))))
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
 count
---------------------------------------------------------------------
 0
(1 row)

-- two shards, both of which are on the first node
-- NOTE(review): the recorded output below lists two "will be sent" lines for
-- XXX_1 -- confirm that "both ... on the first node" still matches the actual
-- shard placement.
WITH some_data AS (
    SELECT data FROM range_partitioned
)
SELECT
    count(*)
FROM range_partitioned
WHERE range_column IN ('A', 'E')
  AND range_partitioned.data IN (SELECT data FROM some_data);
DEBUG: CTE some_data is going to be inlined via distributed planning
DEBUG: generating subplan XXX_1 for subquery SELECT data FROM (SELECT range_partitioned.data FROM intermediate_result_pruning.range_partitioned) some_data
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM intermediate_result_pruning.range_partitioned WHERE ((range_column OPERATOR(pg_catalog.=) ANY (ARRAY['A'::text, 'E'::text])) AND (data OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.data FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(data integer))))
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
 count
---------------------------------------------------------------------
 0
(1 row)

-- test case for issue #3556
-- Two text-keyed tables, `stats` co-located with `accounts`, holding a single
-- matching row each ('foo').
CREATE TABLE accounts (id text PRIMARY KEY);
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "accounts_pkey" for table "accounts"
CREATE TABLE stats (account_id text PRIMARY KEY, spent int);
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "stats_pkey" for table "stats"
SELECT create_distributed_table('accounts', 'id', colocate_with => 'none');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('stats', 'account_id', colocate_with => 'accounts');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

INSERT INTO accounts (id) VALUES ('foo');
INSERT INTO stats (account_id, spent) VALUES ('foo', 100);
-- Three chained MATERIALIZED CTEs: accounts_cte is read by both later CTEs and
-- by the outer query; joined_stats_cte_1 is read only by joined_stats_cte_2.
-- The recorded output shows XXX_1 written locally and sent twice, while XXX_2
-- and XXX_3 are only written to a local file.
SELECT * FROM
(
    WITH accounts_cte AS MATERIALIZED (
        SELECT id AS account_id
        FROM accounts
    ),
    joined_stats_cte_1 AS MATERIALIZED (
        SELECT spent, account_id
        FROM stats
        INNER JOIN accounts_cte USING (account_id)
    ),
    joined_stats_cte_2 AS MATERIALIZED (
        SELECT spent, account_id
        FROM joined_stats_cte_1
        INNER JOIN accounts_cte USING (account_id)
    )
    SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL))
    FROM accounts_cte
    INNER JOIN joined_stats_cte_2 USING (account_id)
) inner_query;
DEBUG: generating subplan XXX_1 for CTE accounts_cte: SELECT id AS account_id FROM intermediate_result_pruning.accounts
DEBUG: generating subplan XXX_2 for CTE joined_stats_cte_1: SELECT stats.spent, stats.account_id FROM (intermediate_result_pruning.stats JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id))
DEBUG: generating subplan XXX_3 for CTE joined_stats_cte_2: SELECT joined_stats_cte_1.spent, joined_stats_cte_1.account_id FROM ((SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_1 JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT sum(joined_stats_cte_2.spent) OVER (PARTITION BY COALESCE(accounts_cte.account_id, NULL::text)) AS sum FROM ((SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte JOIN (SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_2 USING (account_id))) inner_query
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_2 will be written to local file
DEBUG: Subplan XXX_3 will be written to local file
 sum
---------------------------------------------------------------------
 100
(1 row)

-- confirm that the pruning works well when using round-robin as well
-- Same query as above; with this policy the recorded output shows every
-- subplan being sent to a worker and none written to a local file.
SET citus.task_assignment_policy to 'round-robin';
SELECT * FROM
(
    WITH accounts_cte AS MATERIALIZED (
        SELECT id AS account_id
        FROM accounts
    ),
    joined_stats_cte_1 AS MATERIALIZED (
        SELECT spent, account_id
        FROM stats
        INNER JOIN accounts_cte USING (account_id)
    ),
    joined_stats_cte_2 AS MATERIALIZED (
        SELECT spent, account_id
        FROM joined_stats_cte_1
        INNER JOIN accounts_cte USING (account_id)
    )
    SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL))
    FROM accounts_cte
    INNER JOIN joined_stats_cte_2 USING (account_id)
) inner_query;
DEBUG: generating subplan XXX_1 for CTE accounts_cte: SELECT id AS account_id FROM intermediate_result_pruning.accounts
DEBUG: generating subplan XXX_2 for CTE joined_stats_cte_1: SELECT stats.spent, stats.account_id FROM (intermediate_result_pruning.stats JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id))
DEBUG: generating subplan XXX_3 for CTE joined_stats_cte_2: SELECT joined_stats_cte_1.spent, joined_stats_cte_1.account_id FROM ((SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_1 JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT sum(joined_stats_cte_2.spent) OVER (PARTITION BY COALESCE(accounts_cte.account_id, NULL::text)) AS sum FROM ((SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte JOIN (SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_2 USING (account_id))) inner_query
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
 sum
---------------------------------------------------------------------
 100
(1 row)

RESET citus.task_assignment_policy;
-- Insert..select is planned differently, make sure we have results everywhere.
-- We put the insert..select in a CTE here to prevent the CTE from being moved
-- into the select, which would follow the regular code path for select.
-- Within this statement the CTE name `stats` shadows the `stats` table; its
-- single value `m` is read by both the WHERE and the HAVING filter of the
-- INSERT ... SELECT inside `inserts`. The recorded output shows XXX_1 written
-- locally and sent twice, and XXX_2 only written to a local file.
WITH stats AS MATERIALIZED (
    SELECT count(key) m FROM table_3
),
inserts AS MATERIALIZED (
    INSERT INTO table_2
    SELECT key, count(*)
    FROM table_1
    WHERE key > (SELECT m FROM stats)
    GROUP BY key
    HAVING count(*) < (SELECT m FROM stats)
    LIMIT 1
    RETURNING *
)
SELECT count(*) FROM inserts;
DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM intermediate_result_pruning.table_3
DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO intermediate_result_pruning.table_2 (key, value) SELECT key, count(*) AS count FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.>) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2.key, table_2.value
DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: push down of limit count: 1
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_2 will be written to local file
DEBUG: Collecting INSERT ... SELECT results on coordinator
 count
---------------------------------------------------------------------
 1
(1 row)

-- Cleanup: put the session settings back to their defaults and drop the
-- objects this test created.
-- NOTE(review): citus.task_assignment_policy was already RESET earlier in the
-- file, so this SET ... DEFAULT appears redundant -- confirm before removing.
SET citus.task_assignment_policy to DEFAULT;
SET client_min_messages TO DEFAULT;
DROP TABLE table_1, table_2, table_3, ref_table, accounts, stats, range_partitioned;
-- No CASCADE: this only succeeds if the schema is empty after the DROP TABLE.
DROP SCHEMA intermediate_result_pruning;