\echo roles must exist on both sides of the connection so they cannot be created in a transaction roles must exist on both sides of the connection so they cannot be created in a transaction \set pmpp_test_user 'pmpp_test_user' \set password 'dummy'; \set nopw_test_user 'nopw_test_user' create role :pmpp_test_user password :'password' login; create role :nopw_test_user password '' login; grant pmpp to :pmpp_test_user, :nopw_test_user; create schema dblink_test_schema; create extension dblink schema dblink_test_schema; create extension postgres_fdw; -- a user-mappings convenience, not a dependency create extension pmpp; select current_database() as dbname, 'dbname=' || current_database() as loopback_su_conn_str \gset \set pmpp_localhost_server 'localhost_server' select format('[{"connection": "%s", "query": "select 1"}]', :'loopback_su_conn_str') as json_str_1, format('[{"connection": "%s", "query": "select 999"}]', :'pmpp_localhost_server') as json_str_2, format('[{"connection": "%s", "query": "select count(pg_sleep(1))", "statement_timeout": 100}]', :'pmpp_localhost_server') as json_str_timeout, format('[{"connection": "%s", "num_workers": 1, "queries": ["select 1","select 2/0","select 3"]}]', :'pmpp_localhost_server') as json_str_div_zero \gset ---- -- test disconnect() select dblink_test_schema.dblink_connect('pmpp.999',:'loopback_su_conn_str'); dblink_connect ---------------- OK (1 row) select * from unnest(dblink_test_schema.dblink_get_connections()) as c; c ---------- pmpp.999 (1 row) select pmpp.disconnect(); disconnect ------------ OK (1 row) select * from unnest(dblink_test_schema.dblink_get_connections()) as c; c --- (0 rows) select * from pmpp.execute_command('analyze'); command | result ---------+-------- analyze | OK (1 row) create table parallel_index_test( b integer, c integer, d integer ); grant all on parallel_index_test to public; insert into parallel_index_test select b.b, c.c, d.d from generate_series(1,10) as b, generate_series(1,10) as c, generate_series(1,10) as d; select * from parallel_index_test order by 1,2,3 limit 10; b | c | d ---+---+---- 1 | 1 | 1 1 | 1 | 2 1 | 1 | 3 1 | 1 | 4 1 | 1 | 5 1 | 1 | 6 1 | 1 | 7 1 | 1 | 8 1 | 1 | 9 1 | 1 | 10 (10 rows) select b, sum(c), sum(d) from parallel_index_test group by b order by b; b | sum | sum ----+-----+----- 1 | 550 | 550 2 | 550 | 550 3 | 550 | 550 4 | 550 | 550 5 | 550 | 550 6 | 550 | 550 7 | 550 | 550 8 | 550 | 550 9 | 550 | 550 10 | 550 | 550 (10 rows) select b, sum(c), sum(d) from pmpp.distribute(null::parallel_index_test, 'dbname=' || current_database(), array( select format('select %s, sum(c), sum(d) from parallel_index_test where b = %s',b.b,b.b) from generate_series(1,10) as b )) group by b order by b; b | sum | sum ----+-----+----- 1 | 550 | 550 2 | 550 | 550 3 | 550 | 550 4 | 550 | 550 5 | 550 | 550 6 | 550 | 550 7 | 550 | 550 8 | 550 | 550 9 | 550 | 550 10 | 550 | 550 (10 rows) select dblink_test_schema.dblink_exec(:'loopback_su_conn_str','create index pit1 on parallel_index_test(c)'); dblink_exec -------------- CREATE INDEX (1 row) select dblink_test_schema.dblink_exec(:'loopback_su_conn_str','drop index pit1'); dblink_exec ------------- DROP INDEX (1 row) select * from pmpp.execute_command('create index pit1 on parallel_index_test(c)'); command | result ---------------------------------------------+-------- create index pit1 on parallel_index_test(c) | OK (1 row) select * from pmpp.execute_command('drop index pit1'); command | result -----------------+-------- drop index pit1 | OK (1 row) select * from pmpp.meta('dbname=' || current_database(), array( select format('create index on %s(%s)',c.table_name,c.column_name) from information_schema.columns c where c.table_name = 'parallel_index_test' and c.table_schema = 'public')) order by 1; command | result ----------------------------------------+-------- create index on parallel_index_test(b) | OK create index on parallel_index_test(c) | OK create index on parallel_index_test(d) | OK (3 rows) \d+ parallel_index_test Table "public.parallel_index_test" Column | Type | Modifiers | Storage | Stats target | Description --------+---------+-----------+---------+--------------+------------- b | integer | | plain | | c | integer | | plain | | d | integer | | plain | | Indexes: "parallel_index_test_b_idx" btree (b) "parallel_index_test_c_idx" btree (c) "parallel_index_test_d_idx" btree (d) ---- -- test cast of array which tests cast of pmpp.query_manifest as well select t.* from unnest(:'json_str_1'::jsonb::pmpp.query_manifest[]) t; connection | queries | cpu_multiplier | num_workers | statement_timeout ---------------------------+--------------+----------------+-------------+------------------- dbname=contrib_regression | {"select 1"} | | | (1 row) create temporary table x(y integer); select * from pmpp.distribute(null::x,:'loopback_su_conn_str',array['select 1','select 2','select 3']); y --- 1 2 3 (3 rows) select * from pmpp.manifest_set(:'json_str_1'::jsonb); connection | queries | cpu_multiplier | num_workers | statement_timeout ---------------------------+--------------+----------------+-------------+------------------- dbname=contrib_regression | {"select 1"} | | | (1 row) select queries from pmpp.manifest_set(:'json_str_1'::jsonb); queries -------------- {"select 1"} (1 row) select * from pmpp.manifest_set(:'json_str_div_zero'::jsonb); connection | queries | cpu_multiplier | num_workers | statement_timeout ------------------+--------------------------------------+----------------+-------------+------------------- localhost_server | {"select 1","select 2/0","select 3"} | | 1 | (1 row) select * from pmpp.manifest_set(:'json_str_timeout'::jsonb); connection | queries | cpu_multiplier | num_workers | statement_timeout ------------------+-------------------------------+----------------+-------------+------------------- localhost_server | {"select count(pg_sleep(1))"} | | | 100 (1 row) create temporary table r as select array_agg(t) r from pmpp.manifest_set(:'json_str_1'::jsonb) as t; select * from pmpp.distribute(null::x, :'json_str_1'::jsonb); y --- 1 (1 row) create server :pmpp_localhost_server foreign data wrapper postgres_fdw options (host 'localhost', dbname :'dbname' ); create user mapping for public server :pmpp_localhost_server options(user :'pmpp_test_user', password :'password'); select * from pmpp.distribute(null::x,:'pmpp_localhost_server',array['select 1','select 2','select 3']); y --- 1 2 3 (3 rows) select * from pmpp.distribute(null::x, :'json_str_2'::jsonb); y ----- 999 (1 row) select * from pmpp.distribute(null::x, :'json_str_timeout'::jsonb); WARNING: connection pmpp.1/localhost_server query: select count(pg_sleep(1)) CONTEXT: SQL function "distribute" statement 1 ERROR: canceling statement due to statement timeout CONTEXT: Error occurred on dblink connection named "pmpp.1": could not execute query. PL/pgSQL function distribute(anyelement,query_manifest[]) line 145 at RETURN QUERY SQL function "distribute" statement 1 select * from unnest(dblink_test_schema.dblink_get_connections()) as c; c --- (0 rows) select pmpp.disconnect(); disconnect ------------ (0 rows) select * from unnest(dblink_test_schema.dblink_get_connections()) as c; c --- (0 rows) select * from pmpp.distribute(null::x, :'json_str_div_zero'::jsonb); WARNING: connection pmpp.1/localhost_server query: select 2/0 CONTEXT: SQL function "distribute" statement 1 ERROR: division by zero CONTEXT: Error occurred on dblink connection named "pmpp.1": could not execute query. PL/pgSQL function distribute(anyelement,query_manifest[]) line 170 at RETURN QUERY SQL function "distribute" statement 1 select * from unnest(dblink_test_schema.dblink_get_connections()) as c; c --- (0 rows) select pmpp.disconnect(); disconnect ------------ (0 rows) select * from unnest(dblink_test_schema.dblink_get_connections()) as c; c --- (0 rows) drop user mapping for public server :pmpp_localhost_server; create user mapping for public server :pmpp_localhost_server options(user :'nopw_test_user', password ''); select * from pmpp.distribute(null::x, :'json_str_2'::jsonb); WARNING: error connecting pmpp.1/localhost_server CONTEXT: SQL function "distribute" statement 1 ERROR: could not establish connection DETAIL: fe_sendauth: no password supplied CONTEXT: SQL statement "SELECT dblink_connect_u(r.dblink_connname,r.connection)" PL/pgSQL function distribute(anyelement,query_manifest[]) line 91 at PERFORM SQL function "distribute" statement 1 drop role :pmpp_test_user; drop role :nopw_test_user;