#include "postgres.h"
#include "fmgr.h"
#include "utils/builtins.h"
#include "utils/timestamp.h"
#include "executor/spi.h"
#include "access/htup_details.h"
#include "catalog/pg_type.h"
#include "miscadmin.h"
#include "storage/proc.h"
#include "postmaster/bgworker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "utils/guc.h"
#include "tcop/utility.h"
#include "utils/snapmgr.h"
#include "access/xact.h"
#include "utils/wait_event.h"
#include "utils/elog.h"
#include "nodes/pg_list.h"
#include "utils/memutils.h"

PG_MODULE_MAGIC;

/* Settings backed by the pg_ttl_index.* GUCs defined in _PG_init(). */
static int  ttl_naptime = 60;
static bool ttl_worker_enabled = true;

/* Flags set from signal handlers and polled by the worker main loop. */
static volatile sig_atomic_t got_SIGTERM = false;
static volatile sig_atomic_t got_SIGHUP = false;

static void ttl_sigterm_handler(SIGNAL_ARGS);
static void ttl_sighup_handler(SIGNAL_ARGS);

void _PG_init(void);

/*
 * The worker entry point is looked up by name from the shared library, so it
 * must be exported explicitly.
 */
PGDLLEXPORT void ttl_worker_main(Datum main_arg);

PG_FUNCTION_INFO_V1(ttl_create_index);
PG_FUNCTION_INFO_V1(ttl_drop_index);
PG_FUNCTION_INFO_V1(ttl_runner);
PG_FUNCTION_INFO_V1(ttl_start_worker);

static void register_background_worker(void);
static bool validate_date_column(const char *table_name, const char *column_name);
static void ttl_fill_worker_common(BackgroundWorker *worker);
static void ttl_run_cleanup_pass(void);

/*
 * SIGTERM handler: request loop exit and wake the worker.
 * errno is preserved because SetLatch() may clobber it.
 */
static void
ttl_sigterm_handler(SIGNAL_ARGS)
{
    int save_errno = errno;

    got_SIGTERM = true;
    SetLatch(MyLatch);

    errno = save_errno;
}

/*
 * SIGHUP handler: request a configuration reload and wake the worker.
 * The reload itself happens in the main loop, never in signal context.
 */
static void
ttl_sighup_handler(SIGNAL_ARGS)
{
    int save_errno = errno;

    got_SIGHUP = true;
    SetLatch(MyLatch);

    errno = save_errno;
}

/*
 * Module load hook: defines the pg_ttl_index.naptime and pg_ttl_index.enabled
 * GUCs and, when loaded through shared_preload_libraries, registers the
 * static background worker.
 */
void
_PG_init(void)
{
    DefineCustomIntVariable("pg_ttl_index.naptime",
                            "Duration between TTL cleanup runs (seconds)",
                            NULL,
                            &ttl_naptime,
                            60,
                            1,
                            INT_MAX,
                            PGC_SIGHUP,
                            0,
                            NULL, NULL, NULL);

    DefineCustomBoolVariable("pg_ttl_index.enabled",
                             "Enable TTL background worker",
                             NULL,
                             &ttl_worker_enabled,
                             true,
                             PGC_SIGHUP,
                             0,
                             NULL, NULL, NULL);

    if (process_shared_preload_libraries_in_progress)
        register_background_worker();
}

/*
 * Zero *worker and fill the fields shared by the static and the dynamic
 * worker registrations.  Callers set bgw_start_time, bgw_name,
 * bgw_notify_pid and bgw_main_arg afterwards.  All string fields are written
 * with bounded snprintf() so they can never overrun their buffers.
 */
static void
ttl_fill_worker_common(BackgroundWorker *worker)
{
    memset(worker, 0, sizeof(*worker));

    worker->bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
    snprintf(worker->bgw_library_name, sizeof(worker->bgw_library_name),
             "%s", "pg_ttl_index");
    snprintf(worker->bgw_function_name, sizeof(worker->bgw_function_name),
             "%s", "ttl_worker_main");
    snprintf(worker->bgw_type, sizeof(worker->bgw_type),
             "%s", "TTL Index Worker");
    worker->bgw_restart_time = 60;
}

/*
 * Register the static worker started by the postmaster once recovery has
 * finished.
 *
 * NOTE(review): bgw_main_arg is 0 here, so ttl_worker_main() receives
 * InvalidOid as the database OID.  Confirm that this is intended; the
 * per-database worker started by ttl_start_worker() passes MyDatabaseId.
 */
static void
register_background_worker(void)
{
    BackgroundWorker worker;

    ttl_fill_worker_common(&worker);
    worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
    snprintf(worker.bgw_name, sizeof(worker.bgw_name), "%s", "TTL Index Worker");
    worker.bgw_notify_pid = 0;
    worker.bgw_main_arg = (Datum) 0;

    RegisterBackgroundWorker(&worker);
}

/*
 * Run one cleanup pass in its own transaction: verify that the extension is
 * installed in the connected database and, if so, call ttl_runner_safe().
 *
 * Any error raised during the pass is caught, the transaction is aborted and
 * the message is logged, so that a single failing pass does not terminate
 * the worker.
 */
