-- Regression transcript for DML statements (INSERT / UPDATE / DELETE) that are
-- combined with CTEs and subqueries on distributed tables and a reference table.
-- Statements are followed by the output recorded for them: result tables and,
-- while client_min_messages is DEBUG1, the planner's DEBUG messages.
CREATE SCHEMA with_dml;
SET search_path TO with_dml, public;
-- two distributed tables (distribution column: tenant_id) and one reference table
CREATE TABLE with_dml.distributed_table (tenant_id text PRIMARY KEY, dept int);
SELECT create_distributed_table('distributed_table', 'tenant_id');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

CREATE TABLE with_dml.second_distributed_table (tenant_id text, dept int);
SELECT create_distributed_table('second_distributed_table', 'tenant_id');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

CREATE TABLE with_dml.reference_table (id text, name text);
SELECT create_reference_table('reference_table');
 create_reference_table
---------------------------------------------------------------------

(1 row)

-- load ids 0..100 into every table; dept is i % 10, so it cycles through 0..9
INSERT INTO distributed_table SELECT i::text, i % 10 FROM generate_series (0, 100) i;
INSERT INTO second_distributed_table SELECT i::text, i % 10 FROM generate_series (0, 100) i;
INSERT INTO reference_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i;
-- from here on the DEBUG1 planner messages are part of the recorded output
SET client_min_messages TO DEBUG1;
-- delete the reference table rows whose id is a tenant_id with dept = 1
-- (the CTE reads the distributed table, the DELETE targets the reference table)
WITH ids_to_delete AS (
  SELECT tenant_id FROM distributed_table WHERE dept = 1
)
DELETE FROM reference_table WHERE id IN (SELECT tenant_id FROM ids_to_delete);
DEBUG: generating subplan XXX_1 for CTE ids_to_delete: SELECT tenant_id FROM with_dml.distributed_table WHERE (dept OPERATOR(pg_catalog.=) 1)
DEBUG: Plan XXX query after replacing subqueries and CTEs: DELETE FROM with_dml.reference_table WHERE (id OPERATOR(pg_catalog.=) ANY (SELECT ids_to_delete.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_delete))
-- prefix the name of the reference table rows whose id is a tenant_id with dept = 2
WITH ids_to_update AS (
  SELECT tenant_id FROM distributed_table WHERE dept = 2
)
UPDATE reference_table SET name = 'new_' || name WHERE id IN (SELECT tenant_id FROM ids_to_update);
DEBUG: generating subplan XXX_1 for CTE ids_to_update: SELECT tenant_id FROM with_dml.distributed_table WHERE (dept OPERATOR(pg_catalog.=) 2)
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE with_dml.reference_table SET name = ('new_'::text OPERATOR(pg_catalog.||) name) WHERE (id OPERATOR(pg_catalog.=) ANY (SELECT ids_to_update.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_update))
-- now the CTEs are modifying as well: two DELETE ... RETURNING CTEs feed the
-- outer DELETE through a UNION
WITH ids_deleted_3 AS (
  DELETE FROM distributed_table WHERE dept = 3 RETURNING tenant_id
),
ids_deleted_4 AS (
  DELETE FROM distributed_table WHERE dept = 4 RETURNING tenant_id
)
DELETE FROM reference_table WHERE id IN (SELECT * FROM ids_deleted_3 UNION SELECT * FROM ids_deleted_4);
DEBUG: generating subplan XXX_1 for CTE ids_deleted_3: DELETE FROM with_dml.distributed_table WHERE (dept OPERATOR(pg_catalog.=) 3) RETURNING tenant_id
DEBUG: generating subplan XXX_2 for CTE ids_deleted_4: DELETE FROM with_dml.distributed_table WHERE (dept OPERATOR(pg_catalog.=) 4) RETURNING tenant_id
DEBUG: generating subplan XXX_3 for subquery SELECT ids_deleted_3.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_deleted_3 UNION SELECT ids_deleted_4.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_deleted_4
DEBUG: Plan XXX query after replacing subqueries and CTEs: DELETE FROM with_dml.reference_table WHERE (id OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)))
-- now the final UPDATE command is pushdownable: only the CTE becomes a subplan,
-- the some_tenants subquery stays in the rewritten UPDATE (see the DEBUG output)
-- NOTE: the CTE is named ids_to_delete although this statement only updates
WITH ids_to_delete AS (
  SELECT tenant_id FROM distributed_table WHERE dept = 5
)
UPDATE
  distributed_table
