\set ECHO none \set VERBOSITY terse --Wait for 'pipe_test_owner' created notification to be sent by session A SELECT dbms_pipe.receive_message('pipe_test_owner_created_notifier'); -- create new connection under the userid of 'pipe_test_owner' SET SESSION AUTHORIZATION pipe_test_owner; /* Tests receive_message(text,integer), next_item_type() and all versions of * unpack_message_() and purge(text) */ CREATE OR REPLACE FUNCTION receiveFrom(pipename text) RETURNS void AS $$ DECLARE typ INTEGER; BEGIN WHILE true LOOP PERFORM dbms_pipe.receive_message(pipename,2); SELECT dbms_pipe.next_item_type() INTO typ; IF typ = 0 THEN EXIT; ELSIF typ=9 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_number(); ELSIF typ=11 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_text(); ELSIF typ=12 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_date(); ELSIF typ=13 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_timestamp(); ELSIF typ=23 THEN RAISE NOTICE 'RECEIVE %: %', typ, encode(dbms_pipe.unpack_message_bytea(),'escape'); ELSIF typ=24 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_record(); END IF; END LOOP; PERFORM dbms_pipe.purge(pipename); END; $$ LANGUAGE plpgsql; CREATE OR REPLACE FUNCTION bulkReceive() RETURNS void AS $$ DECLARE typ INTEGER; BEGIN IF dbms_pipe.receive_message('named_pipe_2',2) = 1 THEN RAISE NOTICE 'Timeout'; PERFORM pg_sleep(2); PERFORM dbms_pipe.receive_message('named_pipe_2',2); END IF; WHILE true LOOP SELECT dbms_pipe.next_item_type() INTO typ; IF typ = 0 THEN EXIT; ELSIF typ=9 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_number(); ELSIF typ=11 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_text(); ELSIF typ=12 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_date(); ELSIF typ=13 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_timestamp(); ELSIF typ=23 THEN RAISE NOTICE 'RECEIVE %: %', typ, encode(dbms_pipe.unpack_message_bytea()::bytea,'escape'); ELSIF typ=24 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_record(); END IF; END LOOP; PERFORM dbms_pipe.purge('named_pipe_2'); END; $$ LANGUAGE plpgsql; -- Tests receive_message(text) CREATE OR REPLACE FUNCTION checkReceive1(pipename text) RETURNS void AS $$ BEGIN PERFORM dbms_pipe.receive_message(pipename); RAISE NOTICE 'RECEIVE %',dbms_pipe.unpack_message_text(); END; $$ LANGUAGE plpgsql; CREATE OR REPLACE FUNCTION dropTempTable() RETURNS void AS $$ BEGIN WHILE dbms_pipe.receive_message('pipe_name_3') <> 0 LOOP CONTINUE; END LOOP; DROP TABLE TEMP; END; $$ LANGUAGE plpgsql; CREATE OR REPLACE FUNCTION checkUniqueSessionNameB() RETURNS bool AS $$ DECLARE result bool; BEGIN PERFORM dbms_pipe.receive_message('pipe_name_4'); SELECT dbms_pipe.unpack_message_text() = dbms_pipe.unique_session_name() INTO result; RETURN result; END; $$ LANGUAGE plpgsql; \set ECHO all -- Receives messages sent via an implicit pipe SELECT receiveFrom('named_pipe'); -- Bulk receive messages SELECT bulkReceive(); -- Receives messages sent via an explicit private pipe under the same user -- 'pipe_test_owner' SELECT dbms_pipe.receive_message('recv_private1_notifier'); SELECT receiveFrom('private_pipe_1'); -- Switch user to 'pipe_test_other' DROP USER IF EXISTS pipe_test_other; CREATE USER pipe_test_other; SET SESSION AUTHORIZATION pipe_test_other; -- Try to receive messages sent via an explicit private pipe under the user -- 'pipe_test_other' who is not the owner of pipe. -- insufficient privileges in case of 'private_pipe_2'. SELECT dbms_pipe.receive_message('recv_private2_notifier'); SELECT receiveFrom('private_pipe_2'); -- These are explicit private pipes created using create_pipe(text,integer) -- and create_pipe(text) SELECT dbms_pipe.receive_message('recv_public1_notifier'); SELECT receiveFrom('public_pipe_3'); SELECT dbms_pipe.receive_message('recv_public2_notifier'); SELECT receiveFrom('public_pipe_4'); -- Switch back to user 'pipe_test_owner' SET SESSION AUTHORIZATION pipe_test_owner; DROP USER pipe_test_other; -- Tests receive_message(text) SELECT checkReceive1('pipe_name_1'); SELECT checkReceive1('pipe_name_2'); -- Tests dbms_pipe.db_pipes view SELECT name, items, "limit", private, owner FROM dbms_pipe.db_pipes WHERE name LIKE 'private%' ORDER BY name; -- Tests dbms_pipe.__list_pipes(); attribute size is not included -- since it can be different across runs. SELECT name, items, "limit", private, owner FROM dbms_pipe.__list_pipes() AS (name varchar, items int4, siz int4, "limit" int4, private bool, owner varchar) WHERE name <> 'pipe_name_4' ORDER BY 1; -- Tests remove_pipe(text) SELECT dbms_pipe.remove_pipe('private_pipe_1'); SELECT dbms_pipe.remove_pipe('private_pipe_2'); SELECT dbms_pipe.remove_pipe('public_pipe_3'); SELECT dbms_pipe.remove_pipe('public_pipe_4'); SELECT dbms_pipe.purge('pipe_name_1'); SELECT dbms_pipe.purge('pipe_name_2'); -- Receives drop table notification from session A via 'pipe_name_3' SELECT dropTempTable(); SELECT dbms_pipe.purge('pipe_name_3'); -- tests unique_session_name() (uses 'pipe_name_4') SELECT checkUniqueSessionNameB(); SELECT dbms_pipe.purge('pipe_name_4'); DROP FUNCTION receiveFrom(text); DROP FUNCTION checkReceive1(text); DROP FUNCTION checkUniqueSessionNameB(); DROP FUNCTION bulkReceive(); DROP FUNCTION dropTempTable(); -- Perform a recieve on removed pipe resulting on timeout SELECT dbms_pipe.receive_message('public_pipe_4',2); SELECT dbms_pipe.purge('public_pipe_4'); SET SESSION AUTHORIZATION DEFAULT; DROP USER pipe_test_owner;