\set ECHO none

-- Startup rendezvous on advisory lock 1 (polls for at most ~100 s).
-- The session that wins the lock polls pg_locks every 0.1 s, up to 1000
-- times, until exactly one other request for the lock is waiting; a session
-- that loses simply blocks on the lock.  The lock is transaction-scoped.
do $$
declare
  waiters bigint;
begin
  if not pg_try_advisory_xact_lock(1) then
    -- somebody else already holds the lock: queue up behind them
    perform pg_advisory_xact_lock(1);
    return;
  end if;

  for attempt in 1..1000 loop
    perform pg_sleep(0.1);
    select count(*)
      into waiters
      from pg_locks
     where locktype = 'advisory' and objid = 1 and not granted;
    exit when waiters = 1;
  end loop;
end;
$$;

-- Keep NOTICE-level chatter out of the output while the fixture is rebuilt.
SET client_min_messages TO warning;

DROP TABLE IF EXISTS TEMP;
CREATE TABLE TEMP (
    id   integer,
    name text
);
INSERT INTO TEMP (id, name)
VALUES
    (1, 'bob'),
    (2, 'rob'),
    (3, 'john');

-- Recreate the role that owns the fixture table.
DROP ROLE IF EXISTS pipe_test_owner;
CREATE ROLE pipe_test_owner CREATEROLE;
ALTER TABLE TEMP OWNER TO pipe_test_owner;

SET client_min_messages TO notice;

-- Notify session B that 'pipe_test_owner' has been created.
SELECT dbms_pipe.pack_message(1);
SELECT dbms_pipe.send_message('pipe_test_owner_created_notifier');

-- Continue in this same connection with pipe_test_owner as the session user.
SET SESSION AUTHORIZATION pipe_test_owner;

/* Sends the currently packed message on the given pipe (creating the pipe
 * implicitly) using send_message(text,integer,integer).
 * When the call returns 1 a 'Timeout' notice is raised and, after a
 * two-second pause, the send is attempted once more; the result of that
 * second attempt is not inspected.
 */
CREATE OR REPLACE FUNCTION send(pipename text) RETURNS void AS $$
DECLARE
    status integer;
BEGIN
    status := dbms_pipe.send_message(pipename,2,10);

    IF status = 1 THEN
        RAISE NOTICE 'Timeout';
        PERFORM pg_sleep(2);
        PERFORM dbms_pipe.send_message(pipename,2,10);
    END IF;
END; $$ LANGUAGE plpgsql;

-- Test pack_message for all supported types together with send_message:
-- every value is packed and then sent as its own message on 'named_pipe'.
CREATE OR REPLACE FUNCTION createImplicitPipe() RETURNS void AS $$
DECLARE
    rec TEMP%ROWTYPE;
BEGIN
    -- text
    PERFORM dbms_pipe.pack_message(CAST('Message From Session A' AS text));
    PERFORM send('named_pipe');

    -- date
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01' AS date));
    PERFORM send('named_pipe');

    -- timestamp
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01 09:00:00' AS timestamp));
    PERFORM send('named_pipe');

    -- timestamptz
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01 09:00:00-08' AS timestamptz));
    PERFORM send('named_pipe');

    -- numeric
    PERFORM dbms_pipe.pack_message(CAST(12345.6789 AS numeric));
    PERFORM send('named_pipe');

    -- integer
    PERFORM dbms_pipe.pack_message(CAST(12345 AS integer));
    PERFORM send('named_pipe');

    -- bigint
    PERFORM dbms_pipe.pack_message(CAST(99999999999 AS bigint));
    PERFORM send('named_pipe');

    -- bytea
    PERFORM dbms_pipe.pack_message(CAST(E'\\201' AS bytea));
    PERFORM send('named_pipe');

    -- a whole row of TEMP (the one with id = 2)
    SELECT t.* INTO rec FROM TEMP AS t WHERE t.id = 2;
    PERFORM dbms_pipe.pack_message(rec);
    PERFORM send('named_pipe');
