-- -- roles must exist on both sides of the connection so they cannot be created in a transaction -- \set pmpp_test_user 'pmpp_test_user' \set password 'dummy'; \set nopw_test_user 'nopw_test_user' create role :pmpp_test_user password :'password' login; create role :nopw_test_user password '' login; grant pmpp to :pmpp_test_user, :nopw_test_user; create extension postgres_fdw; -- a user-mappings convenience, not a dependency create extension pmpp; select sign(pmpp.num_cpus()); sign ------ 1 (1 row) select current_database() as dbname, 'dbname=' || current_database() as loopback_su_conn_str \gset \set pmpp_localhost_server 'localhost_server' select format('[{"connection": "%s", "query": "select 1"}]', :'loopback_su_conn_str') as json_str_1, format('[{"connection": "%s", "query": "select 999"}]', :'pmpp_localhost_server') as json_str_2, format('[{"connection": "%s", "query": "select count(pg_sleep(1))", "statement_timeout": 100}]', :'pmpp_localhost_server') as json_str_timeout, format('[{"connection": "%s", "num_workers": 1, "queries": ["select 1","select 2/0","select 3"]}]', :'pmpp_localhost_server') as json_str_div_zero \gset select pmpp.to_query_manifest('{"connection": "foo"}'::jsonb); to_query_manifest ------------------- (foo,{},,,) (1 row) create table parallel_index_test( b integer, c integer, d integer ); grant all on parallel_index_test to public; insert into parallel_index_test select b.b, c.c, d.d from generate_series(1,10) as b, generate_series(1,10) as c, generate_series(1,10) as d; select * from parallel_index_test order by 1,2,3 limit 10; b | c | d ---+---+---- 1 | 1 | 1 1 | 1 | 2 1 | 1 | 3 1 | 1 | 4 1 | 1 | 5 1 | 1 | 6 1 | 1 | 7 1 | 1 | 8 1 | 1 | 9 1 | 1 | 10 (10 rows) select b, sum(c), sum(d) from parallel_index_test group by b order by b; b | sum | sum ----+-----+----- 1 | 550 | 550 2 | 550 | 550 3 | 550 | 550 4 | 550 | 550 5 | 550 | 550 6 | 550 | 550 7 | 550 | 550 8 | 550 | 550 9 | 550 | 550 10 | 550 | 550 (10 rows) -- -- test cast of array which tests cast of pmpp.query_manifest as well -- select t.* from unnest(:'json_str_1'::jsonb::pmpp.query_manifest[]) t; connection | queries | cpu_multiplier | num_workers | setup_commands ---------------------------+--------------+----------------+-------------+---------------- dbname=contrib_regression | {"select 1"} | | | (1 row) create temporary table x(y integer); select * from pmpp.distribute(null::x,:'loopback_su_conn_str',array['select 1','select 2','select 3']) order by y; y --- 1 2 3 (3 rows) select * from pmpp.manifest_set(:'json_str_1'::jsonb); connection | queries | cpu_multiplier | num_workers | setup_commands ---------------------------+--------------+----------------+-------------+---------------- dbname=contrib_regression | {"select 1"} | | | (1 row) select queries from pmpp.manifest_set(:'json_str_1'::jsonb); queries -------------- {"select 1"} (1 row) select * from pmpp.manifest_set(:'json_str_div_zero'::jsonb); connection | queries | cpu_multiplier | num_workers | setup_commands ------------------+--------------------------------------+----------------+-------------+---------------- localhost_server | {"select 1","select 2/0","select 3"} | | 1 | (1 row) select * from pmpp.manifest_set(:'json_str_timeout'::jsonb); connection | queries | cpu_multiplier | num_workers | setup_commands ------------------+-------------------------------+----------------+-------------+--------------------------------- localhost_server | {"select count(pg_sleep(1))"} | | | {"set statement_timeout = 100"} (1 row) create temporary table r as select array_agg(t) r from pmpp.manifest_set(:'json_str_1'::jsonb) as t; select * from pmpp.distribute(null::x, :'json_str_1'::jsonb); y --- 1 (1 row) -- -- test meta() which is just distribute() with a specific result set -- select * from pmpp.meta('dbname=' || current_database(), array( select format('create index on %s(%s)',c.table_name,c.column_name) from information_schema.columns c where c.table_name = 'parallel_index_test' and c.table_schema = 'public'), setup_commands := array ['set application_name = indexer','set client_encoding = UTF8']) order by 1; command | result ----------------------------------------+-------------- create index on parallel_index_test(b) | CREATE INDEX create index on parallel_index_test(c) | CREATE INDEX create index on parallel_index_test(d) | CREATE INDEX (3 rows) \d+ parallel_index_test Table "public.parallel_index_test" Column | Type | Modifiers | Storage | Stats target | Description --------+---------+-----------+---------+--------------+------------- b | integer | | plain | | c | integer | | plain | | d | integer | | plain | | Indexes: "parallel_index_test_b_idx" btree (b) "parallel_index_test_c_idx" btree (c) "parallel_index_test_d_idx" btree (d) select b, sum(c), sum(d) from pmpp.distribute(null::parallel_index_test, 'dbname=' || current_database(), array( select format('select %s, sum(c), sum(d) from parallel_index_test where b = %s',b.b,b.b) from generate_series(1,10) as b )) group by b order by b; b | sum | sum ----+-----+----- 1 | 550 | 550 2 | 550 | 550 3 | 550 | 550 4 | 550 | 550 5 | 550 | 550 6 | 550 | 550 7 | 550 | 550 8 | 550 | 550 9 | 550 | 550 10 | 550 | 550 (10 rows) create server :pmpp_localhost_server foreign data wrapper postgres_fdw options (host 'localhost', dbname :'dbname' ); create user mapping for public server :pmpp_localhost_server options(user :'pmpp_test_user', password :'password'); -- -- test normal legit queryset -- select * from pmpp.distribute(null::x,:'pmpp_localhost_server',array['select 1','select 2','select 3']); y --- 1 2 3 (3 rows) -- -- test result set with too many columns -- select * from pmpp.distribute(null::x,:'pmpp_localhost_server',array['select 1','select 2','select 3, 4']); ERROR: result rowtype does not match expected rowtype connection: localhost_server query: select 3, 4 CONTEXT: SQL function "distribute" statement 1 -- -- test result set with right number of columns but wrong type -- select * from pmpp.distribute(null::x,:'pmpp_localhost_server',array['select 1','select ''2016-01-01''::date','select 3']); ERROR: invalid input syntax for integer: "2016-01-01" CONTEXT: query: select '2016-01-01'::date on connection to: localhost_server SQL function "distribute" statement 1 -- -- test legit json input -- select * from pmpp.distribute(null::x, :'json_str_2'::jsonb); y ----- 999 (1 row) -- -- test a query that times out -- select * from pmpp.distribute(null::x, :'json_str_timeout'::jsonb); ERROR: canceling statement due to statement timeout CONTEXT: Error occurred on a connection to: localhost_server executing query: select count(pg_sleep(1)). SQL function "distribute" statement 1 select * from pmpp.broadcast(null::x, array[ :'loopback_su_conn_str' , :'pmpp_localhost_server' ], 'select 1'); y --- 1 1 (2 rows) select * from pmpp.distribute(null::x, :'json_str_div_zero'::jsonb); ERROR: division by zero CONTEXT: Error occurred on a connection to: localhost_server executing query: select 2/0. SQL function "distribute" statement 1 drop user mapping for public server :pmpp_localhost_server; create user mapping for public server :pmpp_localhost_server options(user :'nopw_test_user', password ''); select * from pmpp.distribute(null::x, :'json_str_2'::jsonb); ERROR: could not establish connection DETAIL: fe_sendauth: no password supplied CONTEXT: SQL function "distribute" statement 1 select * from pmpp.distribute(null::x, '[{"connection": "bad_connstr_1", "query": "select 1"},' '{"connection": "bad_connstr2", "query": "select 2", ' ' "setup_commands": ["set application_name = test1", "set client_encoding = UTF8" ] }]'::jsonb); ERROR: could not establish connection DETAIL: missing "=" after "bad_connstr_1" in connection info string CONTEXT: SQL function "distribute" statement 1 drop role :pmpp_test_user; drop role :nopw_test_user;