static void
ttl_run_cleanup_pass(void)
{
    PG_TRY();
    {
        int ret;

        StartTransactionCommand();
        if (SPI_connect() != SPI_OK_CONNECT)
            ereport(ERROR, (errmsg("SPI_connect failed")));
        PushActiveSnapshot(GetTransactionSnapshot());

        ret = SPI_exec("SELECT 1 FROM pg_extension WHERE extname = 'pg_ttl_index'", 0);
        if (ret == SPI_OK_SELECT && SPI_processed > 0)
        {
            ret = SPI_exec("SELECT ttl_runner_safe()", 0);
            if (ret == SPI_OK_SELECT)
                elog(LOG, "TTL background worker: cleanup completed successfully");
            else
                elog(WARNING, "TTL background worker: cleanup query failed: %s",
                     SPI_result_code_string(ret));
        }
        else
        {
            elog(DEBUG2, "TTL background worker: extension not installed in current database");
        }

        SPI_finish();
        PopActiveSnapshot();
        CommitTransactionCommand();
    }
    PG_CATCH();
    {
        ErrorData *edata;

        /*
         * We arrive here with ErrorContext current.  The error must be
         * copied into a context that survives both FlushErrorState() and the
         * transaction abort, otherwise the message would be freed before it
         * is logged.
         */
        MemoryContextSwitchTo(TopMemoryContext);
        edata = CopyErrorData();
        FlushErrorState();

        /* Release transaction resources before doing anything else. */
        AbortCurrentTransaction();

        elog(LOG, "TTL background worker error: %s", edata->message);
        FreeErrorData(edata);
    }
    PG_END_TRY();
}

/*
 * Background worker entry point.
 *
 * main_arg carries the OID of the database to connect to.  The worker sleeps
 * for pg_ttl_index.naptime seconds between passes, reloads configuration on
 * SIGHUP, and exits on SIGTERM or postmaster death.
 */
void
ttl_worker_main(Datum main_arg)
{
    Oid database_id = DatumGetObjectId(main_arg);

    pqsignal(SIGTERM, ttl_sigterm_handler);
    pqsignal(SIGHUP, ttl_sighup_handler);
    BackgroundWorkerUnblockSignals();

    BackgroundWorkerInitializeConnectionByOid(database_id, InvalidOid, 0);

    elog(LOG, "TTL background worker started for database OID %u, cleanup interval: %d seconds",
         database_id, ttl_naptime);

    while (!got_SIGTERM)
    {
        int rc;
        /* Clamp so the millisecond timeout cannot overflow a 32-bit long. */
        int nap_seconds = Min(ttl_naptime, INT_MAX / 1000);

        rc = WaitLatch(MyLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                       nap_seconds * 1000L,
                       PG_WAIT_EXTENSION);
        ResetLatch(MyLatch);

        if (rc & WL_POSTMASTER_DEATH)
            proc_exit(1);

        CHECK_FOR_INTERRUPTS();

        /* Pick up changes to the PGC_SIGHUP settings. */
        if (got_SIGHUP)
        {
            got_SIGHUP = false;
            ProcessConfigFile(PGC_SIGHUP);
        }

        /* Do not start another pass when we were woken up to shut down. */
        if (got_SIGTERM)
            break;

        if (ttl_worker_enabled)
            ttl_run_cleanup_pass();
    }

    proc_exit(1);
}

/*
 * Return true when information_schema.columns lists column_name of
 * table_name with a date or timestamp data type.
 *
 * Must be called with an SPI connection already open.  Both names are
 * embedded as properly quoted SQL literals.
 */
static bool
validate_date_column(const char *table_name, const char *column_name)
{
    StringInfoData query;
    char       *table_lit = quote_literal_cstr(table_name);
    char       *column_lit = quote_literal_cstr(column_name);
    int         ret;
    bool        is_date = false;

    initStringInfo(&query);
    appendStringInfo(&query,
                     "SELECT 1 FROM information_schema.columns "
                     "WHERE table_name = %s AND column_name = %s "
                     "AND data_type IN ('timestamp without time zone', 'timestamp with time zone', 'date')",
                     table_lit, column_lit);

    ret = SPI_exec(query.data, 1);
    if (ret == SPI_OK_SELECT && SPI_processed > 0)
        is_date = true;

    pfree(query.data);
    pfree(table_lit);
    pfree(column_lit);

    return is_date;
}

/*
 * ttl_create_index(table_name text, column_name text, expire_after_seconds int)
 *
 * Insert or update the TTL rule for (table_name, column_name) in
 * ttl_index_table.  Raises an error if an argument is NULL or if the column
 * is not a date/timestamp column.  Returns true when the upsert was executed.
 */