END; $$ LANGUAGE plpgsql;

-- Packs one value of each tested type and then issues a single send on
-- 'named_pipe_2'.
CREATE OR REPLACE FUNCTION bulkSend() RETURNS void AS $$
DECLARE
    rec TEMP%ROWTYPE;
BEGIN
    -- Fetch the row first; it is still packed last, after the scalar values.
    SELECT t.* INTO rec FROM TEMP AS t WHERE t.id = 2;

    PERFORM dbms_pipe.pack_message(CAST('Message From Session A' AS text));
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01' AS date));
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01 09:00:00' AS timestamp));
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01 09:00:00-08' AS timestamptz));
    PERFORM dbms_pipe.pack_message(CAST(12345.6789 AS numeric));
    PERFORM dbms_pipe.pack_message(CAST(12345 AS integer));
    PERFORM dbms_pipe.pack_message(CAST(99999999999 AS bigint));
    PERFORM dbms_pipe.pack_message(CAST(E'\\201' AS bytea));
    PERFORM dbms_pipe.pack_message(rec);

    PERFORM send('named_pipe_2');
END; $$ LANGUAGE plpgsql;


/* Creates an explicit pipe, choosing the create_pipe overload by "ver":
 *   3         -> create_pipe(text,integer,bool), called with (name,4,true)
 *   2         -> create_pipe(text,integer), called with (name,4)
 *   otherwise -> create_pipe(text)
 * When the third (bool) parameter is absent its default is false, that is,
 * the pipe is public.
 */
CREATE OR REPLACE FUNCTION createPipe(name text,ver integer) RETURNS void AS $$
BEGIN
    CASE ver
        WHEN 3 THEN
            PERFORM dbms_pipe.create_pipe(name,4,true);
        WHEN 2 THEN
            PERFORM dbms_pipe.create_pipe(name,4);
        ELSE
            PERFORM dbms_pipe.create_pipe(name);
    END CASE;
END; $$ LANGUAGE plpgsql;


/* Tests create_pipe in its different versions (one of them being the
 * private-pipe case): the pipe is created through createPipe(), the message
 * buffer is reset, and then one message per tested type is sent on it.
 */

CREATE OR REPLACE FUNCTION createExplicitPipe(pipename text,create_version integer) RETURNS void AS $$
DECLARE
    rec TEMP%ROWTYPE;
BEGIN
    PERFORM createPipe(pipename,create_version);
    PERFORM dbms_pipe.reset_buffer();

    -- text
    PERFORM dbms_pipe.pack_message(CAST('Message From Session A' AS text));
    PERFORM send(pipename);

    -- date
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01' AS date));
    PERFORM send(pipename);

    -- timestamp
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01 09:00:00' AS timestamp));
    PERFORM send(pipename);

    -- timestamptz
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01 09:00:00-08' AS timestamptz));
    PERFORM send(pipename);

    -- numeric
    PERFORM dbms_pipe.pack_message(CAST(12345.6789 AS numeric));
    PERFORM send(pipename);

    -- integer
    PERFORM dbms_pipe.pack_message(CAST(12345 AS integer));
    PERFORM send(pipename);

    -- bigint
    PERFORM dbms_pipe.pack_message(CAST(99999999999 AS bigint));
    PERFORM send(pipename);

    -- bytea
    PERFORM dbms_pipe.pack_message(CAST(E'\\201' AS bytea));
    PERFORM send(pipename);

    -- a whole row of TEMP (the one with id = 2)
    SELECT t.* INTO rec FROM TEMP AS t WHERE t.id = 2;
    PERFORM dbms_pipe.pack_message(rec);
    PERFORM send(pipename);
END; $$ LANGUAGE plpgsql;


-- Test send_message(text): packs a marker string and sends it on
-- 'pipe_name_1' with the one-argument form.
CREATE OR REPLACE FUNCTION checkSend1()
RETURNS void AS $$
BEGIN
    PERFORM dbms_pipe.pack_message('checking one-argument send_message()');
    PERFORM dbms_pipe.send_message('pipe_name_1');
