-- citus--8.0-5--8.0-6 SET search_path = 'pg_catalog'; CREATE FUNCTION get_global_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz) RETURNS SETOF RECORD LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$get_global_active_transactions$$; COMMENT ON FUNCTION get_global_active_transactions(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz) IS 'returns distributed transaction ids of active distributed transactions from each node of the cluster'; CREATE OR REPLACE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer) RETURNS int4[] AS $$ DECLARE mLocalBlockingPids int4[]; mRemoteBlockingPids int4[]; mLocalTransactionNum int8; workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id'); coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id'); BEGIN SELECT pg_catalog.old_pg_blocking_pids(pBlockedPid) INTO mLocalBlockingPids; IF (array_length(mLocalBlockingPids, 1) > 0) THEN RETURN mLocalBlockingPids; END IF; -- pg says we're not blocked locally; check whether we're blocked globally. -- Note that worker process may be blocked or waiting for a lock. So we need to -- get transaction number for both of them. Following IF provides the transaction -- number when the worker process waiting for other session. IF EXISTS (SELECT transaction_number FROM get_global_active_transactions() WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN SELECT transaction_number INTO mLocalTransactionNum FROM get_global_active_transactions() WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId; ELSE -- Check whether transactions initiated from the coordinator get locked SELECT transaction_number INTO mLocalTransactionNum FROM get_all_active_transactions() WHERE process_id = pBlockedPid; END IF; IF EXISTS (SELECT waiting_transaction_num FROM dump_global_wait_edges() WHERE waiting_transaction_num = mLocalTransactionNum) THEN SELECT array_agg(pBlockedPid) INTO mRemoteBlockingPids; END IF; RETURN mRemoteBlockingPids; END; $$ LANGUAGE plpgsql; #include "udfs/citus_isolation_test_session_is_blocked/8.0-6.sql" RESET search_path;