CREATE SCHEMA locally_execute_intermediate_results; SET search_path TO locally_execute_intermediate_results; SET citus.log_intermediate_results TO TRUE; SET citus.log_local_commands TO TRUE; SET citus.shard_count TO 4; SET citus.next_shard_id TO 1580000; SET citus.shard_replication_factor TO 1; CREATE TABLE table_1 (key int, value text); SELECT create_distributed_table('table_1', 'key'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE table_2 (key int, value text); SELECT create_distributed_table('table_2', 'key'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE ref_table (key int, value text); SELECT create_reference_table('ref_table'); create_reference_table --------------------------------------------------------------------- (1 row) CREATE TABLE local_table (key int, value text); -- load some data INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); INSERT INTO table_2 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6'); INSERT INTO ref_table VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); NOTICE: executing the command locally: INSERT INTO locally_execute_intermediate_results.ref_table_1580008 AS citus_table_alias (key, value) VALUES (1,'1'::text), (2,'2'::text), (3,'3'::text), (4,'4'::text), (5,'5'::text), (6,'6'::text) INSERT INTO local_table VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6'); -- and have a lot more CTEs recursively planned for the -- sake of increasing the test coverage SET client_min_messages TO DEBUG1; -- the query cannot be executed locally, but still because of -- HAVING the intermediate result is written to local file as well WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1) SELECT count(*) FROM table_2 GROUP BY value HAVING max(value) > (SELECT max FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) DEBUG: Subplan XXX_1 will be written to local file count --------------------------------------------------------------------- 1 1 (2 rows) -- in this case, the HAVING Is also pushed down WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1) SELECT count(*) FROM table_2 GROUP BY key HAVING max(value) > (SELECT max FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx count --------------------------------------------------------------------- 1 1 (2 rows) -- subquery in the WHERE part of the query can be executed locally WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT * FROM table_2) SELECT count(*) FROM table_2 WHERE key > (SELECT key FROM cte_2 ORDER BY 1 LIMIT 1) GROUP BY key HAVING max(value) > (SELECT max FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM locally_execute_intermediate_results.table_2 DEBUG: generating subplan XXX_3 for subquery SELECT key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 ORDER BY key LIMIT 1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 WHERE (key OPERATOR(pg_catalog.>) (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer))) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 ORDER BY key LIMIT 1 count --------------------------------------------------------------------- 1 1 (2 rows) -- subquery in the WHERE part of the query should not be executed locally -- because it can be pushed down with the jointree WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(key) FROM table_2) SELECT count(*) FROM table_2 WHERE key > (SELECT max FROM cte_2) GROUP BY key HAVING max(value) > (SELECT max FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 WHERE (key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2)) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx count --------------------------------------------------------------------- (0 rows) -- now all the intermediate results are safe to be in local files WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(key) FROM table_2), cte_3 AS MATERIALIZED (SELECT * FROM table_2) SELECT count(*) FROM cte_3 WHERE key > (SELECT max FROM cte_2) GROUP BY key HAVING max(value) > (SELECT max FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 WHERE (key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2)) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_3 will be written to local file NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 WHERE (key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2)) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) count --------------------------------------------------------------------- (0 rows) -- multiple CTEs are joined inside HAVING, so written to file -- locally, but nothing executed locally WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_1) SELECT count(*) FROM table_2 GROUP BY value HAVING max(value) > (SELECT max FROM cte_1 JOIN cte_2 USING (max)); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file count --------------------------------------------------------------------- 1 1 (2 rows) -- same as above, but HAVING pushed down to workers WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_1) SELECT count(*) FROM table_2 GROUP BY key HAVING max(value) > (SELECT max FROM cte_1 JOIN cte_2 USING (max)); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx count --------------------------------------------------------------------- 1 1 (2 rows) -- multiple CTEs are joined inside HAVING, so written to file -- locally, also the join tree contains only another CTE, so should be -- executed locally WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_3 AS MATERIALIZED (SELECT * FROM table_2) SELECT count(*) FROM cte_3 GROUP BY key HAVING max(value) > (SELECT max FROM cte_1 JOIN cte_2 USING (max)); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_3 will be written to local file NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) count --------------------------------------------------------------------- 1 1 (2 rows) -- now, the CTE is going to be written locally, -- plus that is going to be read locally because -- of the aggragate over the cte in HAVING WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1) SELECT count(*) FROM table_2 GROUP BY value HAVING max(value) > (SELECT max(max) FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for subquery SELECT max(max) AS max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text))) DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT max(max) AS max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 count --------------------------------------------------------------------- 1 1 (2 rows) -- same as above, but with HAVING pushed down WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1) SELECT count(*) FROM table_2 GROUP BY key HAVING max(value) > (SELECT max(max) FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for subquery SELECT max(max) AS max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text))) DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(max) AS max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 count --------------------------------------------------------------------- 1 1 (2 rows) -- two ctes are going to be written locally and executed locally WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT * FROM table_1) SELECT count(*) FROM cte_2 GROUP BY key HAVING max(value) < (SELECT max(max) FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_3 for subquery SELECT max(max) AS max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.<) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(max text))) DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_3 will be written to local file NOTICE: executing the command locally: SELECT max(max) AS max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.<) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(max text))) count --------------------------------------------------------------------- 1 1 1 (3 rows) -- this time the same CTE is both joined with a distributed -- table and used in HAVING WITH a AS MATERIALIZED (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) SELECT count(*), key FROM a JOIN table_2 USING (key) GROUP BY key HAVING (max(table_2.value) > (SELECT value FROM a)); DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1 ORDER BY key, value DESC LIMIT 1 DEBUG: push down of limit count: 1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx count | key --------------------------------------------------------------------- (0 rows) -- this time the same CTE is both joined with a distributed -- table and used in HAVING -- but used in another subquery/aggregate -- so one more level of recursive planning WITH a AS MATERIALIZED (SELECT * FROM table_1) SELECT count(*), key FROM a JOIN table_2 USING (key) GROUP BY key HAVING (max(table_2.value) = (SELECT max(value) FROM a)); DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for subquery SELECT max(value) AS max FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.=) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text))) DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT max(value) AS max FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a count | key --------------------------------------------------------------------- 1 | 4 (1 row) -- same query as the above, without the aggragate WITH a AS MATERIALIZED (SELECT max(key) as key, max(value) as value FROM ref_table) SELECT count(*), key FROM a JOIN ref_table USING (key) GROUP BY key HAVING (max(ref_table.value) <= (SELECT value FROM a)); NOTICE: executing the command locally: WITH a AS MATERIALIZED (SELECT max(ref_table_1.key) AS key, max(ref_table_1.value) AS value FROM locally_execute_intermediate_results.ref_table_1580008 ref_table_1) SELECT count(*) AS count, a.key FROM (a JOIN locally_execute_intermediate_results.ref_table_1580008 ref_table(key, value) USING (key)) GROUP BY a.key HAVING (max(ref_table.value) OPERATOR(pg_catalog.<=) (SELECT a_1.value FROM a a_1)) count | key --------------------------------------------------------------------- 1 | 6 (1 row) -- some edge cases around CTEs used inside other CTEs -- everything can be executed locally WITH cte_1 as MATERIALIZED (SELECT * FROM table_1), cte_2 AS MATERIALIZED (SELECT * FROM cte_1), cte_3 AS MATERIALIZED (SELECT max(key) FROM cte_2) SELECT * FROM cte_3; DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT key, value FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT max(key) AS max FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_3 DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 DEBUG: Subplan XXX_3 will be written to local file NOTICE: executing the command locally: SELECT max(key) AS max FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 NOTICE: executing the command locally: SELECT max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_3 max --------------------------------------------------------------------- 4 (1 row) -- the join between cte_3 and table_2 has to happen remotely WITH cte_1 as MATERIALIZED (SELECT * FROM table_1), cte_2 AS MATERIALIZED (SELECT * FROM cte_1), cte_3 AS MATERIALIZED (SELECT max(key) as key FROM cte_2) SELECT * FROM cte_3 JOIN table_2 USING (key) WHERE table_2.key = 1; DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT key, value FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, table_2.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 1) DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 key | value --------------------------------------------------------------------- (0 rows) -- the join between cte_3 and table_2 has to happen remotely WITH cte_1 as MATERIALIZED (SELECT * FROM table_1), cte_2 AS MATERIALIZED (SELECT * FROM cte_1), cte_3 AS MATERIALIZED (SELECT max(key) as key FROM cte_2) SELECT * FROM cte_3 JOIN ref_table USING (key); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT key, value FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, ref_table.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.ref_table USING (key)) DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 NOTICE: executing the command locally: SELECT cte_3.key, ref_table.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.ref_table_1580008 ref_table(key, value) USING (key)) key | value --------------------------------------------------------------------- 4 | 4 (1 row) -- some cases around router queries -- a router query, but the having has two cte joins WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_3 AS MATERIALIZED (SELECT * FROM table_2) SELECT count(*) FROM table_2 WHERE KEY = 3 GROUP BY KEY HAVING max(value) > (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 WHERE (key OPERATOR(pg_catalog.=) 3) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx count --------------------------------------------------------------------- (0 rows) -- a router query, but the having has two cte joins -- and the jointree has a join with another cte WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_3 AS MATERIALIZED (SELECT * FROM table_2) SELECT count(*) FROM table_2 JOIN cte_3 USING(key) WHERE KEY = 3 GROUP BY table_2.KEY HAVING max(table_2.value) > (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (locally_execute_intermediate_results.table_2 JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx count --------------------------------------------------------------------- (0 rows) -- a router query, but the having has two cte joins -- and the jointree has a join with the same CTEs WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_3 AS MATERIALIZED (SELECT * FROM table_2) SELECT count(*) FROM table_2 JOIN cte_3 USING(key) JOIN cte_2 ON (key = MAX::int) JOIN cte_1 USING(MAX) WHERE KEY = 3 GROUP BY table_2.KEY HAVING max(table_2.value) > (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((locally_execute_intermediate_results.table_2 JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 USING (key)) JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 ON ((table_2.key OPERATOR(pg_catalog.=) (cte_2.max)::integer))) JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 USING (max)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_1_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2_1 USING (max)))) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx count --------------------------------------------------------------------- (0 rows) -- subPlans needed remotely as the subquery is pushed down WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_2) SELECT * FROM (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo, (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar WHERE foo.key = bar.key; DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT table_1.key FROM locally_execute_intermediate_results.table_1 GROUP BY table_1.key HAVING (max(table_1.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1))) foo, (SELECT table_2.key FROM locally_execute_intermediate_results.table_2 GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2))) bar WHERE (foo.key OPERATOR(pg_catalog.=) bar.key) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx key | key --------------------------------------------------------------------- (0 rows) -- the second subquery needs to be recursively planned due to non-colocated subquery join -- so cte_2 becomes part of master query of that recursive subquery planning WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_2) SELECT * FROM (SELECT value AS key FROM table_1 GROUP BY value HAVING max(value) > (SELECT * FROM cte_1)) as foo, (SELECT value AS key FROM table_2 GROUP BY value HAVING max(value) > (SELECT * FROM cte_2)) as bar WHERE foo.key != bar.key; DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2 DEBUG: generating subplan XXX_3 for subquery SELECT value AS key FROM locally_execute_intermediate_results.table_1 GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) DEBUG: generating subplan XXX_4 for subquery SELECT value AS key FROM locally_execute_intermediate_results.table_2 GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key text)) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(key text)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_3 will be written to local file DEBUG: Subplan XXX_4 will be written to local file NOTICE: executing the command locally: SELECT foo.key, bar.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key text)) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(key text)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) key | key --------------------------------------------------------------------- (0 rows) -- similar to above, but having pushed down WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_2) SELECT * FROM (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo, (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar WHERE foo.key != bar.key; DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2 DEBUG: generating subplan XXX_3 for subquery SELECT key FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT table_1.key FROM locally_execute_intermediate_results.table_1 GROUP BY table_1.key HAVING (max(table_1.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1))) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx key | key --------------------------------------------------------------------- (0 rows) -- now, forcing all subqueries to be on the local node WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_2) SELECT * FROM (SELECT value AS key FROM table_1 GROUP BY value HAVING max(value) > (SELECT * FROM cte_1) LIMIT 1) as foo, (SELECT value AS key FROM table_2 GROUP BY value HAVING max(value) > (SELECT * FROM cte_2) LIMIT 1) as bar WHERE foo.key != bar.key; DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2 DEBUG: generating subplan XXX_3 for subquery SELECT value AS key FROM locally_execute_intermediate_results.table_1 GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) LIMIT 1 DEBUG: generating subplan XXX_4 for subquery SELECT value AS key FROM locally_execute_intermediate_results.table_2 GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2)) LIMIT 1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key text)) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(key text)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_3 will be written to local file DEBUG: Subplan XXX_4 will be written to local file NOTICE: executing the command locally: SELECT foo.key, bar.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key text)) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(key text)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) key | key --------------------------------------------------------------------- (0 rows) -- queries in which the last step has only CTEs can use local tables WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1) SELECT count(*) FROM local_table GROUP BY key HAVING max(value) > (SELECT max FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.local_table GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) DEBUG: Subplan XXX_1 will be written to local file NOTICE: executing the command locally: SELECT count(*) AS count FROM locally_execute_intermediate_results.local_table GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) count --------------------------------------------------------------------- 1 1 (2 rows) WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT * FROM table_2) SELECT count(*) FROM local_table WHERE key > (SELECT key FROM cte_2 ORDER BY 1 LIMIT 1) GROUP BY key HAVING max(value) > (SELECT max FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM locally_execute_intermediate_results.table_2 DEBUG: generating subplan XXX_3 for subquery SELECT key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 ORDER BY key LIMIT 1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.local_table WHERE (key OPERATOR(pg_catalog.>) (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer))) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_3 will be written to local file NOTICE: executing the command locally: SELECT key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 ORDER BY key LIMIT 1 NOTICE: executing the command locally: SELECT count(*) AS count FROM locally_execute_intermediate_results.local_table WHERE (key OPERATOR(pg_catalog.>) (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer))) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) count --------------------------------------------------------------------- 1 1 (2 rows) \c - - - :worker_1_port -- now use the same queries on a worker SET search_path TO locally_execute_intermediate_results; SET citus.log_intermediate_results TO TRUE; SET citus.log_local_commands TO TRUE; SET client_min_messages TO DEBUG1; -- the query cannot be executed locally, but still because of -- HAVING the intermediate result is written to local file as well WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1) SELECT count(*) FROM table_2 GROUP BY value HAVING max(value) > (SELECT max FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) DEBUG: Subplan XXX_1 will be written to local file NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS worker_column_2, max(worker_column_1) AS worker_column_3 FROM (SELECT table_2.value AS worker_column_1 FROM locally_execute_intermediate_results.table_2_1580004 table_2) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS worker_column_2, max(worker_column_1) AS worker_column_3 FROM (SELECT table_2.value AS worker_column_1 FROM locally_execute_intermediate_results.table_2_1580006 table_2) worker_subquery GROUP BY worker_column_1 count --------------------------------------------------------------------- 1 1 (2 rows) -- On non-mx case the subquery in the WHERE part of the query can be executed locally -- however, on Citus MX we have this limitation where the query cannot be executed locally WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT * FROM table_2) SELECT count(*) FROM table_2 WHERE key > (SELECT key FROM cte_2 ORDER BY 1 LIMIT 1) GROUP BY value HAVING max(value) > (SELECT max FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM locally_execute_intermediate_results.table_2 DEBUG: generating subplan XXX_3 for subquery SELECT key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 ORDER BY key LIMIT 1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 WHERE (key OPERATOR(pg_catalog.>) (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer))) GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) DEBUG: Subplan XXX_1 will be written to local file NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE true DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 ORDER BY key LIMIT 1 NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS worker_column_2, max(worker_column_1) AS worker_column_3 FROM (SELECT table_2.value AS worker_column_1 FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE (table_2.key OPERATOR(pg_catalog.>) (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)))) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS worker_column_2, max(worker_column_1) AS worker_column_3 FROM (SELECT table_2.value AS worker_column_1 FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE (table_2.key OPERATOR(pg_catalog.>) (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)))) worker_subquery GROUP BY worker_column_1 count --------------------------------------------------------------------- 1 1 (2 rows) -- subquery in the WHERE part of the query should not be executed locally -- because it can be pushed down with the jointree WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(key) FROM table_2) SELECT count(*) FROM table_2 WHERE key > (SELECT max FROM cte_2) GROUP BY key HAVING max(value) > (SELECT max FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 WHERE (key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2)) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE true NOTICE: executing the command locally: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE true NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS worker_column_2 FROM (SELECT table_2.key AS worker_column_1, table_2.value AS worker_column_2 FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE (table_2.key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2))) worker_subquery GROUP BY worker_column_1 HAVING (max(worker_column_2) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS worker_column_2 FROM (SELECT table_2.key AS worker_column_1, table_2.value AS worker_column_2 FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE (table_2.key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2))) worker_subquery GROUP BY worker_column_1 HAVING (max(worker_column_2) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) count --------------------------------------------------------------------- (0 rows) -- although all the intermediate results are safe to be in local files -- we currently do not support it on Citus MX WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(key) FROM table_2), cte_3 AS MATERIALIZED (SELECT * FROM table_2) SELECT count(*) FROM cte_3 WHERE key > (SELECT max FROM cte_2) GROUP BY key HAVING max(value) > (SELECT max FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 WHERE (key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2)) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) DEBUG: Subplan XXX_1 will be written to local file NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE true NOTICE: executing the command locally: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE true DEBUG: Subplan XXX_3 will be written to local file NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 WHERE (key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2)) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) count --------------------------------------------------------------------- (0 rows) -- multiple CTEs are joined inside HAVING, so written to file -- locally, but nothing executed locally WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_1) SELECT count(*) FROM table_2 GROUP BY value HAVING max(value) > (SELECT max FROM cte_1 JOIN cte_2 USING (max)); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) DEBUG: Subplan XXX_1 will be written to local file NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS worker_column_2, max(worker_column_1) AS worker_column_3 FROM (SELECT table_2.value AS worker_column_1 FROM locally_execute_intermediate_results.table_2_1580004 table_2) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS worker_column_2, max(worker_column_1) AS worker_column_3 FROM (SELECT table_2.value AS worker_column_1 FROM locally_execute_intermediate_results.table_2_1580006 table_2) worker_subquery GROUP BY worker_column_1 count --------------------------------------------------------------------- 1 1 (2 rows) -- multiple CTEs are joined inside HAVING, so written to file -- locally, also the join tree contains only another CTE, so should be -- executed locally, but not on an Citus MX worker WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_3 AS MATERIALIZED (SELECT * FROM table_2) SELECT count(*) FROM cte_3 GROUP BY key HAVING max(value) > (SELECT max FROM cte_1 JOIN cte_2 USING (max)); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) DEBUG: Subplan XXX_1 will be written to local file NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_3 will be written to local file NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) count --------------------------------------------------------------------- 1 1 (2 rows) -- now, the CTE is going to be written locally, -- plus that could have been read locally on the coordinator -- because of the aggragate over the cte in HAVING -- but not on Citus MX WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1) SELECT count(*) FROM table_2 GROUP BY value HAVING max(value) > (SELECT max(max) FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for subquery SELECT max(max) AS max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text))) DEBUG: Subplan XXX_1 will be written to local file NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT max(max) AS max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS worker_column_2, max(worker_column_1) AS worker_column_3 FROM (SELECT table_2.value AS worker_column_1 FROM locally_execute_intermediate_results.table_2_1580004 table_2) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS worker_column_2, max(worker_column_1) AS worker_column_3 FROM (SELECT table_2.value AS worker_column_1 FROM locally_execute_intermediate_results.table_2_1580006 table_2) worker_subquery GROUP BY worker_column_1 count --------------------------------------------------------------------- 1 1 (2 rows) -- two could have been written locally and executed locally -- on the coordinator, but not on the workers WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT * FROM table_1) SELECT count(*) FROM cte_2 GROUP BY key HAVING max(value) < (SELECT max(max) FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_3 for subquery SELECT max(max) AS max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.<) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(max text))) DEBUG: Subplan XXX_1 will be written to local file NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_3 will be written to local file NOTICE: executing the command locally: SELECT max(max) AS max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.<) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(max text))) count --------------------------------------------------------------------- 1 1 1 (3 rows) -- this time the same CTE is both joined with a distributed -- table and used in HAVING WITH a AS MATERIALIZED (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) SELECT count(*), key FROM a JOIN table_2 USING (key) GROUP BY key HAVING (max(table_2.value) > (SELECT value FROM a)); DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1 ORDER BY key, value DESC LIMIT 1 DEBUG: push down of limit count: 1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2_1580004 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2_1580006 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 count | key --------------------------------------------------------------------- (0 rows) -- this time the same CTE is both joined with a distributed -- table and used in HAVING -- but used in another subquery/aggregate -- so one more level of recursive planning WITH a AS MATERIALIZED (SELECT * FROM table_1) SELECT count(*), key FROM a JOIN table_2 USING (key) GROUP BY key HAVING (max(table_2.value) = (SELECT max(value) FROM a)); DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for subquery SELECT max(value) AS max FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.=) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text))) DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT max(value) AS max FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2_1580004 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2_1580006 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 count | key --------------------------------------------------------------------- 1 | 4 (1 row) -- same query as the above, without the aggragate WITH a AS MATERIALIZED (SELECT max(key) as key, max(value) as value FROM ref_table) SELECT count(*), key FROM a JOIN ref_table USING (key) GROUP BY key HAVING (max(ref_table.value) <= (SELECT value FROM a)); NOTICE: executing the command locally: WITH a AS MATERIALIZED (SELECT max(ref_table_1.key) AS key, max(ref_table_1.value) AS value FROM locally_execute_intermediate_results.ref_table_1580008 ref_table_1) SELECT count(*) AS count, a.key FROM (a JOIN locally_execute_intermediate_results.ref_table_1580008 ref_table(key, value) USING (key)) GROUP BY a.key HAVING (max(ref_table.value) OPERATOR(pg_catalog.<=) (SELECT a_1.value FROM a a_1)) count | key --------------------------------------------------------------------- 1 | 6 (1 row) -- some edge cases around CTEs used inside other CTEs -- everything could be executed locally on the coordinator, -- but not on the worker WITH cte_1 as MATERIALIZED (SELECT * FROM table_1), cte_2 AS MATERIALIZED (SELECT * FROM cte_1), cte_3 AS MATERIALIZED (SELECT max(key) FROM cte_2) SELECT * FROM cte_3; DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT key, value FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT max(key) AS max FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_3 DEBUG: Subplan XXX_1 will be written to local file NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 DEBUG: Subplan XXX_3 will be written to local file NOTICE: executing the command locally: SELECT max(key) AS max FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 NOTICE: executing the command locally: SELECT max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_3 max --------------------------------------------------------------------- 4 (1 row) -- the join between cte_3 and table_2 has to could have happened -- locally since the key = 1 resides on this node -- but because of the current implementation limitations we can't WITH cte_1 as MATERIALIZED (SELECT * FROM table_1), cte_2 AS MATERIALIZED (SELECT * FROM cte_1), cte_3 AS MATERIALIZED (SELECT max(key) as key FROM cte_2) SELECT * FROM cte_3 JOIN table_2 USING (key) WHERE table_2.key = 1; DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT key, value FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, table_2.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 1) DEBUG: Subplan XXX_1 will be written to local file NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 NOTICE: executing the command locally: SELECT cte_3.key, table_2.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.table_2_1580004 table_2(key, value) USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 1) key | value --------------------------------------------------------------------- (0 rows) -- the join between cte_3 and table_2 has to cannot happen -- locally because the key = 2 resides on a remote node WITH cte_1 as MATERIALIZED (SELECT * FROM table_1), cte_2 AS MATERIALIZED (SELECT * FROM cte_1), cte_3 AS MATERIALIZED (SELECT max(key) as key FROM cte_2) SELECT * FROM cte_3 JOIN table_2 USING (key) WHERE table_2.key = 2; DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT key, value FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, table_2.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 2) DEBUG: Subplan XXX_1 will be written to local file NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 key | value --------------------------------------------------------------------- (0 rows) -- the join between cte_3 and ref can could have happened locally -- but because of the current implementation limitations we can't WITH cte_1 as MATERIALIZED (SELECT * FROM table_1), cte_2 AS MATERIALIZED (SELECT * FROM cte_1), cte_3 AS MATERIALIZED (SELECT max(key) as key FROM cte_2) SELECT * FROM cte_3 JOIN ref_table USING (key); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT key, value FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, ref_table.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.ref_table USING (key)) DEBUG: Subplan XXX_1 will be written to local file NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 NOTICE: executing the command locally: SELECT cte_3.key, ref_table.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.ref_table_1580008 ref_table(key, value) USING (key)) key | value --------------------------------------------------------------------- 4 | 4 (1 row) -- some cases around router queries -- a router query, but the having has two cte joins WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_3 AS MATERIALIZED (SELECT * FROM table_2) SELECT count(*) FROM table_2 WHERE KEY = 3 GROUP BY KEY HAVING max(value) > (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 WHERE (key OPERATOR(pg_catalog.=) 3) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true count --------------------------------------------------------------------- (0 rows) -- a router query, but the having has two cte joins -- and the jointree has a join with another cte WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_3 AS MATERIALIZED (SELECT * FROM table_2) SELECT count(*) FROM table_2 JOIN cte_3 USING(key) WHERE KEY = 3 GROUP BY table_2.KEY HAVING max(table_2.value) > (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (locally_execute_intermediate_results.table_2 JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE true count --------------------------------------------------------------------- (0 rows) -- a router query, but the having has two cte joins -- and the jointree has a join with the same CTEs WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_3 AS MATERIALIZED (SELECT * FROM table_2) SELECT count(*) FROM table_2 JOIN cte_3 USING(key) JOIN cte_2 ON (key = MAX::int) JOIN cte_1 USING(MAX) WHERE KEY = 3 GROUP BY table_2.KEY HAVING max(table_2.value) > (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((locally_execute_intermediate_results.table_2 JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 USING (key)) JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 ON ((table_2.key OPERATOR(pg_catalog.=) (cte_2.max)::integer))) JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 USING (max)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_1_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2_1 USING (max)))) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE true count --------------------------------------------------------------------- (0 rows) -- subPlans needed remotely as the subquery is pushed down WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_2) SELECT * FROM (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo, (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar WHERE foo.key = bar.key; DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT table_1.key FROM locally_execute_intermediate_results.table_1 GROUP BY table_1.key HAVING (max(table_1.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1))) foo, (SELECT table_2.key FROM locally_execute_intermediate_results.table_2 GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2))) bar WHERE (foo.key OPERATOR(pg_catalog.=) bar.key) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE true NOTICE: executing the command locally: SELECT worker_column_1 AS key, worker_column_2 AS key FROM (SELECT foo.key AS worker_column_1, bar.key AS worker_column_2 FROM (SELECT table_1.key FROM locally_execute_intermediate_results.table_1_1580000 table_1 GROUP BY table_1.key HAVING (max(table_1.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1))) foo, (SELECT table_2.key FROM locally_execute_intermediate_results.table_2_1580004 table_2 GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2))) bar WHERE (foo.key OPERATOR(pg_catalog.=) bar.key)) worker_subquery NOTICE: executing the command locally: SELECT worker_column_1 AS key, worker_column_2 AS key FROM (SELECT foo.key AS worker_column_1, bar.key AS worker_column_2 FROM (SELECT table_1.key FROM locally_execute_intermediate_results.table_1_1580002 table_1 GROUP BY table_1.key HAVING (max(table_1.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1))) foo, (SELECT table_2.key FROM locally_execute_intermediate_results.table_2_1580006 table_2 GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2))) bar WHERE (foo.key OPERATOR(pg_catalog.=) bar.key)) worker_subquery key | key --------------------------------------------------------------------- (0 rows) -- the second subquery needs to be recursively planned due to non-colocated subquery join -- so cte_2 becomes part of master query of that recursive subquery planning WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_2) SELECT * FROM (SELECT value AS key FROM table_1 GROUP BY value HAVING max(value) > (SELECT * FROM cte_1)) as foo, (SELECT value AS key FROM table_2 GROUP BY value HAVING max(value) > (SELECT * FROM cte_2)) as bar WHERE foo.key != bar.key; DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2 DEBUG: generating subplan XXX_3 for subquery SELECT value AS key FROM locally_execute_intermediate_results.table_1 GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) DEBUG: generating subplan XXX_4 for subquery SELECT value AS key FROM locally_execute_intermediate_results.table_2 GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key text)) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(key text)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) DEBUG: Subplan XXX_1 will be written to local file NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE true DEBUG: Subplan XXX_3 will be written to local file NOTICE: executing the command locally: SELECT worker_column_1 AS key, max(worker_column_1) AS worker_column_2 FROM (SELECT table_1.value AS worker_column_1 FROM locally_execute_intermediate_results.table_1_1580000 table_1) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT worker_column_1 AS key, max(worker_column_1) AS worker_column_2 FROM (SELECT table_1.value AS worker_column_1 FROM locally_execute_intermediate_results.table_1_1580002 table_1) worker_subquery GROUP BY worker_column_1 DEBUG: Subplan XXX_4 will be written to local file NOTICE: executing the command locally: SELECT worker_column_1 AS key, max(worker_column_1) AS worker_column_2 FROM (SELECT table_2.value AS worker_column_1 FROM locally_execute_intermediate_results.table_2_1580004 table_2) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT worker_column_1 AS key, max(worker_column_1) AS worker_column_2 FROM (SELECT table_2.value AS worker_column_1 FROM locally_execute_intermediate_results.table_2_1580006 table_2) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT foo.key, bar.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key text)) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(key text)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) key | key --------------------------------------------------------------------- (0 rows) -- now, forcing all subqueries to be on the local node WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_2) SELECT * FROM (SELECT value AS key FROM table_1 GROUP BY value HAVING max(value) > (SELECT * FROM cte_1) LIMIT 1) as foo, (SELECT value AS key FROM table_2 GROUP BY value HAVING max(value) > (SELECT * FROM cte_2) LIMIT 1) as bar WHERE foo.key != bar.key; DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2 DEBUG: generating subplan XXX_3 for subquery SELECT value AS key FROM locally_execute_intermediate_results.table_1 GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) LIMIT 1 DEBUG: generating subplan XXX_4 for subquery SELECT value AS key FROM locally_execute_intermediate_results.table_2 GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2)) LIMIT 1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key text)) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(key text)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) DEBUG: Subplan XXX_1 will be written to local file NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE true DEBUG: Subplan XXX_3 will be written to local file NOTICE: executing the command locally: SELECT worker_column_1 AS key, max(worker_column_1) AS worker_column_2 FROM (SELECT table_1.value AS worker_column_1 FROM locally_execute_intermediate_results.table_1_1580000 table_1) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT worker_column_1 AS key, max(worker_column_1) AS worker_column_2 FROM (SELECT table_1.value AS worker_column_1 FROM locally_execute_intermediate_results.table_1_1580002 table_1) worker_subquery GROUP BY worker_column_1 DEBUG: Subplan XXX_4 will be written to local file NOTICE: executing the command locally: SELECT worker_column_1 AS key, max(worker_column_1) AS worker_column_2 FROM (SELECT table_2.value AS worker_column_1 FROM locally_execute_intermediate_results.table_2_1580004 table_2) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT worker_column_1 AS key, max(worker_column_1) AS worker_column_2 FROM (SELECT table_2.value AS worker_column_1 FROM locally_execute_intermediate_results.table_2_1580006 table_2) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT foo.key, bar.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key text)) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(key text)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) key | key --------------------------------------------------------------------- (0 rows) -- finally, use round-robin policy on the workers with same set of queries set citus.task_assignment_policy TO "round-robin" ; -- the query cannot be executed locally, but still because of -- HAVING the intermediate result is written to local file as well WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1) SELECT count(*) FROM table_2 GROUP BY value HAVING max(value) > (SELECT max FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) DEBUG: Subplan XXX_1 will be written to local file NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS worker_column_2, max(worker_column_1) AS worker_column_3 FROM (SELECT table_2.value AS worker_column_1 FROM locally_execute_intermediate_results.table_2_1580004 table_2) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS worker_column_2, max(worker_column_1) AS worker_column_3 FROM (SELECT table_2.value AS worker_column_1 FROM locally_execute_intermediate_results.table_2_1580006 table_2) worker_subquery GROUP BY worker_column_1 count --------------------------------------------------------------------- 1 1 (2 rows) -- On non-mx case the subquery in the WHERE part of the query can be executed locally -- however, on Citus MX we have this limitation where the query cannot be executed locally WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT * FROM table_2) SELECT count(*) FROM table_2 WHERE key > (SELECT key FROM cte_2 ORDER BY 1 LIMIT 1) GROUP BY key HAVING max(value) > (SELECT max FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM locally_execute_intermediate_results.table_2 DEBUG: generating subplan XXX_3 for subquery SELECT key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 ORDER BY key LIMIT 1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 WHERE (key OPERATOR(pg_catalog.>) (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer))) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE true DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS worker_column_2 FROM (SELECT table_2.key AS worker_column_1, table_2.value AS worker_column_2 FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE (table_2.key OPERATOR(pg_catalog.>) (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)))) worker_subquery GROUP BY worker_column_1 HAVING (max(worker_column_2) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS worker_column_2 FROM (SELECT table_2.key AS worker_column_1, table_2.value AS worker_column_2 FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE (table_2.key OPERATOR(pg_catalog.>) (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)))) worker_subquery GROUP BY worker_column_1 HAVING (max(worker_column_2) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) count --------------------------------------------------------------------- 1 1 (2 rows) -- subquery in the WHERE part of the query should not be executed locally -- because it can be pushed down with the jointree WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(key) FROM table_2) SELECT count(*) FROM table_2 WHERE key > (SELECT max FROM cte_2) GROUP BY key HAVING max(value) > (SELECT max FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 WHERE (key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2)) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE true NOTICE: executing the command locally: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE true NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS worker_column_2 FROM (SELECT table_2.key AS worker_column_1, table_2.value AS worker_column_2 FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE (table_2.key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2))) worker_subquery GROUP BY worker_column_1 HAVING (max(worker_column_2) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS worker_column_2 FROM (SELECT table_2.key AS worker_column_1, table_2.value AS worker_column_2 FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE (table_2.key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2))) worker_subquery GROUP BY worker_column_1 HAVING (max(worker_column_2) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) count --------------------------------------------------------------------- (0 rows) -- although all the intermediate results are safe to be in local files -- we currently do not support it on Citus MX WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(key) FROM table_2), cte_3 AS MATERIALIZED (SELECT * FROM table_2) SELECT count(*) FROM cte_3 WHERE key > (SELECT max FROM cte_2) GROUP BY key HAVING max(value) > (SELECT max FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 WHERE (key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2)) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE true NOTICE: executing the command locally: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE true DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 WHERE (key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2)) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) count --------------------------------------------------------------------- (0 rows) -- multiple CTEs are joined inside HAVING, so written to file -- locally, but nothing executed locally WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_1) SELECT count(*) FROM table_2 GROUP BY key HAVING max(value) > (SELECT max FROM cte_1 JOIN cte_2 USING (max)); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS worker_column_2 FROM (SELECT table_2.key AS worker_column_1, table_2.value AS worker_column_2 FROM locally_execute_intermediate_results.table_2_1580004 table_2) worker_subquery GROUP BY worker_column_1 HAVING (max(worker_column_2) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS worker_column_2 FROM (SELECT table_2.key AS worker_column_1, table_2.value AS worker_column_2 FROM locally_execute_intermediate_results.table_2_1580006 table_2) worker_subquery GROUP BY worker_column_1 HAVING (max(worker_column_2) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) count --------------------------------------------------------------------- 1 1 (2 rows) -- multiple CTEs are joined inside HAVING, so written to file -- locally, also the join tree contains only another CTE, so should be -- executed locally, but not on an Citus MX worker WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_3 AS MATERIALIZED (SELECT * FROM table_2) SELECT count(*) FROM cte_3 GROUP BY value HAVING max(value) > (SELECT max FROM cte_1 JOIN cte_2 USING (max)); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE true count --------------------------------------------------------------------- 1 1 (2 rows) -- now, the CTE is going to be written locally, -- plus that could have been read locally on the coordinator -- because of the aggragate over the cte in HAVING -- but not on Citus MX WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1) SELECT count(*) FROM table_2 GROUP BY value HAVING max(value) > (SELECT max(max) FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for subquery SELECT max(max) AS max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text))) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT max(max) AS max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS worker_column_2, max(worker_column_1) AS worker_column_3 FROM (SELECT table_2.value AS worker_column_1 FROM locally_execute_intermediate_results.table_2_1580004 table_2) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS worker_column_2, max(worker_column_1) AS worker_column_3 FROM (SELECT table_2.value AS worker_column_1 FROM locally_execute_intermediate_results.table_2_1580006 table_2) worker_subquery GROUP BY worker_column_1 count --------------------------------------------------------------------- 1 1 (2 rows) -- two could have been written locally and executed locally -- on the coordinator, but not on the workers WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT * FROM table_1) SELECT count(*) FROM cte_2 GROUP BY key HAVING max(value) < (SELECT max(max) FROM cte_1); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_3 for subquery SELECT max(max) AS max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.<) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(max text))) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.<) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(max text))) count --------------------------------------------------------------------- 1 1 1 (3 rows) -- this time the same CTE is both joined with a distributed -- table and used in HAVING WITH a AS MATERIALIZED (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) SELECT count(*), key FROM a JOIN table_2 USING (key) GROUP BY key HAVING (max(table_2.value) > (SELECT value FROM a)); DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1 ORDER BY key, value DESC LIMIT 1 DEBUG: push down of limit count: 1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2_1580004 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2_1580006 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 count | key --------------------------------------------------------------------- (0 rows) -- this time the same CTE is both joined with a distributed -- table and used in HAVING -- but used in another subquery/aggregate -- so one more level of recursive planning WITH a AS MATERIALIZED (SELECT * FROM table_1) SELECT count(*), key FROM a JOIN table_2 USING (key) GROUP BY key HAVING (max(table_2.value) = (SELECT max(value) FROM a)); DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for subquery SELECT max(value) AS max FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.=) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text))) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2_1580004 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2_1580006 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 count | key --------------------------------------------------------------------- 1 | 4 (1 row) -- same query as the above, without the aggragate WITH a AS MATERIALIZED (SELECT max(key) as key, max(value) as value FROM ref_table) SELECT count(*), key FROM a JOIN ref_table USING (key) GROUP BY key HAVING (max(ref_table.value) <= (SELECT value FROM a)); NOTICE: executing the command locally: WITH a AS MATERIALIZED (SELECT max(ref_table_1.key) AS key, max(ref_table_1.value) AS value FROM locally_execute_intermediate_results.ref_table_1580008 ref_table_1) SELECT count(*) AS count, a.key FROM (a JOIN locally_execute_intermediate_results.ref_table_1580008 ref_table(key, value) USING (key)) GROUP BY a.key HAVING (max(ref_table.value) OPERATOR(pg_catalog.<=) (SELECT a_1.value FROM a a_1)) count | key --------------------------------------------------------------------- 1 | 6 (1 row) -- some edge cases around CTEs used inside other CTEs -- everything could be executed locally on the coordinator, -- but not on the worker WITH cte_1 as (SELECT * FROM table_1), cte_2 AS MATERIALIZED (SELECT * FROM cte_1), cte_3 AS MATERIALIZED (SELECT max(key) FROM cte_2) SELECT * FROM cte_3; DEBUG: CTE cte_1 is going to be inlined via distributed planning DEBUG: generating subplan XXX_1 for CTE cte_2: SELECT key, value FROM (SELECT table_1.key, table_1.value FROM locally_execute_intermediate_results.table_1) cte_1 DEBUG: generating subplan XXX_2 for CTE cte_3: SELECT max(key) AS max FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_3 DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(key) AS max FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 max --------------------------------------------------------------------- 4 (1 row) -- the join between cte_3 and table_2 has to could have happened -- locally since the key = 1 resides on this node -- but because of the current implementation limitations we can't WITH cte_1 as (SELECT * FROM table_1), cte_2 AS MATERIALIZED (SELECT * FROM cte_1), cte_3 AS MATERIALIZED (SELECT max(key) as key FROM cte_2) SELECT * FROM cte_3 JOIN table_2 USING (key) WHERE table_2.key = 1; DEBUG: CTE cte_1 is going to be inlined via distributed planning DEBUG: generating subplan XXX_1 for CTE cte_2: SELECT key, value FROM (SELECT table_1.key, table_1.value FROM locally_execute_intermediate_results.table_1) cte_1 DEBUG: generating subplan XXX_2 for CTE cte_3: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, table_2.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 1) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 NOTICE: executing the command locally: SELECT cte_3.key, table_2.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.table_2_1580004 table_2(key, value) USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 1) key | value --------------------------------------------------------------------- (0 rows) -- the join between cte_3 and table_2 has to cannot happen -- locally because the key = 2 resides on a remote node WITH cte_1 as (SELECT * FROM table_1), cte_2 AS MATERIALIZED (SELECT * FROM cte_1), cte_3 AS MATERIALIZED (SELECT max(key) as key FROM cte_2) SELECT * FROM cte_3 JOIN table_2 USING (key) WHERE table_2.key = 2; DEBUG: CTE cte_1 is going to be inlined via distributed planning DEBUG: generating subplan XXX_1 for CTE cte_2: SELECT key, value FROM (SELECT table_1.key, table_1.value FROM locally_execute_intermediate_results.table_1) cte_1 DEBUG: generating subplan XXX_2 for CTE cte_3: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, table_2.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 2) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx key | value --------------------------------------------------------------------- (0 rows) -- the join between cte_3 and ref can could have happened locally -- but because of the current implementation limitations we can't WITH cte_1 as (SELECT * FROM table_1), cte_2 AS MATERIALIZED (SELECT * FROM cte_1), cte_3 AS MATERIALIZED (SELECT max(key) as key FROM cte_2) SELECT * FROM cte_3 JOIN ref_table USING (key); DEBUG: CTE cte_1 is going to be inlined via distributed planning DEBUG: generating subplan XXX_1 for CTE cte_2: SELECT key, value FROM (SELECT table_1.key, table_1.value FROM locally_execute_intermediate_results.table_1) cte_1 DEBUG: generating subplan XXX_2 for CTE cte_3: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, ref_table.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.ref_table USING (key)) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 NOTICE: executing the command locally: SELECT cte_3.key, ref_table.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.ref_table_1580008 ref_table(key, value) USING (key)) key | value --------------------------------------------------------------------- 4 | 4 (1 row) -- some cases around router queries -- a router query, but the having has two cte joins WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_3 AS MATERIALIZED (SELECT * FROM table_2) SELECT count(*) FROM table_2 WHERE KEY = 3 GROUP BY KEY HAVING max(value) > (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 WHERE (key OPERATOR(pg_catalog.=) 3) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true count --------------------------------------------------------------------- (0 rows) -- a router query, but the having has two cte joins -- and the jointree has a join with another cte WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_3 AS MATERIALIZED (SELECT * FROM table_2) SELECT count(*) FROM table_2 JOIN cte_3 USING(key) WHERE KEY = 3 GROUP BY table_2.KEY HAVING max(table_2.value) > (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (locally_execute_intermediate_results.table_2 JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE true count --------------------------------------------------------------------- (0 rows) -- the same query as above, try to hit local node with either of the queries WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_3 AS MATERIALIZED (SELECT * FROM table_2) SELECT count(*) FROM table_2 JOIN cte_3 USING(key) WHERE KEY = 3 GROUP BY table_2.KEY HAVING max(table_2.value) > (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (locally_execute_intermediate_results.table_2 JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE true count --------------------------------------------------------------------- (0 rows) -- a router query, but the having has two cte joins -- and the jointree has a join with the same CTEs WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_3 AS MATERIALIZED (SELECT * FROM table_2) SELECT count(*) FROM table_2 JOIN cte_3 USING(key) JOIN cte_2 ON (key = MAX::int) JOIN cte_1 USING(MAX) WHERE KEY = 3 GROUP BY table_2.KEY HAVING max(table_2.value) > (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((locally_execute_intermediate_results.table_2 JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 USING (key)) JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 ON ((table_2.key OPERATOR(pg_catalog.=) (cte_2.max)::integer))) JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 USING (max)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_1_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2_1 USING (max)))) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE true NOTICE: executing the command locally: SELECT key, value FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE true count --------------------------------------------------------------------- (0 rows) - subPlans needed remotely as the subquery is pushed down WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_2) SELECT * FROM (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo, (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar WHERE foo.key = bar.key; ERROR: syntax error at or near "-" -- the second subquery needs to be recursively planned due to non-colocated subquery join -- so cte_2 becomes part of master query of that recursive subquery planning WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_2) SELECT * FROM (SELECT value AS key FROM table_1 GROUP BY value HAVING max(value) > (SELECT * FROM cte_1)) as foo, (SELECT value AS key FROM table_2 GROUP BY value HAVING max(value) > (SELECT * FROM cte_2)) as bar WHERE foo.key != bar.key; DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2 DEBUG: generating subplan XXX_3 for subquery SELECT value AS key FROM locally_execute_intermediate_results.table_1 GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) DEBUG: generating subplan XXX_4 for subquery SELECT value AS key FROM locally_execute_intermediate_results.table_2 GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key text)) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(key text)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) DEBUG: Subplan XXX_1 will be written to local file NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE true DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT worker_column_1 AS key, max(worker_column_1) AS worker_column_2 FROM (SELECT table_1.value AS worker_column_1 FROM locally_execute_intermediate_results.table_1_1580000 table_1) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT worker_column_1 AS key, max(worker_column_1) AS worker_column_2 FROM (SELECT table_1.value AS worker_column_1 FROM locally_execute_intermediate_results.table_1_1580002 table_1) worker_subquery GROUP BY worker_column_1 DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT worker_column_1 AS key, max(worker_column_1) AS worker_column_2 FROM (SELECT table_2.value AS worker_column_1 FROM locally_execute_intermediate_results.table_2_1580004 table_2) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT worker_column_1 AS key, max(worker_column_1) AS worker_column_2 FROM (SELECT table_2.value AS worker_column_1 FROM locally_execute_intermediate_results.table_2_1580006 table_2) worker_subquery GROUP BY worker_column_1 key | key --------------------------------------------------------------------- (0 rows) -- now, forcing all subqueries to be on the local node WITH cte_1 AS MATERIALIZED (SELECT max(value) FROM table_1), cte_2 AS MATERIALIZED (SELECT max(value) FROM table_2) SELECT * FROM (SELECT value AS key FROM table_1 GROUP BY value HAVING max(value) > (SELECT * FROM cte_1) LIMIT 1) as foo, (SELECT value AS key FROM table_2 GROUP BY value HAVING max(value) > (SELECT * FROM cte_2) LIMIT 1) as bar WHERE foo.key != bar.key; DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2 DEBUG: generating subplan XXX_3 for subquery SELECT value AS key FROM locally_execute_intermediate_results.table_1 GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) LIMIT 1 DEBUG: generating subplan XXX_4 for subquery SELECT value AS key FROM locally_execute_intermediate_results.table_2 GROUP BY value HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2)) LIMIT 1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key text)) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(key text)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) DEBUG: Subplan XXX_1 will be written to local file NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580000 table_1 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1_1580002 table_1 WHERE true DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2_1580004 table_2 WHERE true NOTICE: executing the command locally: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2_1580006 table_2 WHERE true DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT worker_column_1 AS key, max(worker_column_1) AS worker_column_2 FROM (SELECT table_1.value AS worker_column_1 FROM locally_execute_intermediate_results.table_1_1580000 table_1) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT worker_column_1 AS key, max(worker_column_1) AS worker_column_2 FROM (SELECT table_1.value AS worker_column_1 FROM locally_execute_intermediate_results.table_1_1580002 table_1) worker_subquery GROUP BY worker_column_1 DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT worker_column_1 AS key, max(worker_column_1) AS worker_column_2 FROM (SELECT table_2.value AS worker_column_1 FROM locally_execute_intermediate_results.table_2_1580004 table_2) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT worker_column_1 AS key, max(worker_column_1) AS worker_column_2 FROM (SELECT table_2.value AS worker_column_1 FROM locally_execute_intermediate_results.table_2_1580006 table_2) worker_subquery GROUP BY worker_column_1 NOTICE: executing the command locally: SELECT foo.key, bar.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key text)) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(key text)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) key | key --------------------------------------------------------------------- (0 rows) \c - - - :master_port SET client_min_messages TO ERROR; DROP SCHEMA locally_execute_intermediate_results CASCADE;