END; $$ LANGUAGE plpgsql;


-- Test send_message(text,integer): packs a marker string and sends it on
-- 'pipe_name_2'.  A return value of 1 raises a 'Timeout' notice and, after a
-- two-second pause, the send is attempted once more.
CREATE OR REPLACE FUNCTION checkSend2() RETURNS void AS $$
DECLARE
    status integer;
BEGIN
    PERFORM dbms_pipe.pack_message('checking two-argument send_message()');

    status := dbms_pipe.send_message('pipe_name_2',2);
    IF status = 1 THEN
        RAISE NOTICE 'Timeout';
        PERFORM pg_sleep(2);
        PERFORM dbms_pipe.send_message('pipe_name_2',2);
    END IF;
END; $$ LANGUAGE plpgsql;

-- Sends a one-item message (the integer 1) on 'pipe_name_3'.
-- Judging by the name, this signals that TEMP may be dropped (TODO confirm
-- against the receiving session).
CREATE OR REPLACE FUNCTION notifyDropTemp()
RETURNS void AS $$
BEGIN
    PERFORM dbms_pipe.pack_message(1);
    PERFORM dbms_pipe.send_message('pipe_name_3');
END; $$ LANGUAGE plpgsql;


-- Packs this session's dbms_pipe.unique_session_name() and sends it on
-- 'pipe_name_4'.
CREATE OR REPLACE FUNCTION checkUniqueSessionNameA()
RETURNS void AS $$
BEGIN
    PERFORM dbms_pipe.pack_message(
        dbms_pipe.unique_session_name()
    );
    PERFORM dbms_pipe.send_message('pipe_name_4');
END; $$ LANGUAGE plpgsql;

-- Sends a one-item message (the integer 1) on the pipe named by "pipename".
CREATE OR REPLACE FUNCTION notify(pipename text)
RETURNS void AS $$
BEGIN
    PERFORM dbms_pipe.pack_message(1);
    PERFORM dbms_pipe.send_message(pipename);
END; $$ LANGUAGE plpgsql;

-- Driver: runs the session-A scenario and then drops every helper function
-- defined above.
-- NOTE(review): this comment deliberately sits above "\set ECHO all".  Once
-- ECHO is "all", psql prints every non-empty input line (comments included),
-- so the lines below are presumably part of the expected regression output;
-- they are kept byte-for-byte.
-- Each notify('recv_*_notifier') call sends a one-item message on that pipe
-- immediately before the corresponding explicit pipe is created and filled.
\set ECHO all

SELECT createImplicitPipe();

-- Bulk send messages
SELECT bulkSend();

-- An explicit private pipe
SELECT notify('recv_private1_notifier');
SELECT createExplicitPipe('private_pipe_1',3);

-- An explicit private pipe
SELECT notify('recv_private2_notifier');
SELECT createExplicitPipe('private_pipe_2',3);

-- An explicit public pipe (uses two-argument create_pipe)
SELECT notify('recv_public1_notifier');
SELECT createExplicitPipe('public_pipe_3',2);

-- An explicit public pipe (uses one-argument create_pipe)
SELECT notify('recv_public2_notifier');
SELECT createExplicitPipe('public_pipe_4',1);

-- tests send_message(text)
SELECT checkSend1();

-- tests send_message(text,integer)
SELECT checkSend2();

SELECT notifyDropTemp();

-- tests unique_session_name()
SELECT checkUniqueSessionNameA();

DROP FUNCTION createImplicitPipe();
DROP FUNCTION createExplicitPipe(text,integer);
DROP FUNCTION createPipe(text,integer);
DROP FUNCTION checkSend1();
DROP FUNCTION checkSend2();
DROP FUNCTION checkUniqueSessionNameA();
DROP FUNCTION bulkSend();
DROP FUNCTION notifyDropTemp();
DROP FUNCTION notify(text);
DROP FUNCTION send(text);