Datum
ttl_create_index(PG_FUNCTION_ARGS)
{
    char       *table_name;
    char       *column_name;
    char       *table_lit;
    char       *column_lit;
    int32       expire_after_seconds;
    StringInfoData query;
    int         ret;
    bool        success = false;

    /* Guard against NULLs in case the SQL declaration is not STRICT. */
    if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2))
        ereport(ERROR,
                (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
                 errmsg("ttl_create_index arguments must not be NULL")));

    table_name = text_to_cstring(PG_GETARG_TEXT_PP(0));
    column_name = text_to_cstring(PG_GETARG_TEXT_PP(1));
    expire_after_seconds = PG_GETARG_INT32(2);

    if (SPI_connect() != SPI_OK_CONNECT)
        ereport(ERROR, (errmsg("SPI_connect failed")));

    if (!validate_date_column(table_name, column_name))
    {
        SPI_finish();
        ereport(ERROR,
                (errmsg("Column %s in table %s must be a date/timestamp type",
                        column_name, table_name)));
    }

    /* Quote the names as SQL literals so they cannot alter the statement. */
    table_lit = quote_literal_cstr(table_name);
    column_lit = quote_literal_cstr(column_name);

    initStringInfo(&query);
    appendStringInfo(&query,
                     "INSERT INTO ttl_index_table (table_name, column_name, expire_after_seconds, active, created_at) "
                     "VALUES (%s, %s, %d, true, NOW()) "
                     "ON CONFLICT (table_name, column_name) DO UPDATE SET "
                     "expire_after_seconds = EXCLUDED.expire_after_seconds, "
                     "active = true, "
                     "updated_at = NOW()",
                     table_lit, column_lit, expire_after_seconds);

    ret = SPI_exec(query.data, 0);
    if (ret == SPI_OK_INSERT || ret == SPI_OK_UPDATE)
        success = true;

    pfree(query.data);
    SPI_finish();

    PG_RETURN_BOOL(success);
}

/*
 * ttl_drop_index(table_name text, column_name text)
 *
 * Delete the TTL rule for (table_name, column_name) from ttl_index_table.
 * Raises an error if an argument is NULL.  Returns true when at least one
 * row was deleted.
 */
Datum
ttl_drop_index(PG_FUNCTION_ARGS)
{
    char       *table_name;
    char       *column_name;
    char       *table_lit;
    char       *column_lit;
    StringInfoData query;
    int         ret;
    bool        success = false;

    /* Guard against NULLs in case the SQL declaration is not STRICT. */
    if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
        ereport(ERROR,
                (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
                 errmsg("ttl_drop_index arguments must not be NULL")));

    table_name = text_to_cstring(PG_GETARG_TEXT_PP(0));
    column_name = text_to_cstring(PG_GETARG_TEXT_PP(1));

    if (SPI_connect() != SPI_OK_CONNECT)
        ereport(ERROR, (errmsg("SPI_connect failed")));

    /* Quote the names as SQL literals so they cannot alter the statement. */
    table_lit = quote_literal_cstr(table_name);
    column_lit = quote_literal_cstr(column_name);

    initStringInfo(&query);
    appendStringInfo(&query,
                     "DELETE FROM ttl_index_table WHERE table_name = %s AND column_name = %s",
                     table_lit, column_lit);

    ret = SPI_exec(query.data, 0);
    if (ret == SPI_OK_DELETE && SPI_processed > 0)
        success = true;

    pfree(query.data);
    SPI_finish();

    PG_RETURN_BOOL(success);
}

/*
 * ttl_runner()
 *
 * Deprecated entry point kept for compatibility: emits a NOTICE pointing to
 * ttl_runner_safe() and returns 0 without doing any work.
 */
Datum
ttl_runner(PG_FUNCTION_ARGS)
{
    elog(NOTICE, "ttl_runner() is deprecated. Use ttl_runner_safe() instead.");
    PG_RETURN_INT32(0);
}

/*
 * ttl_start_worker()
 *
 * Register a dynamic background worker for the current database and wait for
 * it to start.  Returns true when the worker started, false (with a WARNING)
 * when it stopped before starting, and raises an error when registration
 * fails or the postmaster died.
 */
Datum
ttl_start_worker(PG_FUNCTION_ARGS)
{
    BackgroundWorker worker;
    BackgroundWorkerHandle *handle;
    BgwHandleStatus status;
    pid_t       pid;
    bool        started = false;

    ttl_fill_worker_common(&worker);
    worker.bgw_start_time = BgWorkerStart_ConsistentState;
    snprintf(worker.bgw_name, sizeof(worker.bgw_name), "TTL Worker DB %u", MyDatabaseId);
    /* Ask the postmaster to signal this backend when the worker starts. */
    worker.bgw_notify_pid = MyProcPid;
    worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);

    if (!RegisterDynamicBackgroundWorker(&worker, &handle))
        elog(ERROR, "Failed to register TTL background worker");

    status = WaitForBackgroundWorkerStartup(handle, &pid);
    switch (status)
    {
        case BGWH_STARTED:
            elog(LOG, "TTL background worker started successfully for database %u with PID %d",
                 MyDatabaseId, (int) pid);
            started = true;
            break;

        case BGWH_STOPPED:
            elog(WARNING, "TTL background worker failed to start");
            break;

        case BGWH_POSTMASTER_DIED:
            elog(ERROR, "Cannot start TTL background worker because postmaster died");
            break;

        default:
            elog(ERROR, "Unknown background worker startup result");
            break;
    }

    PG_RETURN_BOOL(started);
}