/* * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "postgres.h" #include #include #include "catalog/pg_type.h" #include "pgstat.h" #include "utils/builtins.h" #include "utils/datum.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "storage/ipc.h" #include "utils/typcache.h" #include "utils/wait_event.h" #include "collection.h" #include "collection_config.h" #ifdef PG_MODULE_MAGIC_EXT PG_MODULE_MAGIC_EXT(.name = EXT_NAME,.version = EXT_VERSION); #else PG_MODULE_MAGIC; #endif static Datum expand_collection(Datum collectiondatum, MemoryContext parentcontext); /* * expand_collection * Expand a collection datum into a writable expanded object * Preserves iterator position when copying from another expanded object * * NOTE: This function parallels expand_icollection() in icollection.c. * Any changes to the logic here should be mirrored there for consistency. * The two implementations differ only in key type (char* vs int64) and * hash table operations. */ static Datum expand_collection(Datum collectiondatum, MemoryContext parentcontext) { /* If the source is an expanded collection, copy it to make it writable */ if (VARATT_IS_EXTERNAL_EXPANDED(DatumGetPointer(collectiondatum))) { CollectionHeader *src_colhdr = (CollectionHeader *) DatumGetEOHP(collectiondatum); CollectionHeader *copyhdr; MemoryContext oldcxt; collection *iter; collection *item; Assert(src_colhdr->collection_magic == COLLECTION_MAGIC); /* Create a writable copy */ copyhdr = construct_empty_collection(parentcontext); if (src_colhdr->head) { oldcxt = MemoryContextSwitchTo(copyhdr->hdr.eoh_context); copyhdr->value_type = src_colhdr->value_type; copyhdr->value_type_len = src_colhdr->value_type_len; copyhdr->value_byval = src_colhdr->value_byval; /* Copy items in the same order to preserve iteration */ for (iter = src_colhdr->head; iter != NULL; iter = iter->hh.next) { char *key; int key_len = strlen(iter->key); collection *replaced_item; key = palloc(key_len + 1); memset(key, 0, key_len + 1); strcpy(key, iter->key); item = palloc(sizeof(collection)); item->key = key; item->isnull = iter->isnull; if (!iter->isnull) item->value = datumCopy(iter->value, src_colhdr->value_byval, src_colhdr->value_type_len); HASH_REPLACE(hh, copyhdr->head, key[0], key_len, item, replaced_item); if (replaced_item) { if (replaced_item->key) pfree(replaced_item->key); if (replaced_item->isnull == false && replaced_item->value) pfree(DatumGetPointer(replaced_item->value)); pfree(replaced_item); } /* Set current to match the source's current position exactly */ if (iter == src_colhdr->current) copyhdr->current = item; } /* If source current was NULL, keep it NULL */ if (src_colhdr->current == NULL) copyhdr->current = NULL; /* If we didn't find a matching current, default to first */ else if (copyhdr->current == NULL && copyhdr->head) copyhdr->current = copyhdr->head; MemoryContextSwitchTo(oldcxt); } else { copyhdr->value_type = src_colhdr->value_type; copyhdr->value_type_len = src_colhdr->value_type_len; copyhdr->value_byval = src_colhdr->value_byval; } return EOHPGetRWDatum(©hdr->hdr); } /* For flat collections, expand them properly */ { CollectionHeader *new_colhdr; FlatCollectionType *fc; MemoryContext oldcxt; int location = 0; int i = 0; struct varlena *attr = NULL; collection *item; pgstat_report_wait_start(collection_we_expand); /* Check whether toasted or not */ if (VARATT_IS_EXTENDED(DatumGetPointer(collectiondatum))) { attr = PG_DETOAST_DATUM_COPY(collectiondatum); fc = (FlatCollectionType *) attr; } else fc = (FlatCollectionType *) (DatumGetPointer(collectiondatum)); /* Validate that the type exists */ lookup_type_cache(fc->value_type, 0); new_colhdr = construct_empty_collection(parentcontext); oldcxt = MemoryContextSwitchTo(new_colhdr->hdr.eoh_context); new_colhdr->value_type = fc->value_type; get_typlenbyval(fc->value_type, &new_colhdr->value_type_len, &new_colhdr->value_byval); while (i < fc->num_entries) { int16 key_len; size_t value_len; char *key; memcpy(&key_len, fc->values + location, sizeof(int16)); location += sizeof(int16); memcpy(&value_len, fc->values + location, sizeof(size_t)); location += sizeof(size_t); key = palloc(key_len + 1); memcpy(key, fc->values + location, key_len); key[key_len] = '\0'; location += key_len; item = (collection *) palloc(sizeof(collection)); item->key = key; if (value_len == 0) item->isnull = true; else { item->isnull = false; if (new_colhdr->value_type_len != -1) { /* Fixed-length type: read directly from buffer */ Datum temp_value; memcpy(&temp_value, fc->values + location, value_len); item->value = datumCopy(temp_value, new_colhdr->value_byval, value_len); } else { /* * Variable-length type: pass pointer directly to * datumCopy */ item->value = datumCopy((Datum) (fc->values + location), new_colhdr->value_byval, value_len); } } location += value_len; HASH_ADD_KEYPTR(hh, new_colhdr->head, key, strlen(key), item); if (!new_colhdr->current) new_colhdr->current = new_colhdr->head; i++; } MemoryContextSwitchTo(oldcxt); if (attr) pfree(attr); pgstat_report_wait_end(); return EOHPGetRWDatum(&new_colhdr->hdr); } } static const ExpandedObjectMethods collection_expand_methods = { collection_get_flat_size, collection_flatten_into }; void _PG_init(void); void _PG_init(void) { /* first time, allocate or get the custom wait event */ #if (PG_VERSION_NUM >= 170000) collection_we_flatsize = WaitEventExtensionNew("CollectionCalculatingFlatSize"); collection_we_flatten = WaitEventExtensionNew("CollectionFlatten"); collection_we_expand = WaitEventExtensionNew("CollectionExpand"); collection_we_cast = WaitEventExtensionNew("CollectionCast"); collection_we_add = WaitEventExtensionNew("CollectionAdd"); collection_we_count = WaitEventExtensionNew("CollectionCount"); collection_we_find = WaitEventExtensionNew("CollectionFind"); collection_we_exist = WaitEventExtensionNew("CollectionExist"); collection_we_delete = WaitEventExtensionNew("CollectionDelete"); collection_we_sort = WaitEventExtensionNew("CollectionSort"); collection_we_copy = WaitEventExtensionNew("CollectionCopy"); collection_we_value = WaitEventExtensionNew("CollectionValue"); collection_we_to_table = WaitEventExtensionNew("CollectionToTable"); collection_we_fetch = WaitEventExtensionNew("CollectionFetch"); collection_we_assign = WaitEventExtensionNew("CollectionAssign"); collection_we_input = WaitEventExtensionNew("CollectionInput"); collection_we_output = WaitEventExtensionNew("CollectionOutput"); #else collection_we_flatsize = PG_WAIT_EXTENSION; collection_we_flatten = PG_WAIT_EXTENSION; collection_we_expand = PG_WAIT_EXTENSION; collection_we_add = PG_WAIT_EXTENSION; collection_we_count = PG_WAIT_EXTENSION; collection_we_find = PG_WAIT_EXTENSION; collection_we_exist = PG_WAIT_EXTENSION; collection_we_delete = PG_WAIT_EXTENSION; collection_we_copy = PG_WAIT_EXTENSION; collection_we_value = PG_WAIT_EXTENSION; collection_we_to_table = PG_WAIT_EXTENSION; collection_we_fetch = PG_WAIT_EXTENSION; collection_we_assign = PG_WAIT_EXTENSION; collection_we_input = PG_WAIT_EXTENSION; collection_we_output = PG_WAIT_EXTENSION; #endif } Size collection_get_flat_size(ExpandedObjectHeader *eohptr) { CollectionHeader *colhdr = (CollectionHeader *) eohptr; collection *cur; size_t sz = 0; Assert(colhdr->collection_magic == COLLECTION_MAGIC); pgstat_report_wait_start(collection_we_flatsize); for (cur = colhdr->head; cur != NULL; cur = cur->hh.next) { sz += strlen(cur->key); if (!cur->isnull) { if (colhdr->value_type_len != -1) sz += colhdr->value_type_len; else { struct varlena *s = (struct varlena *) DatumGetPointer(cur->value); sz += (Size) VARSIZE_ANY(s); } } sz += sizeof(int16); sz += sizeof(size_t); } sz += sizeof(FlatCollectionType); colhdr->flat_size = sz; pgstat_report_wait_end(); return sz; } void collection_flatten_into(ExpandedObjectHeader *eohptr, void *result, Size allocated_size) { CollectionHeader *colhdr = (CollectionHeader *) eohptr; FlatCollectionType *cresult = (FlatCollectionType *) result; collection *cur; int location = 0; Assert(allocated_size == colhdr->flat_size); pgstat_report_wait_start(collection_we_flatten); memset(cresult, 0, allocated_size); SET_VARSIZE(cresult, allocated_size); cresult->num_entries = HASH_COUNT(colhdr->head); cresult->value_type = colhdr->value_type; for (cur = colhdr->head; cur != NULL; cur = cur->hh.next) { int16 key_len; size_t value_len; bool is_varlena; key_len = strlen(cur->key); memcpy(cresult->values + location, (char *) &key_len, sizeof(key_len)); location += sizeof(key_len); if (cur->isnull) { value_len = 0; } else { if (colhdr->value_type_len != -1) { value_len = colhdr->value_type_len; is_varlena = false; } else { struct varlena *s = (struct varlena *) DatumGetPointer(cur->value); value_len = (size_t) VARSIZE_ANY(s); is_varlena = true; } } memcpy(cresult->values + location, (char *) &value_len, sizeof(value_len)); location += sizeof(value_len); memcpy(cresult->values + location, cur->key, key_len); location += key_len; if (value_len > 0) { if (is_varlena) memcpy((char *) cresult->values + location, (char *) cur->value, value_len); else memcpy((char *) cresult->values + location, (char *) &cur->value, value_len); location += value_len; } } stats.context_switch++; pgstat_report_wait_end(); } CollectionHeader * fetch_collection(FunctionCallInfo fcinfo, int argno) { CollectionHeader *colhdr; if (!PG_ARGISNULL(argno)) colhdr = DatumGetExpandedCollection(PG_GETARG_DATUM(argno)); else colhdr = construct_empty_collection(CurrentMemoryContext); return colhdr; } CollectionHeader * construct_empty_collection(MemoryContext parentcontext) { CollectionHeader *colhdr; MemoryContext objcxt; objcxt = AllocSetContextCreate(parentcontext, "expanded collection", ALLOCSET_START_SMALL_SIZES); /* Set up expanded collection */ colhdr = (CollectionHeader *) MemoryContextAlloc(objcxt, sizeof(CollectionHeader)); EOH_init_header(&colhdr->hdr, &collection_expand_methods, objcxt); colhdr->collection_magic = COLLECTION_MAGIC; colhdr->value_type = InvalidOid; colhdr->value_byval = false; colhdr->flat_size = 0; colhdr->current = NULL; colhdr->head = NULL; return colhdr; } CollectionHeader * DatumGetExpandedCollection(Datum d) { /* * If it's already an expanded collection, verify the magic and make a * writable copy via expand_collection. Otherwise expand from flat form. * expand_collection handles both cases (including detoasting). */ if (VARATT_IS_EXTERNAL_EXPANDED(DatumGetPointer(d))) { Assert(((CollectionHeader *) DatumGetEOHP(d))->collection_magic == COLLECTION_MAGIC); } d = expand_collection(d, CurrentMemoryContext); return (CollectionHeader *) DatumGetEOHP(d); }