/*
 * pg_partman_bgw.c
 *
 * A background worker process for the pg_partman extension to allow
 * partition maintenance to be scheduled and run within the database
 * itself without requiring a third-party scheduler (ex. cron)
 *
 */
#include "postgres.h"

/* These are always necessary for a bgworker */
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/shmem.h"

/* these headers are used by this particular worker's code */
#include "access/xact.h"
#include "executor/spi.h"
#include "fmgr.h"
#include "lib/stringinfo.h"
#include "pgstat.h"
#include "utils/builtins.h"
#include "utils/snapmgr.h"
#include "tcop/utility.h"
#include "commands/async.h"
#if (PG_VERSION_NUM >= 100000)
#include "utils/varlena.h"
#endif

PG_MODULE_MAGIC;

void		_PG_init(void);

/*
 * Background worker entry points.
 *
 * The postmaster locates the dynamic worker, and on PG 10+ the master worker
 * too, by name through bgw_library_name/bgw_function_name.  These symbols
 * must therefore be exported from the shared library.  PGDLLEXPORT (from the
 * PostgreSQL headers) marks them for export on Windows, and on PG 16+, which
 * builds extension modules with hidden symbol visibility by default.
 */
PGDLLEXPORT void pg_partman_bgw_main(Datum);
PGDLLEXPORT void pg_partman_bgw_run_maint(Datum);

/* flags set by signal handlers */
static volatile sig_atomic_t got_sighup = false;
static volatile sig_atomic_t got_sigterm = false;

/* GUC variables */
static int	pg_partman_bgw_interval = 3600;	/* Default hourly */
static char *pg_partman_bgw_role = "postgres";	/* Default to postgres role */

/* Do not analyze by default on PG11+ */
#if (PG_VERSION_NUM >= 110000)
static char *pg_partman_bgw_analyze = "off";
#else
static char *pg_partman_bgw_analyze = "on";
#endif
static char *pg_partman_bgw_jobmon = "on";
static char *pg_partman_bgw_dbname = NULL;

/*
 * Function used to split the comma-separated pg_partman_bgw.dbname value
 * into a List.
 *
 * Per the version check below, SplitGUCList is used from 10.5 on, and older
 * servers fall back to SplitIdentifierString.
 */
#if (PG_VERSION_NUM < 100500)
static bool (*split_function_ptr) (char *, char, List **) = &SplitIdentifierString;
#else
static bool (*split_function_ptr) (char *, char, List **) = &SplitGUCList;
#endif

