#ifndef _INCLUDE_H_ #define _INCLUDE_H_ #define countof(array) (sizeof(array)/sizeof(array[0])) #define SQL(...) #__VA_ARGS__ #include #include #include #include #if PG_VERSION_NUM >= 120000 #include #endif #include #include #include #include #include #include #include #include #include #include #include #if PG_VERSION_NUM >= 110000 #include #endif #if PG_VERSION_NUM < 90600 #include "latch.h" #endif #include #include #include #include #if PG_VERSION_NUM >= 160000 #include #endif #include #include #include #include #if PG_VERSION_NUM >= 130000 #include #else extern PGDLLIMPORT volatile sig_atomic_t ShutdownRequestPending; extern void SignalHandlerForConfigReload(SIGNAL_ARGS); extern void SignalHandlerForShutdownRequest(SIGNAL_ARGS); #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if PG_VERSION_NUM >= 100000 #include #endif #include #include #include #include #include "dest.h" #ifdef GP_VERSION_NUM #include "cdb/cdbvars.h" #endif #if PG_VERSION_NUM >= 90500 #define dsm_create_my(size) dsm_create(size, 0) #define set_config_option_my(name, value, context, source, action, changeVal, elevel) set_config_option(name, value, context, source, action, changeVal, elevel, false) #else #define dsm_create_my(size) dsm_create(size) #define MyLatch (&MyProc->procLatch) #define set_config_option_my(name, value, context, source, action, changeVal, elevel) set_config_option(name, value, context, source, action, changeVal, elevel) #endif #if PG_VERSION_NUM >= 100000 #define createdb_my(pstate, stmt) createdb(pstate, stmt) #define CreateRoleMy(pstate, stmt) CreateRole(pstate, stmt) #define makeDefElemMy(name, arg) makeDefElem(name, arg, -1) #define shm_toc_lookup_my(toc, key) shm_toc_lookup(toc, key, false) #define WaitEventSetWaitMy(set, timeout, occurred_events, nevents) WaitEventSetWait(set, timeout, occurred_events, nevents, PG_WAIT_EXTENSION) #define WaitLatchMy(latch, wakeEvents, timeout) WaitLatch(latch, wakeEvents, timeout, PG_WAIT_EXTENSION) #else #define createdb_my(pstate, stmt) createdb(stmt) #define CreateRoleMy(pstate, stmt) CreateRole(stmt) #define makeDefElemMy(name, arg) makeDefElem(name, arg) #ifdef GP_VERSION_NUM #define shm_toc_lookup_my(toc, key) shm_toc_lookup(toc, key, false) #else #define shm_toc_lookup_my(toc, key) shm_toc_lookup(toc, key) #endif #define WL_SOCKET_MASK (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE) #define WaitEventSetWaitMy(set, timeout, occurred_events, nevents) WaitEventSetWait(set, timeout, occurred_events, nevents) #define WaitLatchMy(latch, wakeEvents, timeout) WaitLatch(latch, wakeEvents, timeout) #endif #if PG_VERSION_NUM >= 110000 #define BackgroundWorkerInitializeConnectionMy(dbname, username) BackgroundWorkerInitializeConnection(dbname, username, 0) #else #define BackgroundWorkerInitializeConnectionMy(dbname, username) BackgroundWorkerInitializeConnection(dbname, username) #endif #if PG_VERSION_NUM >= 120000 #define relation_openrv_extended_my(relation, lockmode, missing_ok) relation_openrv_extended(relation, lockmode, false) #else #ifdef GP_VERSION_NUM #define relation_openrv_extended_my(relation, lockmode, missing_ok) relation_openrv_extended(relation, lockmode, missing_ok, false) #else #define relation_openrv_extended_my(relation, lockmode, missing_ok) relation_openrv_extended(relation, lockmode, missing_ok) #endif #endif #if PG_VERSION_NUM >= 130000 #define set_ps_display_my(activity) set_ps_display(activity) #else #define set_ps_display_my(activity) set_ps_display(activity, false) #endif #if PG_VERSION_NUM >= 160000 #define parseTypeStringMy(str, typeid_p, typmod_p) parseTypeString(str, typeid_p, typmod_p, (Node *)&(ErrorSaveContext){T_ErrorSaveContext}) #define stringToQualifiedNameListMy(string) stringToQualifiedNameList(string, NULL) #else #define parseTypeStringMy(str, typeid_p, typmod_p) parseTypeString(str, typeid_p, typmod_p, true) #define stringToQualifiedNameListMy(string) stringToQualifiedNameList(string) #endif #define PG_WORK_MAGIC 0x776f726b #ifndef get_timeout_active #define get_timeout_active get_timeout_finish_time #endif typedef struct WorkShared { char data[NAMEDATALEN]; char schema[NAMEDATALEN]; char table[NAMEDATALEN]; char user[NAMEDATALEN]; int64 reset; int64 sleep; int run; Oid oid; } WorkShared; typedef struct Work { char *schema_table; char *schema_type; const char *columns; const char *data; const char *schema; const char *table; const char *user; dlist_node node; dsm_segment *seg; int hash; pid_t pid; TimestampTz start; WorkShared *shared; } Work; #define PG_TASK_MAGIC 0x7461736b typedef struct TaskShared { dsm_handle handle; int64 id; int hash; int max; } TaskShared; typedef struct Task { bool active; bool header; bool lock; bool string; char delimiter; char escape; char *group; char *input; char *null; char quote; char *remote; dlist_node node; dsm_segment *seg; int count; int event; int pid; int skip; int timeout; PGconn *conn; StringInfoData error; StringInfoData output; TaskShared *shared; TimestampTz start; uint64 row; void (*socket) (struct Task *t); } Task; bool init_oid_is_string(Oid oid); bool is_log_level_output(int elevel, int log_min_level); bool lock_data_user_hash(Oid data, Oid user, int hash); bool lock_data_user(Oid data, Oid user); bool lock_table_id(Oid table, int64 id); bool lock_table_pid_hash(Oid table, int pid, int hash); bool task_done(Task *t); bool task_work(Task *t); bool unlock_data_user_hash(Oid data, Oid user, int hash); bool unlock_data_user(Oid data, Oid user); bool unlock_table_id(Oid table, int64 id); bool unlock_table_pid_hash(Oid table, int pid, int hash); char *TextDatumGetCStringMy(Datum datum); const char *error_severity(int elevel); Datum CStringGetTextDatumMy(const char *s); Datum SPI_getbinval_my(HeapTupleData *tuple, TupleDesc tupdesc, const char *fname, bool allow_null, Oid typeid); #if PG_VERSION_NUM < 120000 extern PGDLLIMPORT ResourceOwner AuxProcessResourceOwner; #endif extern PGDLLIMPORT ResourceOwner SPIResourceOwner; int severity_error(const char *error); PGDLLEXPORT void conf_main(Datum main_arg); PGDLLEXPORT void task_main(Datum main_arg); PGDLLEXPORT void work_main(Datum main_arg); Portal SPI_cursor_open_my(const char *src, SPIPlanPtr plan, Datum *values, const char *nulls); Portal SPI_cursor_open_with_args_my(const char *src, int nargs, Oid *argtypes, Datum *values, const char *nulls); SPIPlanPtr SPI_prepare_my(const char *src, int nargs, Oid *argtypes); void appendBinaryStringInfoEscapeQuote(StringInfoData *buf, const char *data, int len, bool string, char escape, char quote); void append_with_tabs(StringInfo buf, const char *str); #if PG_VERSION_NUM < 120000 void CreateAuxProcessResourceOwner(void); #endif void initStringInfoMy(StringInfoData *buf); void init_conf(bool dynamic); void _PG_init(void); #if PG_VERSION_NUM < 120000 void ReleaseAuxProcessResources(bool isCommit); #endif void *shm_toc_allocate_my(uint64 magic, dsm_segment **seg, Size nbytes); void SPI_connect_my(const char *src); void SPI_cursor_close_my(Portal portal); void SPI_cursor_fetch_my(const char *src, Portal portal, bool forward, long count); void SPI_execute_plan_my(const char *src, SPIPlanPtr plan, Datum *values, const char *nulls, int res); void SPI_execute_with_args_my(const char *src, int nargs, Oid *argtypes, Datum *values, const char *nulls, int res); void SPI_finish_my(void); void task_error(ErrorData *edata); void task_free(Task *t); #endif // _INCLUDE_H_