--
-- multi function in join queries aims to test the function calls that are
-- used in joins.
--
-- These functions are supposed to be executed on the worker and to ensure
-- that we wrap those functions inside (SELECT * FROM fnc()) sub queries.
--
-- We do not yet support those functions that:
--  - have lateral joins
--  - have WITH ORDINALITY clause
--  - are user-defined and immutable
CREATE SCHEMA functions_in_joins;
SET search_path TO 'functions_in_joins';
SET citus.next_shard_id TO 2500000;
SET citus.replication_model to 'streaming';
SET citus.shard_replication_factor to 1;
CREATE TABLE table1 (id int, data int);
SELECT create_distributed_table('table1','id');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

INSERT INTO table1
SELECT x, x*x
from generate_series(1, 100) as f (x);
-- Verbose messages for observing the subqueries that wrapped function calls
SET client_min_messages TO DEBUG1;
-- Check joins on a sequence
CREATE SEQUENCE numbers;
SELECT * FROM table1 JOIN nextval('numbers') n ON (id = n) ORDER BY id ASC;
DEBUG:  generating subplan XXX_1 for subquery SELECT n FROM nextval('functions_in_joins.numbers'::regclass) n(n)
DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, n.n FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.n FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(n bigint)) n ON ((table1.id OPERATOR(pg_catalog.=) n.n))) ORDER BY table1.id
 id | data | n
---------------------------------------------------------------------
  1 |    1 | 1
(1 row)

-- Check joins of a function that returns a single integer
CREATE FUNCTION add(integer, integer) RETURNS integer
AS 'SELECT $1 + $2;'
LANGUAGE SQL;
SELECT create_distributed_function('add(integer,integer)');
DEBUG:  switching to sequential query execution mode
DETAIL:  A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
 create_distributed_function
---------------------------------------------------------------------

(1 row)

SELECT * FROM table1 JOIN add(3,5) sum ON (id = sum) ORDER BY id ASC;
DEBUG:  generating subplan XXX_1 for subquery SELECT sum FROM functions_in_joins.add(3, 5) sum(sum)
DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, sum.sum FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum integer)) sum ON ((table1.id OPERATOR(pg_catalog.=) sum.sum))) ORDER BY table1.id
 id | data | sum
---------------------------------------------------------------------
  8 |   64 |   8
(1 row)

-- Check join of plpgsql functions
-- a function returning a single integer
CREATE OR REPLACE FUNCTION increment(i integer) RETURNS integer AS $$
BEGIN
  RETURN i + 1;
END;
$$ LANGUAGE plpgsql;
SELECT * FROM table1 JOIN increment(2) val ON (id = val) ORDER BY id ASC;
DEBUG:  generating subplan XXX_1 for subquery SELECT val FROM functions_in_joins.increment(2) val(val)
DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, val.val FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.val FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(val integer)) val ON ((table1.id OPERATOR(pg_catalog.=) val.val))) ORDER BY table1.id
 id | data | val
---------------------------------------------------------------------
  3 |    9 |   3
(1 row)

-- a function that returns a set of integers
CREATE OR REPLACE FUNCTION next_k_integers(IN first_value INTEGER,
                                           IN k INTEGER DEFAULT 3,
                                           OUT result INTEGER)
  RETURNS SETOF INTEGER AS $$
BEGIN
  RETURN QUERY SELECT x FROM generate_series(first_value, first_value+k-1) f(x);
END;
$$ LANGUAGE plpgsql;
SELECT *
FROM table1 JOIN next_k_integers(3,2) next_integers ON (id = next_integers.result)
ORDER BY id ASC;
DEBUG:  generating subplan XXX_1 for subquery SELECT result FROM functions_in_joins.next_k_integers(3, 2) next_integers(result)
DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, next_integers.result FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.result FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(result integer)) next_integers ON ((table1.id OPERATOR(pg_catalog.=) next_integers.result))) ORDER BY table1.id
 id | data | result
---------------------------------------------------------------------
  3 |    9 |      3
  4 |   16 |      4
(2 rows)

