-- -- MULTI_TASK_ASSIGNMENT -- SET citus.next_shard_id TO 880000; -- the function simply parses the results and returns 'shardId@worker' -- for all the explain task outputs CREATE OR REPLACE FUNCTION parse_explain_output(in qry text, in table_name text, out r text) RETURNS SETOF TEXT AS $$ DECLARE portOfTheTask text; shardOfTheTask text; begin for r in execute qry loop IF r LIKE '%port%' THEN portOfTheTask = substring(r, '([0-9]{1,10})'); END IF; IF r LIKE '%' || table_name || '%' THEN shardOfTheTask = substring(r, '([0-9]{5,10})'); END IF; end loop; return QUERY SELECT shardOfTheTask || '@' || portOfTheTask; end; $$ language plpgsql; SET citus.explain_distributed_queries TO off; -- Check that our policies for assigning tasks to worker nodes run as expected. -- To test this, we first create a shell table, and then manually insert shard -- and shard placement data into system catalogs. We next run Explain command, -- and check that tasks are assigned to worker nodes as expected. CREATE TABLE task_assignment_test_table (test_id integer); SELECT create_distributed_table('task_assignment_test_table', 'test_id', 'append'); -- Create logical shards with shardids 200, 201, and 202 INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) SELECT pg_class.oid, series.index, 'r', 1, 1000 FROM pg_class, generate_series(200, 202) AS series(index) WHERE pg_class.relname = 'task_assignment_test_table'; -- Create shard placements for shard 200 and 201 INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport) SELECT 200, 1, 1, nodename, nodeport FROM pg_dist_shard_placement GROUP BY nodename, nodeport ORDER BY nodename, nodeport ASC LIMIT 2; INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport) SELECT 201, 1, 1, nodename, nodeport FROM pg_dist_shard_placement GROUP BY nodename, nodeport ORDER BY nodename, nodeport ASC LIMIT 2; -- Create shard placements for shard 202 INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport) SELECT 202, 1, 1, nodename, nodeport FROM pg_dist_shard_placement GROUP BY nodename, nodeport ORDER BY nodename, nodeport DESC LIMIT 2; -- Start transaction block to avoid auto commits. This avoids additional debug -- messages from getting printed at real transaction starts and commits. BEGIN; -- Increase log level to see which worker nodes tasks are assigned to. Note that -- the following log messages print node name and port numbers; and node numbers -- in regression tests depend upon PG_VERSION_NUM. SET client_min_messages TO DEBUG3; -- First test the default greedy task assignment policy SET citus.task_assignment_policy TO 'greedy'; EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table; EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table; -- Next test the first-replica task assignment policy SET citus.task_assignment_policy TO 'first-replica'; EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table; EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table; COMMIT; CREATE TABLE task_assignment_reference_table (test_id integer); SELECT create_reference_table('task_assignment_reference_table'); BEGIN; SET LOCAL client_min_messages TO DEBUG3; SET LOCAL citus.explain_distributed_queries TO off; -- Check how task_assignment_policy impact planning decisions for reference tables SET LOCAL citus.task_assignment_policy TO 'greedy'; EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; SET LOCAL citus.task_assignment_policy TO 'first-replica'; EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; ROLLBACK; RESET client_min_messages; -- Now, lets test round-robin policy -- round-robin policy relies on PostgreSQL's local transactionId, -- which might change and we don't have any control over it. -- the important thing that we look for is that round-robin policy -- should give the same output for executions in the same transaction -- and different output for executions that are not inside the -- same transaction. To ensure that, we define a helper function BEGIN; SET LOCAL citus.explain_distributed_queries TO on; CREATE TEMPORARY TABLE explain_outputs (value text); SET LOCAL citus.task_assignment_policy TO 'round-robin'; INSERT INTO explain_outputs SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table'); INSERT INTO explain_outputs SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table'); -- given that we're in the same transaction, the count should be 1 SELECT count(DISTINCT value) FROM explain_outputs; TRUNCATE explain_outputs; COMMIT; -- now test round-robin policy outside -- a transaction, we should see the assignments -- change on every execution SET citus.task_assignment_policy TO 'round-robin'; SET citus.explain_distributed_queries TO ON; INSERT INTO explain_outputs SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table'); INSERT INTO explain_outputs SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table'); -- given that we're in the same transaction, the count should be 2 -- since there are two different worker nodes SELECT count(DISTINCT value) FROM explain_outputs; TRUNCATE explain_outputs; -- same test with a distributed table -- we keep this test because as of this commit, the code -- paths for reference tables and distributed tables are -- not the same SET citus.shard_replication_factor TO 2; CREATE TABLE task_assignment_replicated_hash (test_id integer); SELECT create_distributed_table('task_assignment_replicated_hash', 'test_id'); BEGIN; SET LOCAL citus.explain_distributed_queries TO on; SET LOCAL citus.task_assignment_policy TO 'round-robin'; INSERT INTO explain_outputs SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash'); INSERT INTO explain_outputs SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash'); -- given that we're in the same transaction, the count should be 1 SELECT count(DISTINCT value) FROM explain_outputs; TRUNCATE explain_outputs; COMMIT; -- now test round-robin policy outside -- a transaction, we should see the assignments -- change on every execution SET citus.task_assignment_policy TO 'round-robin'; SET citus.explain_distributed_queries TO ON; INSERT INTO explain_outputs SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash'); INSERT INTO explain_outputs SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash'); -- given that we're in the same transaction, the count should be 2 -- since there are two different worker nodes SELECT count(DISTINCT value) FROM explain_outputs; TRUNCATE explain_outputs; -- test that the round robin policy detects the anchor shard correctly -- we should not pick a reference table shard as the anchor shard when joining with a distributed table SET citus.shard_replication_factor TO 1; CREATE TABLE task_assignment_nonreplicated_hash (test_id integer, ref_id integer); SELECT create_distributed_table('task_assignment_nonreplicated_hash', 'test_id'); -- run the query two times to make sure that it hits the correct worker every time INSERT INTO explain_outputs SELECT parse_explain_output($cmd$ EXPLAIN SELECT * FROM (SELECT * FROM task_assignment_nonreplicated_hash WHERE test_id = 3) AS dist LEFT JOIN task_assignment_reference_table ref ON dist.ref_id = ref.test_id $cmd$, 'task_assignment_nonreplicated_hash'); INSERT INTO explain_outputs SELECT parse_explain_output($cmd$ EXPLAIN SELECT * FROM (SELECT * FROM task_assignment_nonreplicated_hash WHERE test_id = 3) AS dist LEFT JOIN task_assignment_reference_table ref ON dist.ref_id = ref.test_id $cmd$, 'task_assignment_nonreplicated_hash'); -- The count should be 1 since the shard exists in only one worker node SELECT count(DISTINCT value) FROM explain_outputs; TRUNCATE explain_outputs; -- We should be able to use round-robin with router queries that -- only contains intermediate results CREATE TABLE task_assignment_test_table_2 (test_id integer); SELECT create_distributed_table('task_assignment_test_table_2', 'test_id'); SET citus.task_assignment_policy TO 'round-robin'; -- Run the query two times to make sure that it hits two different workers -- on consecutive runs INSERT INTO explain_outputs SELECT parse_explain_output($cmd$ EXPLAIN WITH q1 AS MATERIALIZED (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1 $cmd$, 'task_assignment_test_table_2'); INSERT INTO explain_outputs SELECT parse_explain_output($cmd$ EXPLAIN WITH q1 AS MATERIALIZED (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1 $cmd$, 'task_assignment_test_table_2'); -- The count should be 2 since the intermediate results are processed on -- different workers SELECT count(DISTINCT value) FROM explain_outputs; RESET citus.task_assignment_policy; RESET client_min_messages; DROP TABLE task_assignment_replicated_hash, task_assignment_nonreplicated_hash, task_assignment_reference_table, task_assignment_test_table_2, explain_outputs;