SET
  dept = dept + 1
FROM
  ids_to_delete, (SELECT tenant_id FROM distributed_table WHERE tenant_id::int < 60) as some_tenants
WHERE
  some_tenants.tenant_id = ids_to_delete.tenant_id
  AND distributed_table.tenant_id = some_tenants.tenant_id
  AND EXISTS (SELECT * FROM ids_to_delete);
DEBUG: generating subplan XXX_1 for CTE ids_to_delete: SELECT tenant_id FROM with_dml.distributed_table WHERE (dept OPERATOR(pg_catalog.=) 5)
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE with_dml.distributed_table SET dept = (distributed_table.dept OPERATOR(pg_catalog.+) 1) FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_delete, (SELECT distributed_table_1.tenant_id FROM with_dml.distributed_table distributed_table_1 WHERE ((distributed_table_1.tenant_id)::integer OPERATOR(pg_catalog.<) 60)) some_tenants WHERE ((some_tenants.tenant_id OPERATOR(pg_catalog.=) ids_to_delete.tenant_id) AND (distributed_table.tenant_id OPERATOR(pg_catalog.=) some_tenants.tenant_id) AND (EXISTS (SELECT ids_to_delete_1.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_delete_1)))
-- lower the verbosity for the EXPLAIN below; no DEBUG lines are recorded for it
SET client_min_messages TO WARNING;
-- this query falls back to a repartitioned INSERT ... SELECT, since we hit hard
-- errors in the INSERT ... SELECT pushdown path that prevent falling back to
-- recursive planning
-- (the EXPLAIN text is handed to coordinator_plan() as a dollar-quoted string and
-- only the plan line matching 'INSERT/SELECT method' is kept)
SELECT * FROM
coordinator_plan($Q$ EXPLAIN (costs off) WITH ids_to_upsert AS ( SELECT tenant_id FROM distributed_table WHERE dept > 7 ) INSERT INTO distributed_table SELECT distributed_table.tenant_id FROM ids_to_upsert, distributed_table WHERE distributed_table.tenant_id = ids_to_upsert.tenant_id ON CONFLICT (tenant_id) DO UPDATE SET dept = 8; $Q$) s
WHERE s LIKE '%INSERT/SELECT method%';
 query_plan
---------------------------------------------------------------------
 INSERT/SELECT method: repartition
(1 row)

