-- NOTE(review): this file interleaves SQL statements with their psql output
-- (result tables, DEBUG lines), so it appears to be a regression-test
-- expected-output transcript rather than a runnable script -- confirm. If so,
-- every comment added here must be mirrored in the matching input script.
--
-- Purpose: exercise cursors (plpgsql refcursors and RETURN QUERY) over a
-- distributed table and record, via citus.log_intermediate_results, how often
-- a subplan is reported as being sent.
CREATE SCHEMA cursors;
SET search_path TO cursors;
CREATE TABLE distributed_table (key int, value text);
SELECT create_distributed_table('distributed_table', 'key');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

-- load some data, but not very small amounts because RETURN QUERY in plpgsql
-- hard codes the cursor fetch to 50 rows on PG 12, though they might increase
-- it sometime in the future, so be mindful
-- (generate_series(0, 1000) yields 1001 rows; key = i % 10, so keys are 0..9)
INSERT INTO distributed_table SELECT i % 10, i::text FROM generate_series(0, 1000) i;
-- Opens the refcursor passed as $1 over the distinct keys of distributed_table,
-- in ascending key order, and returns that same cursor to the caller.
CREATE OR REPLACE FUNCTION simple_cursor_on_dist_table(cursor_name refcursor)
RETURNS refcursor AS
' BEGIN OPEN $1 FOR SELECT DISTINCT key FROM distributed_table ORDER BY 1; RETURN $1; END; '
LANGUAGE plpgsql;
-- Same as above, but the cursor query filters on a CTE that carries OFFSET 0.
-- In the transcript below this query reports a subplan (XXX_1) being sent.
CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table(cursor_name refcursor)
RETURNS refcursor AS
' BEGIN OPEN $1 FOR WITH cte_1 AS (SELECT * FROM distributed_table OFFSET 0) SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1; RETURN $1; END; '
LANGUAGE plpgsql;
-- Parameterised variant: the second argument (filter, referenced as $2) bounds
-- the CTE with value < $2. Both value and filter are text, so this is a text
-- comparison, not a numeric one.
CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table_with_param(cursor_name refcursor, filter text)
RETURNS refcursor AS
' BEGIN OPEN $1 FOR WITH cte_1 AS (SELECT * FROM distributed_table WHERE value < $2 OFFSET 0) SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1; RETURN $1; END; '
LANGUAGE plpgsql;
-- pretty basic query with cursors
-- Citus should plan/execute once and pull
-- the results to coordinator, then serve it
-- from the coordinator
BEGIN;
SELECT simple_cursor_on_dist_table('cursor_1');
 simple_cursor_on_dist_table
---------------------------------------------------------------------
 cursor_1
(1 row)

SET LOCAL citus.log_intermediate_results TO ON;
SET LOCAL client_min_messages TO DEBUG1;
FETCH 5 IN cursor_1;
 key
---------------------------------------------------------------------
   0
   1
   2
   3
   4
(5 rows)

FETCH 50 IN cursor_1;
 key
---------------------------------------------------------------------
   5
   6
   7
   8
   9
(5 rows)

FETCH ALL IN cursor_1;
 key
---------------------------------------------------------------------
(0 rows)

COMMIT;
BEGIN;
SELECT cursor_with_intermediate_result_on_dist_table('cursor_1');
 cursor_with_intermediate_result_on_dist_table
---------------------------------------------------------------------
 cursor_1
(1 row)

-- multiple FETCH commands should not trigger re-running the subplans
SET LOCAL citus.log_intermediate_results TO ON;
SET LOCAL client_min_messages TO DEBUG1;
FETCH 5 IN cursor_1;
DEBUG:  Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG:  Subplan XXX_1 will be sent to localhost:xxxxx
 key
---------------------------------------------------------------------
   0
   1
   2
   3
   4
(5 rows)

FETCH 1 IN cursor_1;
 key
---------------------------------------------------------------------
   5
(1 row)

FETCH ALL IN cursor_1;
 key
---------------------------------------------------------------------
   6
   7
   8
   9
(4 rows)

FETCH 5 IN cursor_1;
 key
---------------------------------------------------------------------
(0 rows)

COMMIT;
BEGIN;
SELECT cursor_with_intermediate_result_on_dist_table_with_param('cursor_1', '600');
 cursor_with_intermediate_result_on_dist_table_with_param
---------------------------------------------------------------------
 cursor_1
(1 row)

-- multiple FETCH commands should not trigger re-running the subplans
-- also test with parameters
SET LOCAL citus.log_intermediate_results TO ON;
SET LOCAL client_min_messages TO DEBUG1;
FETCH 1 IN cursor_1;
DEBUG:  Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG:  Subplan XXX_1 will be sent to localhost:xxxxx
 key
---------------------------------------------------------------------
   0
(1 row)

FETCH 1 IN cursor_1;
 key
---------------------------------------------------------------------
   1
(1 row)

FETCH 1 IN cursor_1;
 key
---------------------------------------------------------------------
   2
(1 row)

FETCH 1 IN cursor_1;
 key
---------------------------------------------------------------------
   3
(1 row)

FETCH 1 IN cursor_1;
 key
---------------------------------------------------------------------
   4
(1 row)

FETCH 1 IN cursor_1;
 key
---------------------------------------------------------------------
   5
(1 row)

FETCH ALL IN cursor_1;
 key
---------------------------------------------------------------------
   6
   7
   8
   9
(4 rows)

COMMIT;
-- Set-returning function built on RETURN QUERY: returns each
-- distributed_table.value that also appears in the grouped subquery
-- (GROUP BY value HAVING count(*) > 0). With the data loaded above the
-- transcript shows it producing 1001 rows.
CREATE OR REPLACE FUNCTION value_counter() RETURNS TABLE(counter text) LANGUAGE PLPGSQL AS
$function$ BEGIN return query WITH cte AS (SELECT dt.value FROM distributed_table dt WHERE dt.value in (SELECT value FROM distributed_table p GROUP BY p.value HAVING count(*) > 0)) SELECT * FROM cte; END; $function$ ;
SET citus.log_intermediate_results TO ON;
SET client_min_messages TO DEBUG1;
\set VERBOSITY terse
SELECT count(*) from (SELECT value_counter()) as foo;
DEBUG:  CTE cte is going to be inlined via distributed planning
DEBUG:  generating subplan XXX_1 for subquery SELECT value FROM cursors.distributed_table p GROUP BY value HAVING (count(*) OPERATOR(pg_catalog.>) 0)
DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT value FROM (SELECT dt.value FROM cursors.distributed_table dt WHERE (dt.value OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value text)))) cte
DEBUG:  Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG:  Subplan XXX_1 will be sent to localhost:xxxxx
 count
---------------------------------------------------------------------
  1001
(1 row)

BEGIN;
SELECT count(*) from (SELECT value_counter()) as foo;
DEBUG:  Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG:  Subplan XXX_1 will be sent to localhost:xxxxx
 count
---------------------------------------------------------------------
  1001
(1 row)

COMMIT;
-- suppress NOTICEs
SET client_min_messages TO ERROR;
DROP SCHEMA cursors CASCADE;