/*------------------------------------------------------------------------- * * connection_management.h * Central management of connections and their life-cycle * * Copyright (c) Citus Data, Inc. * *------------------------------------------------------------------------- */ #ifndef CONNECTION_MANAGMENT_H #define CONNECTION_MANAGMENT_H #include "postgres.h" #include "distributed/transaction_management.h" #include "distributed/remote_transaction.h" #include "lib/ilist.h" #include "pg_config.h" #include "portability/instr_time.h" #include "storage/latch.h" #include "utils/guc.h" #include "utils/hsearch.h" #include "utils/timestamp.h" /* maximum (textual) lengths of hostname and port */ #define MAX_NODE_LENGTH 255 /* includes 0 byte */ /* used for libpq commands that get an error buffer. Postgres docs recommend 256. */ #define ERROR_BUFFER_SIZE 256 /* values with special behavior for authinfo lookup */ #define WILDCARD_NODE_ID 0 #define LOCALHOST_NODE_ID -1 /* application name used for internal connections in Citus */ #define CITUS_APPLICATION_NAME_PREFIX "citus_internal gpid=" /* application name used for internal connections in rebalancer */ #define CITUS_REBALANCER_APPLICATION_NAME_PREFIX "citus_rebalancer gpid=" /* application name used for connections made by run_command_on_* */ #define CITUS_RUN_COMMAND_APPLICATION_NAME_PREFIX "citus_run_command gpid=" /* deal with waiteventset errors */ #define WAIT_EVENT_SET_INDEX_NOT_INITIALIZED -1 #define WAIT_EVENT_SET_INDEX_FAILED -2 /* forward declare, to avoid forcing large headers on everyone */ struct pg_conn; /* target of the PGconn typedef */ struct MemoryContextData; /* * Flags determining connection establishment behaviour. */ enum MultiConnectionMode { /* force establishment of a new connection */ FORCE_NEW_CONNECTION = 1 << 0, FOR_DDL = 1 << 1, FOR_DML = 1 << 2, /* * During COPY we do not want to use a connection that accessed non-co-located * placements. If there is a connection that did not access another placement, * then use it. Otherwise open a new clean connection. */ REQUIRE_CLEAN_CONNECTION = 1 << 3, OUTSIDE_TRANSACTION = 1 << 4, /* * All metadata changes should go through the same connection, otherwise * self-deadlocks are possible. That is because the same metadata (e.g., * metadata includes the distributed table on the workers) can be modified * accross multiple connections. * * With this flag, we guarantee that there is a single metadata connection. * But note that this connection can be used for any other operation. * In other words, this connection is not exclusively reserved for metadata * operations. */ REQUIRE_METADATA_CONNECTION = 1 << 5, /* * Some connections are optional such as when adaptive executor is executing * a multi-shard command and requires the second (or further) connections * per node. In that case, the connection manager may decide not to allow the * connection. */ OPTIONAL_CONNECTION = 1 << 6, /* * When this flag is passed, via connection throttling, the connection * establishments may be suspended until a connection slot is available to * the remote host. */ WAIT_FOR_CONNECTION = 1 << 7, /* * Use the flag to start a connection for streaming replication. * This flag constructs additional libpq connection parameters needed for streaming * replication protocol. It adds 'replication=database' param which instructs * the backend to go into logical replication walsender mode. * https://www.postgresql.org/docs/current/protocol-replication.html * * This is need to run 'CREATE_REPLICATION_SLOT' command. */ REQUIRE_REPLICATION_CONNECTION_PARAM = 1 << 8 }; /* * This state is used for keeping track of the initilization * of the underlying pg_conn struct. */ typedef enum MultiConnectionState { MULTI_CONNECTION_INITIAL, MULTI_CONNECTION_CONNECTING, MULTI_CONNECTION_CONNECTED, MULTI_CONNECTION_FAILED, MULTI_CONNECTION_LOST, MULTI_CONNECTION_TIMED_OUT } MultiConnectionState; /* * This state is used for keeping track of the initilization * of MultiConnection struct, not specifically the underlying * pg_conn. The state is useful to determine the action during * clean-up of connections. */ typedef enum MultiConnectionStructInitializationState { POOL_STATE_NOT_INITIALIZED, POOL_STATE_COUNTER_INCREMENTED, POOL_STATE_INITIALIZED } MultiConnectionStructInitializationState; /* declaring this directly above causes uncrustify to format it badly */ typedef enum MultiConnectionMode MultiConnectionMode; typedef struct MultiConnection { /* connection details, useful for error messages and such. */ char hostname[MAX_NODE_LENGTH]; int32 port; char user[NAMEDATALEN]; char database[NAMEDATALEN]; /* underlying libpq connection */ struct pg_conn *pgConn; /* connection id */ uint64 connectionId; /* state of the connection */ MultiConnectionState connectionState; /* signal that the connection is ready for read/write */ bool ioReady; /* whether to wait for read/write */ int waitFlags; /* force the connection to be closed at the end of the transaction */ bool forceCloseAtTransactionEnd; /* is the connection currently in use, and shouldn't be used by anything else */ bool claimedExclusively; /* * Should be used to access/modify metadata. See REQUIRE_METADATA_CONNECTION for * the details. */ bool useForMetadataOperations; /* time connection establishment was started, for timeout and executor stats */ instr_time connectionEstablishmentStart; instr_time connectionEstablishmentEnd; /* membership in list of list of connections in ConnectionHashEntry */ dlist_node connectionNode; /* information about the associated remote transaction */ RemoteTransaction remoteTransaction; /* * membership in list of in-progress transactions and a flag to indicate * that the connection was added to this list */ dlist_node transactionNode; bool transactionInProgress; /* list of all placements referenced by this connection */ dlist_head referencedPlacements; /* number of bytes sent to PQputCopyData() since last flush */ uint64 copyBytesWrittenSinceLastFlush; /* replication option */ bool requiresReplication; MultiConnectionStructInitializationState initilizationState; } MultiConnection; /* * Central connection management hash, mapping (host, port, user, database) to * a list of connections. * * This hash is used to keep track of which connections are open to which * node. Besides allowing connection reuse, that information is e.g. used to * handle closing connections after the end of a transaction. */ /* hash key */ typedef struct ConnectionHashKey { char hostname[MAX_NODE_LENGTH]; int32 port; char user[NAMEDATALEN]; char database[NAMEDATALEN]; bool replicationConnParam; } ConnectionHashKey; /* hash entry */ typedef struct ConnectionHashEntry { ConnectionHashKey key; dlist_head *connections; /* connections list is valid or not */ bool isValid; } ConnectionHashEntry; /* hash entry for cached connection parameters */ typedef struct ConnParamsHashEntry { ConnectionHashKey key; bool isValid; Index runtimeParamStart; char **keywords; char **values; } ConnParamsHashEntry; /* maximum duration to wait for connection */ extern int NodeConnectionTimeout; /* maximum number of connections to cache per worker per session */ extern int MaxCachedConnectionsPerWorker; /* maximum lifetime of connections in miliseconds */ extern int MaxCachedConnectionLifetime; /* parameters used for outbound connections */ extern char *NodeConninfo; extern char *LocalHostName; /* the hash tables are externally accessiable */ extern HTAB *ConnectionHash; extern HTAB *ConnParamsHash; /* context for all connection and transaction related memory */ extern struct MemoryContextData *ConnectionContext; extern void AfterXactConnectionHandling(bool isCommit); extern void InitializeConnectionManagement(void); extern char * GetAuthinfo(char *hostname, int32 port, char *user); extern void InitConnParams(void); extern void ResetConnParams(void); extern void InvalidateConnParamsHashEntries(void); extern void AddConnParam(const char *keyword, const char *value); extern void GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values, Index *runtimeParamStart, MemoryContext context); extern const char * GetConnParam(const char *keyword); extern bool CheckConninfo(const char *conninfo, const char **allowedConninfoKeywords, Size allowedConninfoKeywordsLength, char **errmsg); /* Low-level connection establishment APIs */ extern MultiConnection * GetNodeConnection(uint32 flags, const char *hostname, int32 port); extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname, int32 port); extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const char *user, const char *database); extern MultiConnection * GetConnectionForLocalQueriesOutsideTransaction(char *userName); extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const char *user, const char *database); extern void RestartConnection(MultiConnection *connection); extern void CloseAllConnectionsAfterTransaction(void); extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort); extern MultiConnection * ConnectionAvailableToNode(char *hostName, int nodePort, const char *userName, const char *database); extern void CloseConnection(MultiConnection *connection); extern void ShutdownAllConnections(void); extern void ShutdownConnection(MultiConnection *connection); /* dealing with a connection */ extern void FinishConnectionListEstablishment(List *multiConnectionList); extern void FinishConnectionEstablishment(MultiConnection *connection); extern void ClaimConnectionExclusively(MultiConnection *connection); extern void UnclaimConnection(MultiConnection *connection); extern void MarkConnectionConnected(MultiConnection *connection); /* waiteventset utilities */ extern int CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data); extern bool CitusModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch); /* time utilities */ extern double MillisecondsPassedSince(instr_time moment); extern long MillisecondsToTimeout(instr_time start, long msAfterStart); #if PG_VERSION_NUM < 140000 extern void WarmUpConnParamsHash(void); #endif #endif /* CONNECTION_MANAGMENT_H */