CREATE SCHEMA with_join; SET search_path TO with_join, public; SET citus.next_shard_id TO 1501000; CREATE TABLE with_join.reference_table(user_id int); SELECT create_reference_table('with_join.reference_table'); create_reference_table --------------------------------------------------------------------- (1 row) INSERT INTO reference_table VALUES (6), (7); SET citus.enable_repartition_joins TO on; -- Two colocated CTE under a non-colocated join WITH colocated_1 AS ( SELECT users_table.user_id, events_table.value_2 FROM users_table, events_table WHERE users_table.user_id = events_table.user_id AND event_type IN (1, 2, 3) ), colocated_2 AS ( SELECT users_table.user_id, events_table.value_2 FROM users_table, events_table WHERE users_table.user_id = events_table.user_id AND event_type IN (4, 5, 6) ) SELECT colocated_1.user_id, count(*) FROM colocated_1, colocated_2 WHERE colocated_1.value_2 = colocated_2.value_2 GROUP BY 1 ORDER BY 2 DESC, 1; user_id | count --------------------------------------------------------------------- 3 | 30168 4 | 27768 2 | 25327 5 | 25083 1 | 6776 6 | 6710 (6 rows) -- Two non-colocated CTE under a co-located join WITH non_colocated_1 AS ( SELECT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.value_2 AND event_type IN (1, 2, 3) ), non_colocated_2 AS ( SELECT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.value_2 AND event_type IN (4, 5, 6) ) SELECT non_colocated_1.user_id, count(*) FROM non_colocated_1, non_colocated_2 WHERE non_colocated_1.user_id = non_colocated_2.user_id GROUP BY 1 ORDER BY 2 DESC, 1; user_id | count --------------------------------------------------------------------- 2 | 67507 4 | 23040 3 | 14580 5 | 10935 1 | 6272 (5 rows) -- Subqueries in WHERE and FROM are mixed -- In this query, only subquery in WHERE is not a colocated join -- but we're able to recursively plan that as well WITH users_events AS ( WITH colocated_join AS ( SELECT users_table.user_id as uid, event_type FROM users_table join events_table on (users_table.user_id = events_table.user_id) WHERE events_table.event_type IN (1, 2, 3) ), colocated_join_2 AS ( SELECT users_table.user_id, event_type FROM users_table join events_table on (users_table.user_id = events_table.user_id) WHERE events_table.event_type IN (4, 5, 6) ) SELECT uid, colocated_join.event_type FROM colocated_join, colocated_join_2 WHERE colocated_join.uid = colocated_join_2.user_id AND colocated_join.event_type IN ( WITH some_events AS ( SELECT event_type FROM events_table WHERE user_id < 100 GROUP BY 1 ORDER BY 1 LIMIT 10 ) SELECT * FROM some_events ) ) SELECT DISTINCT uid FROM users_events ORDER BY 1 DESC LIMIT 5; uid --------------------------------------------------------------------- 6 5 4 3 2 (5 rows) -- cte LEFT JOIN distributed_table should error out -- as long as the CTE is recursively planned -- prevent PG 11 - PG 12 outputs to diverge SET citus.enable_cte_inlining TO false; WITH cte AS ( SELECT * FROM users_table WHERE user_id = 1 ORDER BY value_1 ) SELECT cte.user_id, cte.time, events_table.event_type FROM cte LEFT JOIN events_table ON cte.user_id = events_table.user_id ORDER BY 1,2,3 LIMIT 5; ERROR: cannot pushdown the subquery DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join SET citus.enable_cte_inlining TO true; -- cte RIGHT JOIN distributed_table should work WITH cte AS ( SELECT * FROM users_table WHERE user_id = 1 ORDER BY value_1 ) SELECT cte.user_id, cte.time, events_table.event_type FROM cte RIGHT JOIN events_table ON cte.user_id = events_table.user_id ORDER BY 1,2,3 LIMIT 5; user_id | time | event_type --------------------------------------------------------------------- 1 | Wed Nov 22 22:51:43.132261 2017 | 0 1 | Wed Nov 22 22:51:43.132261 2017 | 0 1 | Wed Nov 22 22:51:43.132261 2017 | 1 1 | Wed Nov 22 22:51:43.132261 2017 | 1 1 | Wed Nov 22 22:51:43.132261 2017 | 2 (5 rows) -- distributed_table LEFT JOIN cte should work WITH cte AS ( SELECT * FROM users_table WHERE value_1 = 1 ORDER BY value_1 ) SELECT cte.user_id, cte.time, events_table.event_type FROM events_table LEFT JOIN cte ON cte.user_id = events_table.user_id ORDER BY 1,2,3 LIMIT 5; user_id | time | event_type --------------------------------------------------------------------- 1 | Thu Nov 23 09:26:42.145043 2017 | 0 1 | Thu Nov 23 09:26:42.145043 2017 | 0 1 | Thu Nov 23 09:26:42.145043 2017 | 1 1 | Thu Nov 23 09:26:42.145043 2017 | 1 1 | Thu Nov 23 09:26:42.145043 2017 | 2 (5 rows) -- distributed_table RIGHT JOIN cte should error out -- prevent PG 11 - PG 12 outputs to diverge SET citus.enable_cte_inlining TO false; WITH cte AS ( SELECT * FROM users_table WHERE value_1 = 1 ORDER BY value_1 ) SELECT cte.user_id, cte.time, events_table.event_type FROM events_table RIGHT JOIN cte ON cte.user_id = events_table.user_id ORDER BY 1,2,3 LIMIT 5; ERROR: cannot pushdown the subquery DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join -- cte FULL JOIN distributed_table should error out WITH cte AS ( SELECT * FROM users_table WHERE user_id = 1 ORDER BY value_1 ) SELECT cte.user_id, cte.time, events_table.event_type FROM events_table FULL JOIN cte ON cte.user_id = events_table.user_id ORDER BY 1,2,3 LIMIT 5; ERROR: cannot pushdown the subquery DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join SET citus.enable_cte_inlining TO false; -- Joins with reference tables are planned as router queries WITH cte AS ( SELECT value_2, max(user_id) AS user_id FROM users_table WHERE value_2 = 1 GROUP BY value_2 HAVING count(*) > 1 ) SELECT row_number() OVER(), cte.user_id FROM cte FULL JOIN reference_table ON cte.user_id + 1 = reference_table.user_id ORDER BY user_id LIMIT 5; row_number | user_id --------------------------------------------------------------------- 2 | 6 1 | (2 rows) -- some more tests for more complex outer-joins -- with reference tables CREATE TABLE distributed_1 (col1 int, col2 int, distrib_col int); CREATE TABLE distributed_2 (col1 int, col2 int, distrib_col int); CREATE TABLE reference_1 (col1 int, col2 int); CREATE TABLE reference_2(col1 int, col2 int); SELECT create_distributed_table('distributed_1','distrib_col'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('distributed_2','distrib_col'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_reference_table('reference_1'); create_reference_table --------------------------------------------------------------------- (1 row) SELECT create_reference_table('reference_2'); create_reference_table --------------------------------------------------------------------- (1 row) INSERT INTO distributed_1 SELECT i, i, i FROM generate_series(0,100) i; INSERT INTO distributed_2 SELECT i%2, i%2, i%2 FROM generate_series(0,100) i; INSERT INTO reference_1 SELECT i%3, i%3 FROM generate_series(0,100) i; INSERT INTO reference_2 SELECT i%4, i%4 FROM generate_series(0,100) i; select count(*) from distributed_1 AS d1 LEFT JOIN reference_1 AS r1 ON d1.col2=r1.col2 LEFT JOIN reference_2 AS r2 ON r2.col1 = r1.col1 join (select distrib_col,count(*) from distributed_2 group by distrib_col) d2 ON d2.distrib_col=d1.distrib_col; count --------------------------------------------------------------------- 1734 (1 row) with d2 AS (select distrib_col,count(*) from distributed_2 group by distrib_col) select count(*) from distributed_1 AS d1 LEFT JOIN reference_1 AS r1 ON d1.col2=r1.col2 LEFT JOIN reference_2 AS r2 ON r2.col1 = r1.col1 join d2 ON d2.distrib_col=d1.distrib_col; count --------------------------------------------------------------------- 1734 (1 row) with d2 AS (select distrib_col,col1 from distributed_2) select count(*) from distributed_1 AS d1 LEFT JOIN reference_1 AS r1 ON d1.col2=r1.col2 LEFT JOIN reference_2 AS r2 ON r2.col1 = r1.col1 join d2 ON d2.distrib_col=d1.distrib_col; count --------------------------------------------------------------------- 87584 (1 row) with cte_1 AS (select col1 from reference_1) select count(*) from distributed_1 AS d1 LEFT JOIN reference_1 AS r1 ON d1.col2=r1.col2 LEFT JOIN reference_2 AS r2 ON r2.col1 = r1.col1 join cte_1 ON cte_1.col1=d1.distrib_col; count --------------------------------------------------------------------- 86181 (1 row) RESET client_min_messages; DROP SCHEMA with_join CASCADE; NOTICE: drop cascades to 5 other objects DETAIL: drop cascades to table reference_table drop cascades to table distributed_1 drop cascades to table distributed_2 drop cascades to table reference_1 drop cascades to table reference_2