CREATE SCHEMA recursive_dml_queries; SET search_path TO recursive_dml_queries, public; SET citus.next_shard_id TO 2370000; CREATE TABLE recursive_dml_queries.distributed_table (tenant_id text, dept int, info jsonb); SELECT create_distributed_table('distributed_table', 'tenant_id'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE recursive_dml_queries.second_distributed_table (tenant_id text, dept int, info jsonb); SELECT create_distributed_table('second_distributed_table', 'tenant_id'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE recursive_dml_queries.reference_table (id text, name text); SELECT create_reference_table('reference_table'); create_reference_table --------------------------------------------------------------------- (1 row) CREATE TABLE recursive_dml_queries.local_table (id text, name text); INSERT INTO distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i; INSERT INTO second_distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i; INSERT INTO reference_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i; INSERT INTO local_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i; CREATE VIEW tenant_ids AS SELECT tenant_id, name FROM distributed_table, reference_table WHERE distributed_table.dept::text = reference_table.id ORDER BY 2 DESC, 1 DESC; SET client_min_messages TO DEBUG1; -- the subquery foo is recursively planned UPDATE reference_table SET name = 'new_' || name FROM ( SELECT avg(second_distributed_table.tenant_id::int) as avg_tenant_id FROM second_distributed_table ) as foo WHERE foo.avg_tenant_id::int::text = reference_table.id RETURNING reference_table.name; DEBUG: generating subplan XXX_1 for subquery SELECT avg((tenant_id)::integer) AS avg_tenant_id FROM recursive_dml_queries.second_distributed_table DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.reference_table SET name = ('new_'::text OPERATOR(pg_catalog.||) reference_table.name) FROM (SELECT intermediate_result.avg_tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(avg_tenant_id numeric)) foo WHERE (((foo.avg_tenant_id)::integer)::text OPERATOR(pg_catalog.=) reference_table.id) RETURNING reference_table.name name --------------------------------------------------------------------- new_user_50 (1 row) -- the subquery foo is recursively planned -- but note that the subquery foo itself is pushdownable UPDATE second_distributed_table SET dept = foo.max_dept * 2 FROM ( SELECT DISTINCT ON (tenant_id) tenant_id, max(dept) as max_dept FROM ( SELECT second_distributed_table.dept, second_distributed_table.tenant_id FROM second_distributed_table, distributed_table WHERE distributed_table.tenant_id = second_distributed_table.tenant_id ) foo_inner GROUP BY tenant_id ORDER BY 1 DESC ) as foo WHERE foo.tenant_id != second_distributed_table.tenant_id AND second_distributed_table.dept IN (2) RETURNING second_distributed_table.tenant_id, second_distributed_table.dept; DEBUG: generating subplan XXX_1 for subquery SELECT DISTINCT ON (tenant_id) tenant_id, max(dept) AS max_dept FROM (SELECT second_distributed_table.dept, second_distributed_table.tenant_id FROM recursive_dml_queries.second_distributed_table, recursive_dml_queries.distributed_table WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.=) second_distributed_table.tenant_id)) foo_inner GROUP BY tenant_id ORDER BY tenant_id DESC DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.second_distributed_table SET dept = (foo.max_dept OPERATOR(pg_catalog.*) 2) FROM (SELECT intermediate_result.tenant_id, intermediate_result.max_dept FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, max_dept integer)) foo WHERE ((foo.tenant_id OPERATOR(pg_catalog.<>) second_distributed_table.tenant_id) AND (second_distributed_table.dept OPERATOR(pg_catalog.=) 2)) RETURNING second_distributed_table.tenant_id, second_distributed_table.dept tenant_id | dept --------------------------------------------------------------------- 12 | 18 2 | 18 22 | 18 32 | 18 42 | 18 52 | 18 62 | 18 72 | 18 82 | 18 92 | 18 (10 rows) -- the subquery foo is recursively planned -- and foo itself is a non colocated subquery and recursively planned UPDATE second_distributed_table SET dept = foo.tenant_id::int / 4 FROM ( SELECT DISTINCT foo_inner_1.tenant_id FROM ( SELECT second_distributed_table.dept, second_distributed_table.tenant_id FROM second_distributed_table, distributed_table WHERE distributed_table.tenant_id = second_distributed_table.tenant_id AND second_distributed_table.dept IN (3,4) ) foo_inner_1, ( SELECT second_distributed_table.tenant_id FROM second_distributed_table, distributed_table WHERE distributed_table.tenant_id = second_distributed_table.tenant_id AND second_distributed_table.dept IN (4,5) )foo_inner_2 WHERE foo_inner_1.tenant_id != foo_inner_2.tenant_id ) as foo WHERE foo.tenant_id != second_distributed_table.tenant_id AND second_distributed_table.dept IN (3); DEBUG: generating subplan XXX_1 for subquery SELECT second_distributed_table.tenant_id FROM recursive_dml_queries.second_distributed_table, recursive_dml_queries.distributed_table WHERE ((distributed_table.tenant_id OPERATOR(pg_catalog.=) second_distributed_table.tenant_id) AND (second_distributed_table.dept OPERATOR(pg_catalog.=) ANY (ARRAY[4, 5]))) DEBUG: generating subplan XXX_2 for subquery SELECT DISTINCT foo_inner_1.tenant_id FROM (SELECT second_distributed_table.dept, second_distributed_table.tenant_id FROM recursive_dml_queries.second_distributed_table, recursive_dml_queries.distributed_table WHERE ((distributed_table.tenant_id OPERATOR(pg_catalog.=) second_distributed_table.tenant_id) AND (second_distributed_table.dept OPERATOR(pg_catalog.=) ANY (ARRAY[3, 4])))) foo_inner_1, (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) foo_inner_2 WHERE (foo_inner_1.tenant_id OPERATOR(pg_catalog.<>) foo_inner_2.tenant_id) DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.second_distributed_table SET dept = ((foo.tenant_id)::integer OPERATOR(pg_catalog./) 4) FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) foo WHERE ((foo.tenant_id OPERATOR(pg_catalog.<>) second_distributed_table.tenant_id) AND (second_distributed_table.dept OPERATOR(pg_catalog.=) 3)) -- we currently do not allow local tables in modification queries UPDATE distributed_table SET dept = avg_tenant_id::int FROM ( SELECT avg(local_table.id::int) as avg_tenant_id FROM local_table ) as foo WHERE foo.avg_tenant_id::int::text = distributed_table.tenant_id RETURNING distributed_table.*; DEBUG: generating subplan XXX_1 for subquery SELECT avg((id)::integer) AS avg_tenant_id FROM recursive_dml_queries.local_table DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = (foo.avg_tenant_id)::integer FROM (SELECT intermediate_result.avg_tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(avg_tenant_id numeric)) foo WHERE (((foo.avg_tenant_id)::integer)::text OPERATOR(pg_catalog.=) distributed_table.tenant_id) RETURNING distributed_table.tenant_id, distributed_table.dept, distributed_table.info tenant_id | dept | info --------------------------------------------------------------------- 50 | 50 | {"f1": 50, "f2": 2500} (1 row) -- we currently do not allow views in modification queries UPDATE distributed_table SET dept = avg_tenant_id::int FROM ( SELECT avg(tenant_id::int) as avg_tenant_id FROM tenant_ids ) as foo WHERE foo.avg_tenant_id::int::text = distributed_table.tenant_id RETURNING distributed_table.*; DEBUG: generating subplan XXX_1 for subquery SELECT avg((tenant_id)::integer) AS avg_tenant_id FROM (SELECT distributed_table.tenant_id, reference_table.name FROM recursive_dml_queries.distributed_table, recursive_dml_queries.reference_table WHERE ((distributed_table.dept)::text OPERATOR(pg_catalog.=) reference_table.id) ORDER BY reference_table.name DESC, distributed_table.tenant_id DESC) tenant_ids DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = (foo.avg_tenant_id)::integer FROM (SELECT intermediate_result.avg_tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(avg_tenant_id numeric)) foo WHERE (((foo.avg_tenant_id)::integer)::text OPERATOR(pg_catalog.=) distributed_table.tenant_id) RETURNING distributed_table.tenant_id, distributed_table.dept, distributed_table.info tenant_id | dept | info --------------------------------------------------------------------- 50 | 50 | {"f1": 50, "f2": 2500} (1 row) -- there is a lateral join (e.g., correlated subquery) thus the subqueries cannot be -- recursively planned, however it can be planned using the repartition planner SET citus.enable_repartition_joins to on; SELECT DISTINCT foo_inner_1.tenant_id FROM ( SELECT second_distributed_table.dept, second_distributed_table.tenant_id FROM second_distributed_table, distributed_table WHERE distributed_table.tenant_id = second_distributed_table.tenant_id AND second_distributed_table.dept IN (3,4) ) foo_inner_1 JOIN LATERAL ( SELECT second_distributed_table.tenant_id FROM second_distributed_table, distributed_table WHERE distributed_table.tenant_id = second_distributed_table.tenant_id AND foo_inner_1.dept = second_distributed_table.dept AND second_distributed_table.dept IN (4,5) ) foo_inner_2 ON (foo_inner_2.tenant_id != foo_inner_1.tenant_id) ORDER BY foo_inner_1.tenant_id; tenant_id --------------------------------------------------------------------- 14 24 34 4 44 54 64 74 84 94 (10 rows) RESET citus.enable_repartition_joins; -- there is a lateral join (e.g., correlated subquery) thus the subqueries cannot be -- recursively planned, this one can not be planned by the repartion planner -- because of the IN query on a non unique column UPDATE second_distributed_table SET dept = foo.tenant_id::int / 4 FROM ( SELECT DISTINCT foo_inner_1.tenant_id FROM ( SELECT second_distributed_table.dept, second_distributed_table.tenant_id FROM second_distributed_table, distributed_table WHERE distributed_table.tenant_id = second_distributed_table.tenant_id AND second_distributed_table.dept IN (select dept from second_distributed_table)) foo_inner_1 JOIN LATERAL ( SELECT second_distributed_table.tenant_id FROM second_distributed_table, distributed_table WHERE distributed_table.tenant_id = second_distributed_table.tenant_id AND foo_inner_1.dept = second_distributed_table.dept AND second_distributed_table.dept IN (4,5) ) foo_inner_2 ON (foo_inner_2.tenant_id != foo_inner_1.tenant_id) ) as foo RETURNING *; DEBUG: generating subplan XXX_1 for subquery SELECT dept FROM recursive_dml_queries.second_distributed_table ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns -- again a correlated subquery -- this time distribution key eq. exists -- however recursive planning is prevented due to correlated subqueries UPDATE second_distributed_table SET dept = foo.tenant_id::int / 4 FROM ( SELECT baz.tenant_id FROM ( SELECT second_distributed_table.dept, second_distributed_table.tenant_id FROM second_distributed_table, distributed_table as d1 WHERE d1.tenant_id = second_distributed_table.tenant_id AND second_distributed_table.dept IN (3,4) AND second_distributed_table.tenant_id IN ( SELECT s2.tenant_id FROM second_distributed_table as s2 GROUP BY d1.tenant_id, s2.tenant_id ) ) as baz ) as foo WHERE second_distributed_table.tenant_id = foo.tenant_id RETURNING *; ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns -- we don't support subqueries/CTEs inside VALUES INSERT INTO second_distributed_table (tenant_id, dept) VALUES ('3', (WITH vals AS (SELECT 3) select * from vals)); DEBUG: CTE vals is going to be inlined via distributed planning DEBUG: generating subplan XXX_1 for CTE vals: SELECT 3 DEBUG: Plan XXX query after replacing subqueries and CTEs: INSERT INTO recursive_dml_queries.second_distributed_table (tenant_id, dept) VALUES ('3'::text, (SELECT vals."?column?" FROM (SELECT intermediate_result."?column?" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result("?column?" integer)) vals("?column?"))) ERROR: subqueries are not supported within INSERT queries HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax. INSERT INTO second_distributed_table (tenant_id, dept) VALUES ('3', (SELECT 3)); ERROR: subqueries are not supported within INSERT queries HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax. -- DML with an unreferenced SELECT CTE WITH cte_1 AS ( WITH cte_2 AS ( SELECT tenant_id as cte2_id FROM second_distributed_table WHERE dept >= 2 ) UPDATE distributed_table SET dept = 10 RETURNING * ) UPDATE distributed_table SET dept = 5 FROM cte_1 WHERE distributed_table.tenant_id < cte_1.tenant_id; DEBUG: generating subplan XXX_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id) WITH cte_1 AS ( WITH cte_2 AS ( SELECT tenant_id as cte2_id FROM second_distributed_table WHERE dept >= 2 ) UPDATE distributed_table SET dept = 10 RETURNING * ) UPDATE distributed_table SET dept = 5 FROM cte_1 WHERE distributed_table.tenant_id < cte_1.tenant_id; DEBUG: generating subplan XXX_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id) -- we support updating local table with a join with -- distributed tables, though as the local table -- is target here, distributed table is recursively -- planned UPDATE local_table SET id = 'citus_test' FROM distributed_table WHERE distributed_table.tenant_id = local_table.id; DEBUG: Wrapping relation "distributed_table" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT tenant_id FROM recursive_dml_queries.distributed_table WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.local_table SET id = 'citus_test'::text FROM (SELECT distributed_table_1.tenant_id, NULL::integer AS dept, NULL::jsonb AS info FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) distributed_table_1) distributed_table WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.=) local_table.id) RESET client_min_messages; DROP SCHEMA recursive_dml_queries CASCADE; NOTICE: drop cascades to 5 other objects DETAIL: drop cascades to table distributed_table drop cascades to table second_distributed_table drop cascades to table reference_table drop cascades to table local_table drop cascades to view tenant_ids