SET client_min_messages TO DEBUG1;
-- the following query is very similar to the one above, but this time the
-- INSERT ... SELECT results are collected on the coordinator, since we
-- return before hitting any hard errors
WITH ids_to_insert AS
(
  SELECT (tenant_id::int * 100)::text as tenant_id FROM distributed_table WHERE dept > 7
)
INSERT INTO distributed_table
  SELECT DISTINCT ids_to_insert.tenant_id FROM ids_to_insert, distributed_table
    WHERE distributed_table.tenant_id < ids_to_insert.tenant_id;
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
DEBUG: CTE ids_to_insert is going to be inlined via distributed planning
DEBUG: generating subplan XXX_1 for subquery SELECT (((tenant_id)::integer OPERATOR(pg_catalog.*) 100))::text AS tenant_id FROM with_dml.distributed_table WHERE (dept OPERATOR(pg_catalog.>) 7)
DEBUG: generating subplan XXX_2 for subquery SELECT DISTINCT ids_to_insert.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_insert, with_dml.distributed_table WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) ids_to_insert.tenant_id)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) citus_insert_select_subquery
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- not a very meaningful query, but it has two modifying CTEs
-- along with another modifying statement
-- We need to force 1 connection per placement;
-- otherwise the coordinator insert select fails
-- since COPY cannot be executed
SET citus.force_max_query_parallelization TO on;
WITH copy_to_other_table AS (
  INSERT INTO distributed_table SELECT * FROM second_distributed_table WHERE dept = 3 ON CONFLICT (tenant_id) DO UPDATE SET dept = 4 RETURNING *
),
main_table_deleted AS (
  DELETE FROM distributed_table WHERE dept < 10 AND NOT EXISTS (SELECT 1 FROM second_distributed_table WHERE second_distributed_table.dept = 1 AND second_distributed_table.tenant_id = distributed_table.tenant_id) RETURNING *
)
INSERT INTO second_distributed_table SELECT * FROM main_table_deleted EXCEPT SELECT * FROM copy_to_other_table;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: generating subplan XXX_1 for CTE copy_to_other_table: INSERT INTO with_dml.distributed_table (tenant_id, dept) SELECT tenant_id, dept FROM with_dml.second_distributed_table WHERE (dept OPERATOR(pg_catalog.=) 3) ON CONFLICT(tenant_id) DO UPDATE SET dept = 4 RETURNING distributed_table.tenant_id, distributed_table.dept
DEBUG: generating subplan XXX_2 for CTE main_table_deleted: DELETE FROM with_dml.distributed_table WHERE ((dept OPERATOR(pg_catalog.<) 10) AND (NOT (EXISTS (SELECT 1 FROM with_dml.second_distributed_table WHERE ((second_distributed_table.dept OPERATOR(pg_catalog.=) 1) AND (second_distributed_table.tenant_id OPERATOR(pg_catalog.=) distributed_table.tenant_id)))))) RETURNING tenant_id, dept
DEBUG: generating subplan XXX_3 for subquery SELECT main_table_deleted.tenant_id, main_table_deleted.dept FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer)) main_table_deleted EXCEPT SELECT copy_to_other_table.tenant_id, copy_to_other_table.dept FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer)) copy_to_other_table
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT tenant_id, dept FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer)) citus_insert_select_subquery
DEBUG: Collecting INSERT ... SELECT results on coordinator
SET citus.force_max_query_parallelization TO off;
-- CTE inside the UPDATE statement (in the scalar subquery assigned by SET)
UPDATE
  second_distributed_table
SET dept =
  (WITH vals AS (
    SELECT DISTINCT tenant_id::int FROM distributed_table
  ) select * from vals where tenant_id = 8 )
  WHERE dept = 8;
DEBUG: CTE vals is going to be inlined via distributed planning
DEBUG: generating subplan XXX_1 for subquery SELECT DISTINCT (tenant_id)::integer AS tenant_id FROM with_dml.distributed_table
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE with_dml.second_distributed_table SET dept = (SELECT vals.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id integer)) vals WHERE (vals.tenant_id OPERATOR(pg_catalog.=) 8)) WHERE (dept OPERATOR(pg_catalog.=) 8)
-- Subquery inside the UPDATE statement (same shape as above, without the CTE)
UPDATE
  second_distributed_table
SET dept =
  (SELECT DISTINCT tenant_id::int FROM distributed_table WHERE tenant_id = '9')
  WHERE dept = 8;
DEBUG: generating subplan XXX_1 for subquery SELECT DISTINCT (tenant_id)::integer AS tenant_id FROM with_dml.distributed_table WHERE (tenant_id OPERATOR(pg_catalog.=) '9'::text)
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE with_dml.second_distributed_table SET dept = (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id integer)) WHERE (dept OPERATOR(pg_catalog.=) 8)
-- delete all remaining tenants
WITH ids_to_delete AS (
  SELECT tenant_id FROM distributed_table
)
DELETE FROM distributed_table WHERE tenant_id = ANY(SELECT tenant_id FROM ids_to_delete);
DEBUG: generating subplan XXX_1 for CTE ids_to_delete: SELECT tenant_id FROM with_dml.distributed_table
DEBUG: Plan XXX query after replacing subqueries and CTEs: DELETE FROM with_dml.distributed_table WHERE (tenant_id OPERATOR(pg_catalog.=) ANY (SELECT ids_to_delete.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_delete))
-- delete all remaining reference table rows; both the CTE and the DELETE touch
-- only the reference table, and no DEBUG output is recorded for this statement
WITH ids_to_delete AS (
  SELECT id FROM reference_table
)
DELETE FROM reference_table WHERE id = ANY(SELECT id FROM ids_to_delete);
-- clean up: restore the message level and drop the schema with its three tables
RESET client_min_messages;
DROP SCHEMA with_dml CASCADE;
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to table distributed_table
drop cascades to table second_distributed_table
drop cascades to table reference_table