-- a function returning set of records
CREATE FUNCTION get_set_of_records() RETURNS SETOF RECORD AS $cmd$
SELECT x, x+1 FROM generate_series(0,4) f(x)
$cmd$
LANGUAGE SQL;
SELECT * FROM table1 JOIN get_set_of_records() AS t2(x int, y int) ON (id = x) ORDER BY id ASC;
DEBUG:  generating subplan XXX_1 for subquery SELECT x, y FROM functions_in_joins.get_set_of_records() t2(x integer, y integer)
DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, t2.x, t2.y FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) t2 ON ((table1.id OPERATOR(pg_catalog.=) t2.x))) ORDER BY table1.id
 id | data | x | y
---------------------------------------------------------------------
  1 |    1 | 1 | 2
  2 |    4 | 2 | 3
  3 |    9 | 3 | 4
  4 |   16 | 4 | 5
(4 rows)

-- a function returning table
CREATE FUNCTION dup(int) RETURNS TABLE(f1 int, f2 text)
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
LANGUAGE SQL;
SELECT f.* FROM table1 t JOIN dup(32) f ON (f1 = id);
DEBUG:  generating subplan XXX_1 for subquery SELECT f1, f2 FROM functions_in_joins.dup(32) f(f1, f2)
DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT f.f1, f.f2 FROM (functions_in_joins.table1 t JOIN (SELECT intermediate_result.f1, intermediate_result.f2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(f1 integer, f2 text)) f ON ((f.f1 OPERATOR(pg_catalog.=) t.id)))
 f1 |     f2
---------------------------------------------------------------------
 32 | 32 is text
(1 row)

-- a stable function
CREATE OR REPLACE FUNCTION the_minimum_id()
  RETURNS INTEGER STABLE
  AS 'SELECT min(id) FROM table1'
  LANGUAGE SQL;
SELECT * FROM table1 JOIN the_minimum_id() min_id ON (id = min_id);
DEBUG:  generating subplan XXX_1 for subquery SELECT min_id FROM functions_in_joins.the_minimum_id() min_id(min_id)
DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, min_id.min_id FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.min_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(min_id integer)) min_id ON ((table1.id OPERATOR(pg_catalog.=) min_id.min_id)))
 id | data | min_id
---------------------------------------------------------------------
  1 |    1 |      1
(1 row)

-- a built-in immutable function
SELECT * FROM table1 JOIN abs(100) as hundred ON (id = hundred) ORDER BY id ASC;
 id  | data  | hundred
---------------------------------------------------------------------
 100 | 10000 |     100
(1 row)

-- function joins inside a CTE
WITH next_row_to_process AS (
    SELECT * FROM table1 JOIN nextval('numbers') n ON (id = n)
    )
SELECT *
FROM table1, next_row_to_process
WHERE table1.data <= next_row_to_process.data
ORDER BY 1,2 ASC;
DEBUG:  generating subplan XXX_1 for CTE next_row_to_process: SELECT table1.id, table1.data, n.n FROM (functions_in_joins.table1 JOIN nextval('functions_in_joins.numbers'::regclass) n(n) ON ((table1.id OPERATOR(pg_catalog.=) n.n)))
DEBUG:  generating subplan XXX_1 for subquery SELECT n FROM nextval('functions_in_joins.numbers'::regclass) n(n)
DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, n.n FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.n FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(n bigint)) n ON ((table1.id OPERATOR(pg_catalog.=) n.n)))
DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, next_row_to_process.id, next_row_to_process.data, next_row_to_process.n FROM functions_in_joins.table1, (SELECT intermediate_result.id, intermediate_result.data, intermediate_result.n FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, data integer, n bigint)) next_row_to_process WHERE (table1.data OPERATOR(pg_catalog.<=) next_row_to_process.data) ORDER BY table1.id, table1.data
 id | data | id | data | n
---------------------------------------------------------------------
  1 |    1 |  2 |    4 | 2
  2 |    4 |  2 |    4 | 2
(2 rows)

-- Multiple functions in an RTE
SELECT * FROM ROWS FROM (next_k_integers(5), next_k_integers(10)) AS f(a, b),
    table1 WHERE id = a ORDER BY id ASC;
DEBUG:  generating subplan XXX_1 for subquery SELECT a, b FROM ROWS FROM(functions_in_joins.next_k_integers(5), functions_in_joins.next_k_integers(10)) f(a, b)
DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT f.a, f.b, table1.id, table1.data FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) f(a, b), functions_in_joins.table1 WHERE (table1.id OPERATOR(pg_catalog.=) f.a) ORDER BY table1.id
 a | b  | id | data
