/*---------------- * pg_stat_kcache * * Provides basic statistics about real I/O done by the filesystem layer. * This way, calculating a real hit-ratio is doable. Also provides basis * statistics about CPU usage. * * Large portions of code freely inspired by pg_stat_plans. Thanks to Peter * Geoghegan for this great extension. * * This program is open source, licensed under the PostgreSQL license. * For license terms, see the LICENSE file. * */ #include "postgres.h" #include #ifdef HAVE_SYS_RESOURCE_H #include #include #endif #ifndef HAVE_GETRUSAGE #include "rusagestub.h" #endif #include "access/hash.h" #if PG_VERSION_NUM >= 90600 #include "access/parallel.h" #endif #include "executor/executor.h" #include "funcapi.h" #include "miscadmin.h" #include "pgstat.h" #if PG_VERSION_NUM >= 90600 #include "postmaster/autovacuum.h" #endif #include "storage/fd.h" #include "storage/ipc.h" #include "storage/spin.h" #include "utils/builtins.h" #include "utils/guc.h" PG_MODULE_MAGIC; #if PG_VERSION_NUM >= 90300 #define PGSK_DUMP_FILE "pg_stat/pg_stat_kcache.stat" #else #define PGSK_DUMP_FILE "global/pg_stat_kcache.stat" #endif /* In PostgreSQL 11, queryid becomes a uint64 internally. */ #if PG_VERSION_NUM >= 110000 typedef uint64 pgsk_queryid; #else typedef uint32 pgsk_queryid; #endif #define PG_STAT_KCACHE_COLS_V2_0 7 #define PG_STAT_KCACHE_COLS_V2_1 15 #define PG_STAT_KCACHE_COLS 15 /* maximum of above */ #define USAGE_INCREASE (1.0) #define USAGE_DECREASE_FACTOR (0.99) /* decreased every pgsk_entry_dealloc */ #define USAGE_DEALLOC_PERCENT 5 /* free this % of entries at once */ #define USAGE_INIT (1.0) /* including initial planning */ /* ru_inblock block size is 512 bytes with Linux * see http://lkml.indiana.edu/hypermail/linux/kernel/0703.2/0937.html */ #define RUSAGE_BLOCK_SIZE 512 /* Size of a block for getrusage() */ #define TIMEVAL_DIFF(start, end) ((double) end.tv_sec + (double) end.tv_usec / 1000000.0) \ - ((double) start.tv_sec + (double) start.tv_usec / 1000000.0) /* * Extension version number, for supporting older extension versions' objects */ typedef enum pgskVersion { PGSK_V2_0 = 0, PGSK_V2_1 } pgskVersion; static const uint32 PGSK_FILE_HEADER = 0x0d756e10; static struct rusage rusage_start; /* * Current getrusage counters. * * For platform without getrusage support, we rely on postgres implementation * defined in rusagestub.h, which only supports user and system time. * * Note that the following counters are not maintained on GNU/Linux: * - ru_nswap * - ru_msgsnd * - ru_msgrcv * - ru_nsignals */ typedef struct pgskCounters { double usage; /* usage factor */ /* These fields are always used */ float8 utime; /* CPU user time */ float8 stime; /* CPU system time */ #ifdef HAVE_GETRUSAGE /* These fields are only used for platform with HAVE_GETRUSAGE defined */ int64 minflts; /* page reclaims (soft page faults) */ int64 majflts; /* page faults (hard page faults) */ int64 nswaps; /* page faults (hard page faults) */ int64 reads; /* Physical block reads */ int64 writes; /* Physical block writes */ int64 msgsnds; /* IPC messages sent */ int64 msgrcvs; /* IPC messages received */ int64 nsignals; /* signals received */ int64 nvcsws; /* voluntary context witches */ int64 nivcsws; /* unvoluntary context witches */ #endif } pgskCounters; static int pgsk_max = 0; /* max #queries to store. pg_stat_statements.max is used */ /* * Hashtable key that defines the identity of a hashtable entry. We use the * same hash as pg_stat_statements */ typedef struct pgskHashKey { Oid userid; /* user OID */ Oid dbid; /* database OID */ pgsk_queryid queryid; /* query identifier */ } pgskHashKey; /* * Statistics per database */ typedef struct pgskEntry { pgskHashKey key; /* hash key of entry - MUST BE FIRST */ pgskCounters counters; /* statistics for this query */ slock_t mutex; /* protects the counters only */ } pgskEntry; /* * Global shared state */ typedef struct pgskSharedState { LWLock *lock; /* protects hashtable search/modification */ #if PG_VERSION_NUM >= 90600 LWLock *queryids_lock; /* protects queryids array */ pgsk_queryid queryids[FLEXIBLE_ARRAY_MEMBER]; /* queryid info for parallel leaders */ #endif } pgskSharedState; /* saved hook address in case of unload */ static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static ExecutorStart_hook_type prev_ExecutorStart = NULL; static ExecutorEnd_hook_type prev_ExecutorEnd = NULL; /* Links to shared memory state */ static pgskSharedState *pgsk = NULL; static HTAB *pgsk_hash = NULL; /*--- Functions --- */ void _PG_init(void); void _PG_fini(void); extern PGDLLEXPORT Datum pg_stat_kcache_reset(PG_FUNCTION_ARGS); extern PGDLLEXPORT Datum pg_stat_kcache(PG_FUNCTION_ARGS); extern PGDLLEXPORT Datum pg_stat_kcache_2_1(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(pg_stat_kcache_reset); PG_FUNCTION_INFO_V1(pg_stat_kcache); PG_FUNCTION_INFO_V1(pg_stat_kcache_2_1); static void pg_stat_kcache_internal(FunctionCallInfo fcinfo, pgskVersion api_version); static void pgsk_setmax(void); static Size pgsk_memsize(void); static void pgsk_shmem_startup(void); static void pgsk_shmem_shutdown(int code, Datum arg); static void pgsk_ExecutorStart(QueryDesc *queryDesc, int eflags); static void pgsk_ExecutorEnd(QueryDesc *queryDesc); static pgskEntry *pgsk_entry_alloc(pgskHashKey *key); static void pgsk_entry_dealloc(void); static void pgsk_entry_reset(void); static void pgsk_entry_store(pgsk_queryid queryId, pgskCounters counters); static uint32 pgsk_hash_fn(const void *key, Size keysize); static int pgsk_match_fn(const void *key1, const void *key2, Size keysize); static bool pgsk_assign_linux_hz_check_hook(int *newval, void **extra, GucSource source); #if PG_VERSION_NUM >= 90600 static Size pgsk_queryids_array_size(void); static void pgsk_set_queryid(pgsk_queryid queryid); #endif static int pgsk_linux_hz; void _PG_init(void) { if (!process_shared_preload_libraries_in_progress) { elog(ERROR, "This module can only be loaded via shared_preload_libraries"); return; } DefineCustomIntVariable("pg_stat_kcache.linux_hz", "Inform pg_stat_kcache of the linux CONFIG_HZ config option", "This is used by pg_stat_kcache to compensate for sampling errors " "in getrusage due to the kernel adhering to its ticks. The default value, -1, " "tries to guess it at startup. ", &pgsk_linux_hz, -1, -1, INT_MAX, PGC_USERSET, 0, pgsk_assign_linux_hz_check_hook, NULL, NULL); EmitWarningsOnPlaceholders("pg_stat_kcache"); /* set pgsk_max if needed */ pgsk_setmax(); RequestAddinShmemSpace(pgsk_memsize()); #if PG_VERSION_NUM >= 90600 RequestNamedLWLockTranche("pg_stat_kcache", 2); #else RequestAddinLWLocks(1); #endif /* Install hook */ prev_shmem_startup_hook = shmem_startup_hook; shmem_startup_hook = pgsk_shmem_startup; prev_ExecutorStart = ExecutorStart_hook; ExecutorStart_hook = pgsk_ExecutorStart; prev_ExecutorEnd = ExecutorEnd_hook; ExecutorEnd_hook = pgsk_ExecutorEnd; } void _PG_fini(void) { /* uninstall hook */ ExecutorStart_hook = prev_ExecutorStart; ExecutorEnd_hook = prev_ExecutorEnd; shmem_startup_hook = prev_shmem_startup_hook; } static bool pgsk_assign_linux_hz_check_hook(int *newval, void **extra, GucSource source) { int val = *newval; struct rusage myrusage; struct timeval previous_value; /* In that case we try to guess it */ if (val == -1) { elog(LOG, "Auto detecting pg_stat_kcache.linux_hz parameter..."); getrusage(RUSAGE_SELF, &myrusage); previous_value = myrusage.ru_utime; while (myrusage.ru_utime.tv_usec == previous_value.tv_usec && myrusage.ru_utime.tv_sec == previous_value.tv_sec) { getrusage(RUSAGE_SELF, &myrusage); } *newval = (int) (1 / ((myrusage.ru_utime.tv_sec - previous_value.tv_sec) + (myrusage.ru_utime.tv_usec - previous_value.tv_usec) / 1000000.)); elog(LOG, "pg_stat_kcache.linux_hz is set to %d", *newval); } return true; } #if PG_VERSION_NUM >= 90600 static void pgsk_set_queryid(pgsk_queryid queryid) { /* Only the leader knows the queryid. */ Assert(!IsParallelWorker()); LWLockAcquire(pgsk->queryids_lock, LW_EXCLUSIVE); pgsk->queryids[MyBackendId] = queryid; LWLockRelease(pgsk->queryids_lock); } #endif static void pgsk_shmem_startup(void) { bool found; HASHCTL info; FILE *file; int i; uint32 header; int32 num; pgskEntry *buffer = NULL; if (prev_shmem_startup_hook) prev_shmem_startup_hook(); /* reset in case this is a restart within the postmaster */ pgsk = NULL; /* Create or attach to the shared memory state */ LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); /* global access lock */ pgsk = ShmemInitStruct("pg_stat_kcache", sizeof(pgskSharedState), &found); if (!found) { /* First time through ... */ #if PG_VERSION_NUM >= 90600 LWLockPadded *locks = GetNamedLWLockTranche("pg_stat_kcache"); pgsk->lock = &(locks[0]).lock; pgsk->queryids_lock = &(locks[1]).lock; #else pgsk->lock = LWLockAssign(); #endif } /* set pgsk_max if needed */ pgsk_setmax(); memset(&info, 0, sizeof(info)); info.keysize = sizeof(pgskHashKey); info.entrysize = sizeof(pgskEntry); info.hash = pgsk_hash_fn; info.match = pgsk_match_fn; /* allocate stats shared memory hash */ pgsk_hash = ShmemInitHash("pg_stat_kcache hash", pgsk_max, pgsk_max, &info, HASH_ELEM | HASH_FUNCTION | HASH_COMPARE); LWLockRelease(AddinShmemInitLock); if (!IsUnderPostmaster) on_shmem_exit(pgsk_shmem_shutdown, (Datum) 0); /* * Done if some other process already completed our initialization. */ if (found) return; /* Load stat file, don't care about locking */ file = AllocateFile(PGSK_DUMP_FILE, PG_BINARY_R); if (file == NULL) { if (errno == ENOENT) return; /* ignore not-found error */ goto error; } /* check is header is valid */ if (fread(&header, sizeof(uint32), 1, file) != 1 || header != PGSK_FILE_HEADER) goto error; /* get number of entries */ if (fread(&num, sizeof(int32), 1, file) != 1) goto error; for (i = 0; i < num ; i++) { pgskEntry temp; pgskEntry *entry; if (fread(&temp, sizeof(pgskEntry), 1, file) != 1) goto error; entry = pgsk_entry_alloc(&temp.key); /* copy in the actual stats */ entry->counters = temp.counters; /* don't initialize spinlock, already done */ } FreeFile(file); /* * Remove the file so it's not included in backups/replication slaves, * etc. A new file will be written on next shutdown. */ unlink(PGSK_DUMP_FILE); return; error: ereport(LOG, (errcode_for_file_access(), errmsg("could not read pg_stat_kcache file \"%s\": %m", PGSK_DUMP_FILE))); if (buffer) pfree(buffer); if (file) FreeFile(file); /* delete bogus file, don't care of errors in this case */ unlink(PGSK_DUMP_FILE); } /* * shmem_shutdown hook: dump statistics into file. * */ static void pgsk_shmem_shutdown(int code, Datum arg) { FILE *file; HASH_SEQ_STATUS hash_seq; int32 num_entries; pgskEntry *entry; /* Don't try to dump during a crash. */ if (code) return; if (!pgsk) return; file = AllocateFile(PGSK_DUMP_FILE ".tmp", PG_BINARY_W); if (file == NULL) goto error; if (fwrite(&PGSK_FILE_HEADER, sizeof(uint32), 1, file) != 1) goto error; num_entries = hash_get_num_entries(pgsk_hash); if (fwrite(&num_entries, sizeof(int32), 1, file) != 1) goto error; hash_seq_init(&hash_seq, pgsk_hash); while ((entry = hash_seq_search(&hash_seq)) != NULL) { if (fwrite(entry, sizeof(pgskEntry), 1, file) != 1) { /* note: we assume hash_seq_term won't change errno */ hash_seq_term(&hash_seq); goto error; } } if (FreeFile(file)) { file = NULL; goto error; } /* * Rename file inplace */ if (rename(PGSK_DUMP_FILE ".tmp", PGSK_DUMP_FILE) != 0) ereport(LOG, (errcode_for_file_access(), errmsg("could not rename pg_stat_kcache file \"%s\": %m", PGSK_DUMP_FILE ".tmp"))); return; error: ereport(LOG, (errcode_for_file_access(), errmsg("could not read pg_stat_kcache file \"%s\": %m", PGSK_DUMP_FILE))); if (file) FreeFile(file); unlink(PGSK_DUMP_FILE); } /* * Retrieve pg_stat_statement.max GUC value and store it into pgsk_max, since * we want to store the same number of entries as pg_stat_statements. Don't do * anything if pgsk_max is already set. */ static void pgsk_setmax(void) { const char *pgss_max; const char *name = "pg_stat_statements.max"; if (pgsk_max != 0) return; pgss_max = GetConfigOption(name, true, false); /* * Retrieving pg_stat_statements.max can fail if pgss is loaded after pgsk * in shared_preload_libraries. Hint user in case this happens. */ if (!pgss_max) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("unrecognized configuration parameter \"%s\"", name), errhint("make sure pg_stat_statements is loaded,\n" "and make sure pg_stat_kcache is present after pg_stat_statements" " in the shared_preload_libraries setting"))); pgsk_max = atoi(pgss_max); } static Size pgsk_memsize(void) { Size size; Assert(pgsk_max != 0); size = MAXALIGN(sizeof(pgskSharedState)); size = add_size(size, hash_estimate_size(pgsk_max, sizeof(pgskEntry))); #if PG_VERSION_NUM >= 90600 size = add_size(size, MAXALIGN(pgsk_queryids_array_size())); #endif return size; } #if PG_VERSION_NUM >= 90600 static Size pgsk_queryids_array_size(void) { /* * queryid isn't pushed to parallel workers. We store them in shared mem * for each query, identified by their BackendId. If need room for all * possible backends, plus autovacuum launcher and workers, plus bg workers * and an extra one since BackendId numerotation starts at 1. */ return (sizeof(bool) * (MaxConnections + autovacuum_max_workers + 1 + max_worker_processes + 1)); } #endif /* * support functions */ static void pgsk_entry_store(pgsk_queryid queryId, pgskCounters counters) { volatile pgskEntry *e; pgskHashKey key; pgskEntry *entry; /* Safety check... */ if (!pgsk || !pgsk_hash) return; /* Set up key for hashtable search */ key.userid = GetUserId(); key.dbid = MyDatabaseId; key.queryid = queryId; /* Lookup the hash table entry with shared lock. */ LWLockAcquire(pgsk->lock, LW_SHARED); entry = (pgskEntry *) hash_search(pgsk_hash, &key, HASH_FIND, NULL); /* Create new entry, if not present */ if (!entry) { /* Need exclusive lock to make a new hashtable entry - promote */ LWLockRelease(pgsk->lock); LWLockAcquire(pgsk->lock, LW_EXCLUSIVE); /* OK to create a new hashtable entry */ entry = pgsk_entry_alloc(&key); } /* * Grab the spinlock while updating the counters (see comment about * locking rules at the head of the file) */ e = (volatile pgskEntry *) entry; SpinLockAcquire(&e->mutex); e->counters.usage += USAGE_INCREASE; e->counters.utime += counters.utime; e->counters.stime += counters.stime; #ifdef HAVE_GETRUSAGE e->counters.minflts += counters.minflts; e->counters.majflts += counters.majflts; e->counters.nswaps += counters.nswaps; e->counters.reads += counters.reads; e->counters.writes += counters.writes; e->counters.msgsnds += counters.msgsnds; e->counters.msgrcvs += counters.msgrcvs; e->counters.nsignals += counters.nsignals; e->counters.nvcsws += counters.nvcsws; e->counters.nivcsws += counters.nivcsws; #endif SpinLockRelease(&e->mutex); LWLockRelease(pgsk->lock); } /* * Allocate a new hashtable entry. * caller must hold an exclusive lock on pgsk->lock */ static pgskEntry *pgsk_entry_alloc(pgskHashKey *key) { pgskEntry *entry; bool found; /* Make space if needed */ while (hash_get_num_entries(pgsk_hash) >= pgsk_max) pgsk_entry_dealloc(); /* Find or create an entry with desired hash code */ entry = (pgskEntry *) hash_search(pgsk_hash, key, HASH_ENTER, &found); if (!found) { /* New entry, initialize it */ /* reset the statistics */ memset(&entry->counters, 0, sizeof(pgskCounters)); /* set the appropriate initial usage count */ entry->counters.usage = USAGE_INIT; /* re-initialize the mutex each time ... we assume no one using it */ SpinLockInit(&entry->mutex); } return entry; } /* * qsort comparator for sorting into increasing usage order */ static int entry_cmp(const void *lhs, const void *rhs) { double l_usage = (*(pgskEntry *const *) lhs)->counters.usage; double r_usage = (*(pgskEntry *const *) rhs)->counters.usage; if (l_usage < r_usage) return -1; else if (l_usage > r_usage) return +1; else return 0; } /* * Deallocate least used entries. * Caller must hold an exclusive lock on pgsk->lock. */ static void pgsk_entry_dealloc(void) { HASH_SEQ_STATUS hash_seq; pgskEntry **entries; pgskEntry *entry; int nvictims; int i; /* * Sort entries by usage and deallocate USAGE_DEALLOC_PERCENT of them. * While we're scanning the table, apply the decay factor to the usage * values. */ entries = palloc(hash_get_num_entries(pgsk_hash) * sizeof(pgskEntry *)); i = 0; hash_seq_init(&hash_seq, pgsk_hash); while ((entry = hash_seq_search(&hash_seq)) != NULL) { entries[i++] = entry; entry->counters.usage *= USAGE_DECREASE_FACTOR; } qsort(entries, i, sizeof(pgskEntry *), entry_cmp); nvictims = Max(10, i * USAGE_DEALLOC_PERCENT / 100); nvictims = Min(nvictims, i); for (i = 0; i < nvictims; i++) { hash_search(pgsk_hash, &entries[i]->key, HASH_REMOVE, NULL); } pfree(entries); } static void pgsk_entry_reset(void) { HASH_SEQ_STATUS hash_seq; pgskEntry *entry; LWLockAcquire(pgsk->lock, LW_EXCLUSIVE); hash_seq_init(&hash_seq, pgsk_hash); while ((entry = hash_seq_search(&hash_seq)) != NULL) { hash_search(pgsk_hash, &entry->key, HASH_REMOVE, NULL); } LWLockRelease(pgsk->lock); } /* * Hooks */ static void pgsk_ExecutorStart (QueryDesc *queryDesc, int eflags) { /* capture kernel usage stats in rusage_start */ getrusage(RUSAGE_SELF, &rusage_start); #if PG_VERSION_NUM >= 90600 /* Save the queryid so parallel worker can retrieve it */ if (!IsParallelWorker()) { pgsk_set_queryid(queryDesc->plannedstmt->queryId); } #endif /* give control back to PostgreSQL */ if (prev_ExecutorStart) prev_ExecutorStart(queryDesc, eflags); else standard_ExecutorStart(queryDesc, eflags); } static void pgsk_ExecutorEnd (QueryDesc *queryDesc) { pgsk_queryid queryId; struct rusage rusage_end; pgskCounters counters; /* capture kernel usage stats in rusage_end */ getrusage(RUSAGE_SELF, &rusage_end); #if PG_VERSION_NUM >= 90600 if (IsParallelWorker()) { LWLockAcquire(pgsk->queryids_lock, LW_SHARED); queryId = pgsk->queryids[ParallelMasterBackendId]; LWLockRelease(pgsk->queryids_lock); } else #endif queryId = queryDesc->plannedstmt->queryId; /* Compute CPU time delta */ counters.utime = TIMEVAL_DIFF(rusage_start.ru_utime, rusage_end.ru_utime); counters.stime = TIMEVAL_DIFF(rusage_start.ru_stime, rusage_end.ru_stime); if (queryDesc->totaltime) { /* Make sure stats accumulation is done */ InstrEndLoop(queryDesc->totaltime); /* * We only consider values greater than 3 * linux tick, otherwise the * bias is too big */ if (queryDesc->totaltime->total < (3. / pgsk_linux_hz)) { counters.stime = 0; counters.utime = queryDesc->totaltime->total; } } #ifdef HAVE_GETRUSAGE /* Compute the rest of the counters */ counters.minflts = rusage_end.ru_minflt - rusage_start.ru_minflt; counters.majflts = rusage_end.ru_majflt - rusage_start.ru_majflt; counters.nswaps = rusage_end.ru_nswap - rusage_start.ru_nswap; counters.reads = rusage_end.ru_inblock - rusage_start.ru_inblock; counters.writes = rusage_end.ru_oublock - rusage_start.ru_oublock; counters.msgsnds = rusage_end.ru_msgsnd - rusage_start.ru_msgsnd; counters.msgrcvs = rusage_end.ru_msgrcv - rusage_start.ru_msgrcv; counters.nsignals = rusage_end.ru_nsignals - rusage_start.ru_nsignals; counters.nvcsws = rusage_end.ru_nvcsw - rusage_start.ru_nvcsw; counters.nivcsws = rusage_end.ru_nivcsw - rusage_start.ru_nivcsw; #endif /* store current number of block reads and writes */ pgsk_entry_store(queryId, counters); /* give control back to PostgreSQL */ if (prev_ExecutorEnd) prev_ExecutorEnd(queryDesc); else standard_ExecutorEnd(queryDesc); } /* * Calculate hash value for a key */ static uint32 pgsk_hash_fn(const void *key, Size keysize) { const pgskHashKey *k = (const pgskHashKey *) key; return hash_uint32((uint32) k->userid) ^ hash_uint32((uint32) k->dbid) ^ hash_uint32((uint32) k->queryid); } /* * Compare two keys - zero means match */ static int pgsk_match_fn(const void *key1, const void *key2, Size keysize) { const pgskHashKey *k1 = (const pgskHashKey *) key1; const pgskHashKey *k2 = (const pgskHashKey *) key2; if (k1->userid == k2->userid && k1->dbid == k2->dbid && k1->queryid == k2->queryid) return 0; else return 1; } /* * Reset statistics. */ PGDLLEXPORT Datum pg_stat_kcache_reset(PG_FUNCTION_ARGS) { if (!pgsk) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("pg_stat_kcache must be loaded via shared_preload_libraries"))); pgsk_entry_reset(); PG_RETURN_VOID(); } PGDLLEXPORT Datum pg_stat_kcache(PG_FUNCTION_ARGS) { pg_stat_kcache_internal(fcinfo, PGSK_V2_0); return (Datum) 0; } PGDLLEXPORT Datum pg_stat_kcache_2_1(PG_FUNCTION_ARGS) { pg_stat_kcache_internal(fcinfo, PGSK_V2_1); return (Datum) 0; } static void pg_stat_kcache_internal(FunctionCallInfo fcinfo, pgskVersion api_version) { ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; MemoryContext per_query_ctx; MemoryContext oldcontext; TupleDesc tupdesc; Tuplestorestate *tupstore; HASH_SEQ_STATUS hash_seq; pgskEntry *entry; if (!pgsk) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("pg_stat_kcache must be loaded via shared_preload_libraries"))); /* check to see if caller supports us returning a tuplestore */ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("set-valued function called in context that cannot accept a set"))); if (!(rsinfo->allowedModes & SFRM_Materialize)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("materialize mode required, but it is not " \ "allowed in this context"))); /* Switch into long-lived context to construct returned data structures */ per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; oldcontext = MemoryContextSwitchTo(per_query_ctx); /* Build a tuple descriptor for our result type */ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) elog(ERROR, "return type must be a row type"); tupstore = tuplestore_begin_heap(true, false, work_mem); rsinfo->returnMode = SFRM_Materialize; rsinfo->setResult = tupstore; rsinfo->setDesc = tupdesc; MemoryContextSwitchTo(oldcontext); LWLockAcquire(pgsk->lock, LW_SHARED); hash_seq_init(&hash_seq, pgsk_hash); while ((entry = hash_seq_search(&hash_seq)) != NULL) { Datum values[PG_STAT_KCACHE_COLS]; bool nulls[PG_STAT_KCACHE_COLS]; pgskCounters tmp; int i = 0; #ifdef HAVE_GETRUSAGE int64 reads, writes; #endif memset(values, 0, sizeof(values)); memset(nulls, 0, sizeof(nulls)); /* copy counters to a local variable to keep locking time short */ { volatile pgskEntry *e = (volatile pgskEntry *) entry; SpinLockAcquire(&e->mutex); tmp = e->counters; SpinLockRelease(&e->mutex); } values[i++] = Int64GetDatum(entry->key.queryid); values[i++] = ObjectIdGetDatum(entry->key.userid); values[i++] = ObjectIdGetDatum(entry->key.dbid); #ifdef HAVE_GETRUSAGE reads = tmp.reads * RUSAGE_BLOCK_SIZE; writes = tmp.writes * RUSAGE_BLOCK_SIZE; values[i++] = Int64GetDatumFast(reads); values[i++] = Int64GetDatumFast(writes); #else nulls[i++] = true; /* reads */ nulls[i++] = true; /* writes */ #endif values[i++] = Float8GetDatumFast(tmp.utime); values[i++] = Float8GetDatumFast(tmp.stime); if (api_version >= PGSK_V2_1) { #ifdef HAVE_GETRUSAGE values[i++] = Int64GetDatumFast(tmp.minflts); values[i++] = Int64GetDatumFast(tmp.majflts); values[i++] = Int64GetDatumFast(tmp.nswaps); values[i++] = Int64GetDatumFast(tmp.msgsnds); values[i++] = Int64GetDatumFast(tmp.msgrcvs); values[i++] = Int64GetDatumFast(tmp.nsignals); values[i++] = Int64GetDatumFast(tmp.nvcsws); values[i++] = Int64GetDatumFast(tmp.nivcsws); #else nulls[i++] = true; /* minflts */ nulls[i++] = true; /* majflts */ nulls[i++] = true; /* nswaps */ nulls[i++] = true; /* msgsnds */ nulls[i++] = true; /* msgrcvs */ nulls[i++] = true; /* nsignals */ nulls[i++] = true; /* nvcsws */ nulls[i++] = true; /* nivcsws */ #endif } Assert(i == (api_version == PGSK_V2_0 ? PG_STAT_KCACHE_COLS_V2_0 : api_version == PGSK_V2_1 ? PG_STAT_KCACHE_COLS_V2_1 : -1 /* fail if you forget to update this assert */ )); tuplestore_putvalues(tupstore, tupdesc, values, nulls); } LWLockRelease(pgsk->lock); /* clean up and return the tuplestore */ tuplestore_donestoring(tupstore); }