#include "postgres.h" #include #include "access/table.h" #include "access/tableam.h" #include "access/parallel.h" #include "access/xact.h" #include "bitvec.h" #include "catalog/index.h" #include "catalog/pg_operator_d.h" #include "catalog/pg_type_d.h" #include "commands/progress.h" #include "halfvec.h" #include "ivfflat.h" #include "miscadmin.h" #include "optimizer/optimizer.h" #include "storage/bufmgr.h" #include "tcop/tcopprot.h" #include "utils/memutils.h" #include "vector.h" #if PG_VERSION_NUM >= 140000 #include "utils/backend_progress.h" #else #include "pgstat.h" #endif #if PG_VERSION_NUM >= 130000 #define CALLBACK_ITEM_POINTER ItemPointer tid #else #define CALLBACK_ITEM_POINTER HeapTuple hup #endif #if PG_VERSION_NUM >= 140000 #include "utils/backend_status.h" #include "utils/wait_event.h" #endif #define PARALLEL_KEY_IVFFLAT_SHARED UINT64CONST(0xA000000000000001) #define PARALLEL_KEY_TUPLESORT UINT64CONST(0xA000000000000002) #define PARALLEL_KEY_IVFFLAT_CENTERS UINT64CONST(0xA000000000000003) #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xA000000000000004) /* * Add sample */ static void AddSample(Datum *values, IvfflatBuildState * buildstate) { VectorArray samples = buildstate->samples; int targsamples = samples->maxlen; /* Detoast once for all calls */ Datum value = PointerGetDatum(PG_DETOAST_DATUM(values[0])); /* * Normalize with KMEANS_NORM_PROC since spherical distance function * expects unit vectors */ if (buildstate->kmeansnormprocinfo != NULL) { if (!IvfflatCheckNorm(buildstate->kmeansnormprocinfo, buildstate->collation, value)) return; value = IvfflatNormValue(buildstate->typeInfo, buildstate->collation, value); } if (samples->length < targsamples) { VectorArraySet(samples, samples->length, DatumGetPointer(value)); samples->length++; } else { if (buildstate->rowstoskip < 0) buildstate->rowstoskip = reservoir_get_next_S(&buildstate->rstate, samples->length, targsamples); if (buildstate->rowstoskip <= 0) { #if PG_VERSION_NUM >= 150000 int k = (int) (targsamples * sampler_random_fract(&buildstate->rstate.randstate)); #else int k = (int) (targsamples * sampler_random_fract(buildstate->rstate.randstate)); #endif Assert(k >= 0 && k < targsamples); VectorArraySet(samples, k, DatumGetPointer(value)); } buildstate->rowstoskip -= 1; } } /* * Callback for sampling */ static void SampleCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values, bool *isnull, bool tupleIsAlive, void *state) { IvfflatBuildState *buildstate = (IvfflatBuildState *) state; MemoryContext oldCtx; /* Skip nulls */ if (isnull[0]) return; /* Use memory context since detoast can allocate */ oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx); /* Add sample */ AddSample(values, buildstate); /* Reset memory context */ MemoryContextSwitchTo(oldCtx); MemoryContextReset(buildstate->tmpCtx); } /* * Sample rows with same logic as ANALYZE */ static void SampleRows(IvfflatBuildState * buildstate) { int targsamples = buildstate->samples->maxlen; BlockNumber totalblocks = RelationGetNumberOfBlocks(buildstate->heap); buildstate->rowstoskip = -1; BlockSampler_Init(&buildstate->bs, totalblocks, targsamples, RandomInt()); reservoir_init_selection_state(&buildstate->rstate, targsamples); while (BlockSampler_HasMore(&buildstate->bs)) { BlockNumber targblock = BlockSampler_Next(&buildstate->bs); table_index_build_range_scan(buildstate->heap, buildstate->index, buildstate->indexInfo, false, true, false, targblock, 1, SampleCallback, (void *) buildstate, NULL); } } /* * Add tuple to sort */ static void AddTupleToSort(Relation index, ItemPointer tid, Datum *values, IvfflatBuildState * buildstate) { double distance; double minDistance = DBL_MAX; int closestCenter = 0; VectorArray centers = buildstate->centers; TupleTableSlot *slot = buildstate->slot; /* Detoast once for all calls */ Datum value = PointerGetDatum(PG_DETOAST_DATUM(values[0])); /* Normalize if needed */ if (buildstate->normprocinfo != NULL) { if (!IvfflatCheckNorm(buildstate->normprocinfo, buildstate->collation, value)) return; value = IvfflatNormValue(buildstate->typeInfo, buildstate->collation, value); } /* Find the list that minimizes the distance */ for (int i = 0; i < centers->length; i++) { distance = DatumGetFloat8(FunctionCall2Coll(buildstate->procinfo, buildstate->collation, value, PointerGetDatum(VectorArrayGet(centers, i)))); if (distance < minDistance) { minDistance = distance; closestCenter = i; } } #ifdef IVFFLAT_KMEANS_DEBUG buildstate->inertia += minDistance; buildstate->listSums[closestCenter] += minDistance; buildstate->listCounts[closestCenter]++; #endif /* Create a virtual tuple */ ExecClearTuple(slot); slot->tts_values[0] = Int32GetDatum(closestCenter); slot->tts_isnull[0] = false; slot->tts_values[1] = PointerGetDatum(tid); slot->tts_isnull[1] = false; slot->tts_values[2] = value; slot->tts_isnull[2] = false; ExecStoreVirtualTuple(slot); /* * Add tuple to sort * * tuplesort_puttupleslot comment: Input data is always copied; the caller * need not save it. */ tuplesort_puttupleslot(buildstate->sortstate, slot); buildstate->indtuples++; } /* * Callback for table_index_build_scan */ static void BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values, bool *isnull, bool tupleIsAlive, void *state) { IvfflatBuildState *buildstate = (IvfflatBuildState *) state; MemoryContext oldCtx; #if PG_VERSION_NUM < 130000 ItemPointer tid = &hup->t_self; #endif /* Skip nulls */ if (isnull[0]) return; /* Use memory context since detoast can allocate */ oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx); /* Add tuple to sort */ AddTupleToSort(index, tid, values, buildstate); /* Reset memory context */ MemoryContextSwitchTo(oldCtx); MemoryContextReset(buildstate->tmpCtx); } /* * Get index tuple from sort state */ static inline void GetNextTuple(Tuplesortstate *sortstate, TupleDesc tupdesc, TupleTableSlot *slot, IndexTuple *itup, int *list) { Datum value; bool isnull; if (tuplesort_gettupleslot(sortstate, true, false, slot, NULL)) { *list = DatumGetInt32(slot_getattr(slot, 1, &isnull)); value = slot_getattr(slot, 3, &isnull); /* Form the index tuple */ *itup = index_form_tuple(tupdesc, &value, &isnull); (*itup)->t_tid = *((ItemPointer) DatumGetPointer(slot_getattr(slot, 2, &isnull))); } else *list = -1; } /* * Create initial entry pages */ static void InsertTuples(Relation index, IvfflatBuildState * buildstate, ForkNumber forkNum) { int list; IndexTuple itup = NULL; /* silence compiler warning */ int64 inserted = 0; TupleTableSlot *slot = MakeSingleTupleTableSlot(buildstate->tupdesc, &TTSOpsMinimalTuple); TupleDesc tupdesc = RelationGetDescr(index); pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE, PROGRESS_IVFFLAT_PHASE_LOAD); pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_TOTAL, buildstate->indtuples); GetNextTuple(buildstate->sortstate, tupdesc, slot, &itup, &list); for (int i = 0; i < buildstate->centers->length; i++) { Buffer buf; Page page; GenericXLogState *state; BlockNumber startPage; BlockNumber insertPage; /* Can take a while, so ensure we can interrupt */ /* Needs to be called when no buffer locks are held */ CHECK_FOR_INTERRUPTS(); buf = IvfflatNewBuffer(index, forkNum); IvfflatInitRegisterPage(index, &buf, &page, &state); startPage = BufferGetBlockNumber(buf); /* Get all tuples for list */ while (list == i) { /* Check for free space */ Size itemsz = MAXALIGN(IndexTupleSize(itup)); if (PageGetFreeSpace(page) < itemsz) IvfflatAppendPage(index, &buf, &page, &state, forkNum); /* Add the item */ if (PageAddItem(page, (Item) itup, itemsz, InvalidOffsetNumber, false, false) == InvalidOffsetNumber) elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index)); pfree(itup); pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_DONE, ++inserted); GetNextTuple(buildstate->sortstate, tupdesc, slot, &itup, &list); } insertPage = BufferGetBlockNumber(buf); IvfflatCommitBuffer(buf, state); /* Set the start and insert pages */ IvfflatUpdateList(index, buildstate->listInfo[i], insertPage, InvalidBlockNumber, startPage, forkNum); } } /* * Initialize the build state */ static void InitBuildState(IvfflatBuildState * buildstate, Relation heap, Relation index, IndexInfo *indexInfo) { buildstate->heap = heap; buildstate->index = index; buildstate->indexInfo = indexInfo; buildstate->typeInfo = IvfflatGetTypeInfo(index); buildstate->lists = IvfflatGetLists(index); buildstate->dimensions = TupleDescAttr(index->rd_att, 0)->atttypmod; /* Disallow varbit since require fixed dimensions */ if (TupleDescAttr(index->rd_att, 0)->atttypid == VARBITOID) elog(ERROR, "type not supported for ivfflat index"); /* Require column to have dimensions to be indexed */ if (buildstate->dimensions < 0) elog(ERROR, "column does not have dimensions"); if (buildstate->dimensions > buildstate->typeInfo->maxDimensions) elog(ERROR, "column cannot have more than %d dimensions for ivfflat index", buildstate->typeInfo->maxDimensions); buildstate->reltuples = 0; buildstate->indtuples = 0; /* Get support functions */ buildstate->procinfo = index_getprocinfo(index, 1, IVFFLAT_DISTANCE_PROC); buildstate->normprocinfo = IvfflatOptionalProcInfo(index, IVFFLAT_NORM_PROC); buildstate->kmeansnormprocinfo = IvfflatOptionalProcInfo(index, IVFFLAT_KMEANS_NORM_PROC); buildstate->collation = index->rd_indcollation[0]; /* Require more than one dimension for spherical k-means */ if (buildstate->kmeansnormprocinfo != NULL && buildstate->dimensions == 1) elog(ERROR, "dimensions must be greater than one for this opclass"); /* Create tuple description for sorting */ buildstate->tupdesc = CreateTemplateTupleDesc(3); TupleDescInitEntry(buildstate->tupdesc, (AttrNumber) 1, "list", INT4OID, -1, 0); TupleDescInitEntry(buildstate->tupdesc, (AttrNumber) 2, "tid", TIDOID, -1, 0); TupleDescInitEntry(buildstate->tupdesc, (AttrNumber) 3, "vector", RelationGetDescr(index)->attrs[0].atttypid, -1, 0); buildstate->slot = MakeSingleTupleTableSlot(buildstate->tupdesc, &TTSOpsVirtual); buildstate->centers = VectorArrayInit(buildstate->lists, buildstate->dimensions, buildstate->typeInfo->itemSize(buildstate->dimensions)); buildstate->listInfo = palloc(sizeof(ListInfo) * buildstate->lists); buildstate->tmpCtx = AllocSetContextCreate(CurrentMemoryContext, "Ivfflat build temporary context", ALLOCSET_DEFAULT_SIZES); #ifdef IVFFLAT_KMEANS_DEBUG buildstate->inertia = 0; buildstate->listSums = palloc0(sizeof(double) * buildstate->lists); buildstate->listCounts = palloc0(sizeof(int) * buildstate->lists); #endif buildstate->ivfleader = NULL; } /* * Free resources */ static void FreeBuildState(IvfflatBuildState * buildstate) { VectorArrayFree(buildstate->centers); pfree(buildstate->listInfo); #ifdef IVFFLAT_KMEANS_DEBUG pfree(buildstate->listSums); pfree(buildstate->listCounts); #endif MemoryContextDelete(buildstate->tmpCtx); } /* * Compute centers */ static void ComputeCenters(IvfflatBuildState * buildstate) { int numSamples; pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE, PROGRESS_IVFFLAT_PHASE_KMEANS); /* Target 50 samples per list, with at least 10000 samples */ /* The number of samples has a large effect on index build time */ numSamples = buildstate->lists * 50; if (numSamples < 10000) numSamples = 10000; /* Skip samples for unlogged table */ if (buildstate->heap == NULL) numSamples = 1; /* Sample rows */ /* TODO Ensure within maintenance_work_mem */ buildstate->samples = VectorArrayInit(numSamples, buildstate->dimensions, buildstate->centers->itemsize); if (buildstate->heap != NULL) { SampleRows(buildstate); if (buildstate->samples->length < buildstate->lists) { ereport(NOTICE, (errmsg("ivfflat index created with little data"), errdetail("This will cause low recall."), errhint("Drop the index until the table has more data."))); } } /* Calculate centers */ IvfflatBench("k-means", IvfflatKmeans(buildstate->index, buildstate->samples, buildstate->centers, buildstate->typeInfo)); /* Free samples before we allocate more memory */ VectorArrayFree(buildstate->samples); } /* * Create the metapage */ static void CreateMetaPage(Relation index, int dimensions, int lists, ForkNumber forkNum) { Buffer buf; Page page; GenericXLogState *state; IvfflatMetaPage metap; buf = IvfflatNewBuffer(index, forkNum); IvfflatInitRegisterPage(index, &buf, &page, &state); /* Set metapage data */ metap = IvfflatPageGetMeta(page); metap->magicNumber = IVFFLAT_MAGIC_NUMBER; metap->version = IVFFLAT_VERSION; metap->dimensions = dimensions; metap->lists = lists; ((PageHeader) page)->pd_lower = ((char *) metap + sizeof(IvfflatMetaPageData)) - (char *) page; IvfflatCommitBuffer(buf, state); } /* * Create list pages */ static void CreateListPages(Relation index, VectorArray centers, int dimensions, int lists, ForkNumber forkNum, ListInfo * *listInfo) { Buffer buf; Page page; GenericXLogState *state; Size listSize; IvfflatList list; listSize = MAXALIGN(IVFFLAT_LIST_SIZE(centers->itemsize)); list = palloc0(listSize); buf = IvfflatNewBuffer(index, forkNum); IvfflatInitRegisterPage(index, &buf, &page, &state); for (int i = 0; i < lists; i++) { OffsetNumber offno; /* Zero memory for each list */ MemSet(list, 0, listSize); /* Load list */ list->startPage = InvalidBlockNumber; list->insertPage = InvalidBlockNumber; memcpy(&list->center, VectorArrayGet(centers, i), VARSIZE_ANY(VectorArrayGet(centers, i))); /* Ensure free space */ if (PageGetFreeSpace(page) < listSize) IvfflatAppendPage(index, &buf, &page, &state, forkNum); /* Add the item */ offno = PageAddItem(page, (Item) list, listSize, InvalidOffsetNumber, false, false); if (offno == InvalidOffsetNumber) elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index)); /* Save location info */ (*listInfo)[i].blkno = BufferGetBlockNumber(buf); (*listInfo)[i].offno = offno; } IvfflatCommitBuffer(buf, state); pfree(list); } #ifdef IVFFLAT_KMEANS_DEBUG /* * Print k-means metrics */ static void PrintKmeansMetrics(IvfflatBuildState * buildstate) { elog(INFO, "inertia: %.3e", buildstate->inertia); /* Calculate Davies-Bouldin index */ if (buildstate->lists > 1 && !buildstate->ivfleader) { double db = 0.0; /* Calculate average distance */ for (int i = 0; i < buildstate->lists; i++) { if (buildstate->listCounts[i] > 0) buildstate->listSums[i] /= buildstate->listCounts[i]; } for (int i = 0; i < buildstate->lists; i++) { double max = 0.0; double distance; for (int j = 0; j < buildstate->lists; j++) { if (j == i) continue; distance = DatumGetFloat8(FunctionCall2Coll(buildstate->procinfo, buildstate->collation, PointerGetDatum(VectorArrayGet(buildstate->centers, i)), PointerGetDatum(VectorArrayGet(buildstate->centers, j)))); distance = (buildstate->listSums[i] + buildstate->listSums[j]) / distance; if (distance > max) max = distance; } db += max; } db /= buildstate->lists; elog(INFO, "davies-bouldin: %.3f", db); } } #endif /* * Within leader, wait for end of heap scan */ static double ParallelHeapScan(IvfflatBuildState * buildstate) { IvfflatShared *ivfshared = buildstate->ivfleader->ivfshared; int nparticipanttuplesorts; double reltuples; nparticipanttuplesorts = buildstate->ivfleader->nparticipanttuplesorts; for (;;) { SpinLockAcquire(&ivfshared->mutex); if (ivfshared->nparticipantsdone == nparticipanttuplesorts) { buildstate->indtuples = ivfshared->indtuples; reltuples = ivfshared->reltuples; #ifdef IVFFLAT_KMEANS_DEBUG buildstate->inertia = ivfshared->inertia; #endif SpinLockRelease(&ivfshared->mutex); break; } SpinLockRelease(&ivfshared->mutex); ConditionVariableSleep(&ivfshared->workersdonecv, WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN); } ConditionVariableCancelSleep(); return reltuples; } /* * Perform a worker's portion of a parallel sort */ static void IvfflatParallelScanAndSort(IvfflatSpool * ivfspool, IvfflatShared * ivfshared, Sharedsort *sharedsort, char *ivfcenters, int sortmem, bool progress) { SortCoordinate coordinate; IvfflatBuildState buildstate; TableScanDesc scan; double reltuples; IndexInfo *indexInfo; /* Sort options, which must match AssignTuples */ AttrNumber attNums[] = {1}; Oid sortOperators[] = {Int4LessOperator}; Oid sortCollations[] = {InvalidOid}; bool nullsFirstFlags[] = {false}; /* Initialize local tuplesort coordination state */ coordinate = palloc0(sizeof(SortCoordinateData)); coordinate->isWorker = true; coordinate->nParticipants = -1; coordinate->sharedsort = sharedsort; /* Join parallel scan */ indexInfo = BuildIndexInfo(ivfspool->index); indexInfo->ii_Concurrent = ivfshared->isconcurrent; InitBuildState(&buildstate, ivfspool->heap, ivfspool->index, indexInfo); memcpy(buildstate.centers->items, ivfcenters, buildstate.centers->itemsize * buildstate.centers->maxlen); buildstate.centers->length = buildstate.centers->maxlen; ivfspool->sortstate = tuplesort_begin_heap(buildstate.tupdesc, 1, attNums, sortOperators, sortCollations, nullsFirstFlags, sortmem, coordinate, false); buildstate.sortstate = ivfspool->sortstate; scan = table_beginscan_parallel(ivfspool->heap, ParallelTableScanFromIvfflatShared(ivfshared)); reltuples = table_index_build_scan(ivfspool->heap, ivfspool->index, indexInfo, true, progress, BuildCallback, (void *) &buildstate, scan); /* Execute this worker's part of the sort */ tuplesort_performsort(ivfspool->sortstate); /* Record statistics */ SpinLockAcquire(&ivfshared->mutex); ivfshared->nparticipantsdone++; ivfshared->reltuples += reltuples; ivfshared->indtuples += buildstate.indtuples; #ifdef IVFFLAT_KMEANS_DEBUG ivfshared->inertia += buildstate.inertia; #endif SpinLockRelease(&ivfshared->mutex); /* Log statistics */ if (progress) ereport(DEBUG1, (errmsg("leader processed " INT64_FORMAT " tuples", (int64) reltuples))); else ereport(DEBUG1, (errmsg("worker processed " INT64_FORMAT " tuples", (int64) reltuples))); /* Notify leader */ ConditionVariableSignal(&ivfshared->workersdonecv); /* We can end tuplesorts immediately */ tuplesort_end(ivfspool->sortstate); FreeBuildState(&buildstate); } /* * Perform work within a launched parallel process */ void IvfflatParallelBuildMain(dsm_segment *seg, shm_toc *toc) { char *sharedquery; IvfflatSpool *ivfspool; IvfflatShared *ivfshared; Sharedsort *sharedsort; char *ivfcenters; Relation heapRel; Relation indexRel; LOCKMODE heapLockmode; LOCKMODE indexLockmode; int sortmem; /* Set debug_query_string for individual workers first */ sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true); debug_query_string = sharedquery; /* Report the query string from leader */ pgstat_report_activity(STATE_RUNNING, debug_query_string); /* Look up shared state */ ivfshared = shm_toc_lookup(toc, PARALLEL_KEY_IVFFLAT_SHARED, false); /* Open relations using lock modes known to be obtained by index.c */ if (!ivfshared->isconcurrent) { heapLockmode = ShareLock; indexLockmode = AccessExclusiveLock; } else { heapLockmode = ShareUpdateExclusiveLock; indexLockmode = RowExclusiveLock; } /* Open relations within worker */ heapRel = table_open(ivfshared->heaprelid, heapLockmode); indexRel = index_open(ivfshared->indexrelid, indexLockmode); /* Initialize worker's own spool */ ivfspool = (IvfflatSpool *) palloc0(sizeof(IvfflatSpool)); ivfspool->heap = heapRel; ivfspool->index = indexRel; /* Look up shared state private to tuplesort.c */ sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false); tuplesort_attach_shared(sharedsort, seg); ivfcenters = shm_toc_lookup(toc, PARALLEL_KEY_IVFFLAT_CENTERS, false); /* Perform sorting */ sortmem = maintenance_work_mem / ivfshared->scantuplesortstates; IvfflatParallelScanAndSort(ivfspool, ivfshared, sharedsort, ivfcenters, sortmem, false); /* Close relations within worker */ index_close(indexRel, indexLockmode); table_close(heapRel, heapLockmode); } /* * End parallel build */ static void IvfflatEndParallel(IvfflatLeader * ivfleader) { /* Shutdown worker processes */ WaitForParallelWorkersToFinish(ivfleader->pcxt); /* Free last reference to MVCC snapshot, if one was used */ if (IsMVCCSnapshot(ivfleader->snapshot)) UnregisterSnapshot(ivfleader->snapshot); DestroyParallelContext(ivfleader->pcxt); ExitParallelMode(); } /* * Return size of shared memory required for parallel index build */ static Size ParallelEstimateShared(Relation heap, Snapshot snapshot) { return add_size(BUFFERALIGN(sizeof(IvfflatShared)), table_parallelscan_estimate(heap, snapshot)); } /* * Within leader, participate as a parallel worker */ static void IvfflatLeaderParticipateAsWorker(IvfflatBuildState * buildstate) { IvfflatLeader *ivfleader = buildstate->ivfleader; IvfflatSpool *leaderworker; int sortmem; /* Allocate memory and initialize private spool */ leaderworker = (IvfflatSpool *) palloc0(sizeof(IvfflatSpool)); leaderworker->heap = buildstate->heap; leaderworker->index = buildstate->index; /* Perform work common to all participants */ sortmem = maintenance_work_mem / ivfleader->nparticipanttuplesorts; IvfflatParallelScanAndSort(leaderworker, ivfleader->ivfshared, ivfleader->sharedsort, ivfleader->ivfcenters, sortmem, true); } /* * Begin parallel build */ static void IvfflatBeginParallel(IvfflatBuildState * buildstate, bool isconcurrent, int request) { ParallelContext *pcxt; int scantuplesortstates; Snapshot snapshot; Size estivfshared; Size estsort; Size estcenters; IvfflatShared *ivfshared; Sharedsort *sharedsort; char *ivfcenters; IvfflatLeader *ivfleader = (IvfflatLeader *) palloc0(sizeof(IvfflatLeader)); bool leaderparticipates = true; int querylen; #ifdef DISABLE_LEADER_PARTICIPATION leaderparticipates = false; #endif /* Enter parallel mode and create context */ EnterParallelMode(); Assert(request > 0); pcxt = CreateParallelContext("vector", "IvfflatParallelBuildMain", request); scantuplesortstates = leaderparticipates ? request + 1 : request; /* Get snapshot for table scan */ if (!isconcurrent) snapshot = SnapshotAny; else snapshot = RegisterSnapshot(GetTransactionSnapshot()); /* Estimate size of workspaces */ estivfshared = ParallelEstimateShared(buildstate->heap, snapshot); shm_toc_estimate_chunk(&pcxt->estimator, estivfshared); estsort = tuplesort_estimate_shared(scantuplesortstates); shm_toc_estimate_chunk(&pcxt->estimator, estsort); estcenters = buildstate->centers->itemsize * buildstate->centers->maxlen; shm_toc_estimate_chunk(&pcxt->estimator, estcenters); shm_toc_estimate_keys(&pcxt->estimator, 3); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ if (debug_query_string) { querylen = strlen(debug_query_string); shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); shm_toc_estimate_keys(&pcxt->estimator, 1); } else querylen = 0; /* keep compiler quiet */ /* Everyone's had a chance to ask for space, so now create the DSM */ InitializeParallelDSM(pcxt); /* If no DSM segment was available, back out (do serial build) */ if (pcxt->seg == NULL) { if (IsMVCCSnapshot(snapshot)) UnregisterSnapshot(snapshot); DestroyParallelContext(pcxt); ExitParallelMode(); return; } /* Store shared build state, for which we reserved space */ ivfshared = (IvfflatShared *) shm_toc_allocate(pcxt->toc, estivfshared); /* Initialize immutable state */ ivfshared->heaprelid = RelationGetRelid(buildstate->heap); ivfshared->indexrelid = RelationGetRelid(buildstate->index); ivfshared->isconcurrent = isconcurrent; ivfshared->scantuplesortstates = scantuplesortstates; ConditionVariableInit(&ivfshared->workersdonecv); SpinLockInit(&ivfshared->mutex); /* Initialize mutable state */ ivfshared->nparticipantsdone = 0; ivfshared->reltuples = 0; ivfshared->indtuples = 0; #ifdef IVFFLAT_KMEANS_DEBUG ivfshared->inertia = 0; #endif table_parallelscan_initialize(buildstate->heap, ParallelTableScanFromIvfflatShared(ivfshared), snapshot); /* Store shared tuplesort-private state, for which we reserved space */ sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort); tuplesort_initialize_shared(sharedsort, scantuplesortstates, pcxt->seg); ivfcenters = shm_toc_allocate(pcxt->toc, estcenters); memcpy(ivfcenters, buildstate->centers->items, estcenters); shm_toc_insert(pcxt->toc, PARALLEL_KEY_IVFFLAT_SHARED, ivfshared); shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort); shm_toc_insert(pcxt->toc, PARALLEL_KEY_IVFFLAT_CENTERS, ivfcenters); /* Store query string for workers */ if (debug_query_string) { char *sharedquery; sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); memcpy(sharedquery, debug_query_string, querylen + 1); shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); } /* Launch workers, saving status for leader/caller */ LaunchParallelWorkers(pcxt); ivfleader->pcxt = pcxt; ivfleader->nparticipanttuplesorts = pcxt->nworkers_launched; if (leaderparticipates) ivfleader->nparticipanttuplesorts++; ivfleader->ivfshared = ivfshared; ivfleader->sharedsort = sharedsort; ivfleader->snapshot = snapshot; ivfleader->ivfcenters = ivfcenters; /* If no workers were successfully launched, back out (do serial build) */ if (pcxt->nworkers_launched == 0) { IvfflatEndParallel(ivfleader); return; } /* Log participants */ ereport(DEBUG1, (errmsg("using %d parallel workers", pcxt->nworkers_launched))); /* Save leader state now that it's clear build will be parallel */ buildstate->ivfleader = ivfleader; /* Join heap scan ourselves */ if (leaderparticipates) IvfflatLeaderParticipateAsWorker(buildstate); /* Wait for all launched workers */ WaitForParallelWorkersToAttach(pcxt); } /* * Scan table for tuples to index */ static void AssignTuples(IvfflatBuildState * buildstate) { int parallel_workers = 0; SortCoordinate coordinate = NULL; /* Sort options, which must match IvfflatParallelScanAndSort */ AttrNumber attNums[] = {1}; Oid sortOperators[] = {Int4LessOperator}; Oid sortCollations[] = {InvalidOid}; bool nullsFirstFlags[] = {false}; pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE, PROGRESS_IVFFLAT_PHASE_ASSIGN); /* Calculate parallel workers */ if (buildstate->heap != NULL) parallel_workers = plan_create_index_workers(RelationGetRelid(buildstate->heap), RelationGetRelid(buildstate->index)); /* Attempt to launch parallel worker scan when required */ if (parallel_workers > 0) IvfflatBeginParallel(buildstate, buildstate->indexInfo->ii_Concurrent, parallel_workers); /* Set up coordination state if at least one worker launched */ if (buildstate->ivfleader) { coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData)); coordinate->isWorker = false; coordinate->nParticipants = buildstate->ivfleader->nparticipanttuplesorts; coordinate->sharedsort = buildstate->ivfleader->sharedsort; } /* Begin serial/leader tuplesort */ buildstate->sortstate = tuplesort_begin_heap(buildstate->tupdesc, 1, attNums, sortOperators, sortCollations, nullsFirstFlags, maintenance_work_mem, coordinate, false); /* Add tuples to sort */ if (buildstate->heap != NULL) { if (buildstate->ivfleader) buildstate->reltuples = ParallelHeapScan(buildstate); else buildstate->reltuples = table_index_build_scan(buildstate->heap, buildstate->index, buildstate->indexInfo, true, true, BuildCallback, (void *) buildstate, NULL); #ifdef IVFFLAT_KMEANS_DEBUG PrintKmeansMetrics(buildstate); #endif } } /* * Create entry pages */ static void CreateEntryPages(IvfflatBuildState * buildstate, ForkNumber forkNum) { /* Assign */ IvfflatBench("assign tuples", AssignTuples(buildstate)); /* Sort */ IvfflatBench("sort tuples", tuplesort_performsort(buildstate->sortstate)); /* Load */ IvfflatBench("load tuples", InsertTuples(buildstate->index, buildstate, forkNum)); /* End sort */ tuplesort_end(buildstate->sortstate); /* End parallel build */ if (buildstate->ivfleader) IvfflatEndParallel(buildstate->ivfleader); } /* * Build the index */ static void BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo, IvfflatBuildState * buildstate, ForkNumber forkNum) { InitBuildState(buildstate, heap, index, indexInfo); ComputeCenters(buildstate); /* Create pages */ CreateMetaPage(index, buildstate->dimensions, buildstate->lists, forkNum); CreateListPages(index, buildstate->centers, buildstate->dimensions, buildstate->lists, forkNum, &buildstate->listInfo); CreateEntryPages(buildstate, forkNum); /* Write WAL for initialization fork since GenericXLog functions do not */ if (forkNum == INIT_FORKNUM) log_newpage_range(index, forkNum, 0, RelationGetNumberOfBlocksInFork(index, forkNum), true); FreeBuildState(buildstate); } /* * Build the index for a logged table */ IndexBuildResult * ivfflatbuild(Relation heap, Relation index, IndexInfo *indexInfo) { IndexBuildResult *result; IvfflatBuildState buildstate; BuildIndex(heap, index, indexInfo, &buildstate, MAIN_FORKNUM); result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult)); result->heap_tuples = buildstate.reltuples; result->index_tuples = buildstate.indtuples; return result; } /* * Build the index for an unlogged table */ void ivfflatbuildempty(Relation index) { IndexInfo *indexInfo = BuildIndexInfo(index); IvfflatBuildState buildstate; BuildIndex(NULL, index, indexInfo, &buildstate, INIT_FORKNUM); }