/*
 * Signal handler for SIGTERM
 * Set a flag to let the main loop terminate, and set our latch to wake
 * it up.
*/ static void pg_partman_bgw_sigterm(SIGNAL_ARGS) { int save_errno = errno; got_sigterm = true; if (MyProc) SetLatch(&MyProc->procLatch); errno = save_errno; } /* * Signal handler for SIGHUP * Set a flag to tell the main loop to reread the config file, and set * our latch to wake it up. */ static void pg_partman_bgw_sighup(SIGNAL_ARGS) { int save_errno = errno; got_sighup = true; if (MyProc) SetLatch(&MyProc->procLatch); errno = save_errno; } /* * Entrypoint of this module. */ void _PG_init(void) { BackgroundWorker worker; DefineCustomIntVariable("pg_partman_bgw.interval", "How often run_maintenance() is called (in seconds).", NULL, &pg_partman_bgw_interval, 3600, 1, INT_MAX, PGC_SIGHUP, 0, NULL, NULL, NULL); #if (PG_VERSION_NUM >= 110000) DefineCustomStringVariable("pg_partman_bgw.analyze", "Whether to run an analyze on a partition set whenever a new partition is created during run_maintenance(). Set to 'on' to send TRUE (default). Set to 'off' to send FALSE.", NULL, &pg_partman_bgw_analyze, "off", PGC_SIGHUP, 0, NULL, NULL, NULL); #else DefineCustomStringVariable("pg_partman_bgw.analyze", "Whether to run an analyze on a partition set whenever a new partition is created during run_maintenance(). Set to 'on' to send TRUE (default). Set to 'off' to send FALSE.", NULL, &pg_partman_bgw_analyze, "on", PGC_SIGHUP, 0, NULL, NULL, NULL); #endif DefineCustomStringVariable("pg_partman_bgw.dbname", "CSV list of specific databases in the cluster to run pg_partman BGW on.", NULL, &pg_partman_bgw_dbname, NULL, PGC_SIGHUP, 0, NULL, NULL, NULL); DefineCustomStringVariable("pg_partman_bgw.jobmon", "Whether to log run_maintenance() calls to pg_jobmon if it is installed. Set to 'on' to send TRUE (default). Set to 'off' to send FALSE.", NULL, &pg_partman_bgw_jobmon, "on", PGC_SIGHUP, 0, NULL, NULL, NULL); DefineCustomStringVariable("pg_partman_bgw.role", "Role to be used by BGW. 
Must have execute permissions on run_maintenance()", NULL, &pg_partman_bgw_role, "postgres", PGC_SIGHUP, 0, NULL, NULL, NULL); /* Kept as comment for reference for future development DefineCustomStringVariable("pg_partman_bgw.maintenance_db", "The BGW requires connecting to a local database for reading system catalogs. By default it uses template1. You can change that with this setting if needed.", NULL, &pg_partman_bgw_maint_db, "template1", PGC_SIGHUP, 0, NULL, NULL, NULL); */ if (!process_shared_preload_libraries_in_progress) return; // Start BGW when database starts sprintf(worker.bgw_name, "pg_partman master background worker"); worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; worker.bgw_start_time = BgWorkerStart_RecoveryFinished; worker.bgw_restart_time = 600; #if (PG_VERSION_NUM < 100000) worker.bgw_main = pg_partman_bgw_main; #endif #if (PG_VERSION_NUM >= 100000) sprintf(worker.bgw_library_name, "pg_partman_bgw"); sprintf(worker.bgw_function_name, "pg_partman_bgw_main"); #endif worker.bgw_main_arg = CStringGetDatum(pg_partman_bgw_dbname); worker.bgw_notify_pid = 0; RegisterBackgroundWorker(&worker); } void pg_partman_bgw_main(Datum main_arg) { StringInfoData buf; /* Establish signal handlers before unblocking signals. 
*/ pqsignal(SIGHUP, pg_partman_bgw_sighup); pqsignal(SIGTERM, pg_partman_bgw_sigterm); /* We're now ready to receive signals */ BackgroundWorkerUnblockSignals(); /* Keep for when master requires persistent connection elog(LOG, "%s master process initialized with role %s on database %s" , MyBgworkerEntry->bgw_name , pg_partman_bgw_role , pg_partman_bgw_dbname); */ elog(LOG, "%s master process initialized with role %s" , MyBgworkerEntry->bgw_name , pg_partman_bgw_role); initStringInfo(&buf); /* * Main loop: do this until the SIGTERM handler tells us to terminate */ while (!got_sigterm) { BackgroundWorker worker; BackgroundWorkerHandle *handle; BgwHandleStatus status; char *rawstring; int dbcounter; int rc; int full_string_length; List *elemlist; ListCell *l; pid_t pid; /* Using Latch loop method suggested in latch.h * Uses timeout flag in WaitLatch() further below instead of sleep to allow clean shutdown */ ResetLatch(&MyProc->procLatch); CHECK_FOR_INTERRUPTS(); /* In case of a SIGHUP, just reload the configuration. */ if (got_sighup) { got_sighup = false; ProcessConfigFile(PGC_SIGHUP); } elog(DEBUG1, "After sighup check (got_sighup: %d)", got_sighup); /* In case of a SIGTERM in middle of loop, stop all further processing and return from BGW function to allow it to exit cleanly. */ if (got_sigterm) { elog(LOG, "pg_partman master BGW received SIGTERM. Shutting down. 
(got_sigterm: %d)", got_sigterm); return; } // Use method of shared_preload_libraries to split the pg_partman_bgw_dbname string found in src/backend/utils/init/miscinit.c // Need a modifiable copy of string if (pg_partman_bgw_dbname != NULL) { rawstring = pstrdup(pg_partman_bgw_dbname); // Parse string into list of identifiers if (!(*split_function_ptr)(rawstring, ',', &elemlist)) { // syntax error in list pfree(rawstring); list_free(elemlist); ereport(LOG, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("invalid list syntax in parameter \"pg_partman_bgw.dbname\" in postgresql.conf"))); return; } dbcounter = 0; foreach(l, elemlist) { char *dbname = (char *) lfirst(l); elog(DEBUG1, "Dynamic bgw launch begun for %s (%d)", dbname, dbcounter); worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; worker.bgw_start_time = BgWorkerStart_RecoveryFinished; worker.bgw_restart_time = BGW_NEVER_RESTART; #if (PG_VERSION_NUM < 100000) worker.bgw_main = NULL; #endif sprintf(worker.bgw_library_name, "pg_partman_bgw"); sprintf(worker.bgw_function_name, "pg_partman_bgw_run_maint"); full_string_length = snprintf(worker.bgw_name, sizeof(worker.bgw_name), "pg_partman dynamic background worker (dbname=%s)", dbname); if (full_string_length >= sizeof(worker.bgw_name)) { /* dbname was truncated, add an ellipsis to denote it */ const char truncated_mark[] = "...)"; memcpy(worker.bgw_name + sizeof(worker.bgw_name) - sizeof(truncated_mark), truncated_mark, sizeof(truncated_mark)); } worker.bgw_main_arg = Int32GetDatum(dbcounter); worker.bgw_notify_pid = MyProcPid; dbcounter++; elog(DEBUG1, "Registering dynamic background worker..."); if (!RegisterDynamicBackgroundWorker(&worker, &handle)) { elog(ERROR, "Unable to register dynamic background worker for pg_partman. Consider increasing max_worker_processes if you see this frequently. 
Main background worker process will try restarting in 10 minutes."); } elog(DEBUG1, "Waiting for BGW startup..."); status = WaitForBackgroundWorkerStartup(handle, &pid); elog(DEBUG1, "BGW startup status: %d", status); if (status == BGWH_STOPPED) { ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_RESOURCES), errmsg("Could not start dynamic pg_partman background process"), errhint("More details may be available in the server log."))); } if (status == BGWH_POSTMASTER_DIED) { ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_RESOURCES), errmsg("Cannot start dynamic pg_partman background processes without postmaster"), errhint("Kill all remaining database processes and restart the database."))); } Assert(status == BGWH_STARTED); #if (PG_VERSION_NUM >= 90500) // Shutdown wait function introduced in 9.5. The latch problems this wait fixes are only encountered in // 9.6 and later. So this shouldn't be a problem for 9.4. elog(DEBUG1, "Waiting for BGW shutdown..."); status = WaitForBackgroundWorkerShutdown(handle); elog(DEBUG1, "BGW shutdown status: %d", status); Assert(status == BGWH_STOPPED); #endif } pfree(rawstring); list_free(elemlist); } else { // pg_partman_bgw_dbname if null elog(DEBUG1, "pg_partman_bgw.dbname GUC is NULL. 
Nothing to do in main loop."); } elog(DEBUG1, "Latch status just before waitlatch call: %d", MyProc->procLatch.is_set); #if (PG_VERSION_NUM >= 100000) rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, pg_partman_bgw_interval * 1000L, PG_WAIT_EXTENSION); #endif #if (PG_VERSION_NUM < 100000) rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, pg_partman_bgw_interval * 1000L); #endif /* emergency bailout if postmaster has died */ if (rc & WL_POSTMASTER_DEATH) { proc_exit(1); } elog(DEBUG1, "Latch status after waitlatch call: %d", MyProc->procLatch.is_set); } // end sigterm while } // end main /* * Unable to pass the database name as a string argument (not sure why yet) * Instead, the GUC is parsed both in the main function and below and a counter integer * is passed to determine which database the BGW will run in. */ void pg_partman_bgw_run_maint(Datum arg) { char *analyze; char *dbname = "template1"; char *jobmon; char *partman_schema; char *rawstring; int db_main_counter = DatumGetInt32(arg); List *elemlist; int ret; StringInfoData buf; /* Establish signal handlers before unblocking signals. 
*/ pqsignal(SIGHUP, pg_partman_bgw_sighup); pqsignal(SIGTERM, pg_partman_bgw_sigterm); /* We're now ready to receive signals */ BackgroundWorkerUnblockSignals(); elog(DEBUG1, "Before parsing dbname GUC in dynamic main func: %s", pg_partman_bgw_dbname); rawstring = pstrdup(pg_partman_bgw_dbname); elog(DEBUG1, "GUC rawstring copy: %s", rawstring); // Parse string into list of identifiers if (!(*split_function_ptr)(rawstring, ',', &elemlist)) { // syntax error in list pfree(rawstring); list_free(elemlist); ereport(LOG, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("invalid list syntax in parameter \"pg_partman_bgw.dbname\" in postgresql.conf"))); return; } dbname = list_nth(elemlist, db_main_counter); elog(DEBUG1, "Parsing dbname list: %s (%d)", dbname, db_main_counter); if (strcmp(dbname, "template1") == 0) { elog(DEBUG1, "Default database name found in dbname local variable (\"template1\")."); } elog(DEBUG1, "Before run_maint initialize connection for db %s", dbname); #if (PG_VERSION_NUM < 110000) BackgroundWorkerInitializeConnection(dbname, pg_partman_bgw_role); #endif #if (PG_VERSION_NUM >= 110000) BackgroundWorkerInitializeConnection(dbname, pg_partman_bgw_role, 0); #endif elog(DEBUG1, "After run_maint initialize connection for db %s", dbname); initStringInfo(&buf); SetCurrentStatementStartTimestamp(); StartTransactionCommand(); SPI_connect(); PushActiveSnapshot(GetTransactionSnapshot()); pgstat_report_appname("pg_partman dynamic background worker"); // First determine if pg_partman is even installed in this database appendStringInfo(&buf, "SELECT extname FROM pg_catalog.pg_extension WHERE extname = 'pg_partman'"); pgstat_report_activity(STATE_RUNNING, buf.data); elog(DEBUG1, "Checking if pg_partman extension is installed in database: %s" , dbname); ret = SPI_execute(buf.data, true, 1); if (ret != SPI_OK_SELECT) { elog(FATAL, "Cannot determine if pg_partman is installed in database %s: error code %d", dbname, ret); } if (SPI_processed <= 0) { elog(DEBUG1, "pg_partman 
not installed in database %s. Nothing to do so dynamic worker exiting gracefully.", dbname); // Nothing left to do. Return end the run of BGW function. SPI_finish(); PopActiveSnapshot(); CommitTransactionCommand(); pgstat_report_activity(STATE_IDLE, NULL); pfree(rawstring); list_free(elemlist); return; } // If so then actually log that it's started for that database. elog(LOG, "%s dynamic background worker initialized with role %s on database %s" , MyBgworkerEntry->bgw_name , pg_partman_bgw_role , dbname); resetStringInfo(&buf); appendStringInfo(&buf, "SELECT n.nspname FROM pg_catalog.pg_extension e JOIN pg_catalog.pg_namespace n ON e.extnamespace = n.oid WHERE extname = 'pg_partman'"); pgstat_report_activity(STATE_RUNNING, buf.data); ret = SPI_execute(buf.data, true, 1); if (ret != SPI_OK_SELECT) { elog(FATAL, "Cannot determine which schema pg_partman has been installed to: error code %d", ret); } if (SPI_processed > 0) { bool isnull; partman_schema = DatumGetCString(SPI_getbinval(SPI_tuptable->vals[0] , SPI_tuptable->tupdesc , 1 , &isnull)); if (isnull) elog(FATAL, "Query to determine pg_partman schema returned NULL."); } else { elog(FATAL, "Query to determine pg_partman schema returned zero rows."); } resetStringInfo(&buf); if (strcmp(pg_partman_bgw_analyze, "on") == 0) { analyze = "true"; } else { analyze = "false"; } if (strcmp(pg_partman_bgw_jobmon, "on") == 0) { jobmon = "true"; } else { jobmon = "false"; } appendStringInfo(&buf, "SELECT \"%s\".run_maintenance(p_analyze := %s, p_jobmon := %s)", partman_schema, analyze, jobmon); pgstat_report_activity(STATE_RUNNING, buf.data); ret = SPI_execute(buf.data, false, 0); if (ret != SPI_OK_SELECT) elog(FATAL, "Cannot call pg_partman run_maintenance() function: error code %d", ret); elog(LOG, "%s: %s called by role %s on database %s" , MyBgworkerEntry->bgw_name , buf.data , pg_partman_bgw_role , dbname); SPI_finish(); PopActiveSnapshot(); CommitTransactionCommand(); #if (PG_VERSION_NUM < 150000) 
ProcessCompletedNotifies(); #endif pgstat_report_activity(STATE_IDLE, NULL); elog(DEBUG1, "pg_partman dynamic BGW shutting down gracefully for database %s.", dbname); pfree(rawstring); list_free(elemlist); return; }