---------------------------------------------------------------------
 5 | 10 |  5 |   25
 6 | 11 |  6 |   36
 7 | 12 |  7 |   49
(3 rows)

-- Custom Type returning function used in a join
RESET client_min_messages;
CREATE TYPE min_and_max AS (
  minimum INT,
  maximum INT
);
SET client_min_messages TO DEBUG1;
CREATE OR REPLACE FUNCTION max_and_min () RETURNS
  min_and_max AS $$
DECLARE
  result min_and_max%rowtype;
begin
  select into result min(data) as minimum, max(data) as maximum from table1;
  return result;
end;
$$ language plpgsql;
SELECT * FROM table1 JOIN max_and_min() m ON (m.maximum = data OR m.minimum = data) ORDER BY 1,2,3,4;
DEBUG:  generating subplan XXX_1 for subquery SELECT minimum, maximum FROM functions_in_joins.max_and_min() m(minimum, maximum)
DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, m.minimum, m.maximum FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.minimum, intermediate_result.maximum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(minimum integer, maximum integer)) m ON (((m.maximum OPERATOR(pg_catalog.=) table1.data) OR (m.minimum OPERATOR(pg_catalog.=) table1.data)))) ORDER BY table1.id, table1.data, m.minimum, m.maximum
 id  | data  | minimum | maximum
---------------------------------------------------------------------
   1 |     1 |       1 |   10000
 100 | 10000 |       1 |   10000
(2 rows)

-- The following tests will fail as we do not support all joins on
-- all kinds of functions
-- In other words, we cannot recursively plan the functions and hence
-- the query fails on the workers
SET client_min_messages TO ERROR;
\set VERBOSITY terse
-- function joins in CTE results can create lateral joins that are not supported
-- we execute the query within a function to consolidate the error messages
-- between different executors
--
-- raise_failed_execution_func_join(query): runs the given query text and, when
-- it fails with one of the two known message shapes, raises the single stable
-- error 'Task failed to execute' instead. Any other failure is re-raised
-- unchanged so that an unexpected error is never silently turned into a
-- successful (void) result.
CREATE FUNCTION raise_failed_execution_func_join(query text) RETURNS void AS $$
BEGIN
  EXECUTE query;
EXCEPTION WHEN OTHERS THEN
  IF SQLERRM LIKE 'failed to execute task%'
     OR SQLERRM LIKE '%does not exist%' THEN
    -- both known failure messages are reported under one name
    RAISE 'Task failed to execute';
  ELSE
    -- not one of the anticipated failures: propagate the original error
    RAISE;
  END IF;
END;
$$ LANGUAGE plpgsql;
SELECT raise_failed_execution_func_join($$
  WITH one_row AS (
    SELECT * FROM table1 WHERE id=52
    )
  SELECT table1.id, table1.data
  FROM one_row, table1, next_k_integers(one_row.id, 5) next_five_ids
  WHERE table1.id = next_five_ids;
$$);
ERROR:  Task failed to execute
-- a user-defined immutable function
CREATE OR REPLACE FUNCTION the_answer_to_life()
  RETURNS INTEGER IMMUTABLE AS 'SELECT 42' LANGUAGE SQL;
SELECT raise_failed_execution_func_join($$
  SELECT * FROM table1 JOIN the_answer_to_life() the_answer ON (id = the_answer);
$$);
ERROR:  Task failed to execute
-- WITH ORDINALITY clause, without an ORDER BY on the outer query
SELECT raise_failed_execution_func_join($$
  SELECT *
  FROM table1
         JOIN next_k_integers(10,5) WITH ORDINALITY next_integers
           ON (id = next_integers.result);
$$);
ERROR:  Task failed to execute
-- WITH ORDINALITY clause
SELECT raise_failed_execution_func_join($$
  SELECT *
  FROM table1
         JOIN next_k_integers(10,5) WITH ORDINALITY next_integers
           ON (id = next_integers.result)
  ORDER BY id ASC;
$$);
ERROR:  Task failed to execute
RESET client_min_messages;
DROP SCHEMA functions_in_joins CASCADE;
NOTICE:  drop cascades to 12 other objects
SET search_path TO DEFAULT;