#include "postgres.h" #include #include "ivfflat.h" #include "storage/bufmgr.h" /* * Find the list that minimizes the distance function */ static void FindInsertPage(Relation rel, Datum *values, BlockNumber *insertPage, ListInfo * listInfo) { Buffer cbuf; Page cpage; IvfflatList list; double distance; double minDistance = DBL_MAX; BlockNumber nextblkno = IVFFLAT_HEAD_BLKNO; FmgrInfo *procinfo; Oid collation; OffsetNumber offno; OffsetNumber maxoffno; procinfo = index_getprocinfo(rel, 1, IVFFLAT_DISTANCE_PROC); collation = rel->rd_indcollation[0]; /* Search all list pages */ while (BlockNumberIsValid(nextblkno)) { cbuf = ReadBuffer(rel, nextblkno); LockBuffer(cbuf, BUFFER_LOCK_SHARE); cpage = BufferGetPage(cbuf); maxoffno = PageGetMaxOffsetNumber(cpage); for (offno = FirstOffsetNumber; offno <= maxoffno; offno = OffsetNumberNext(offno)) { list = (IvfflatList) PageGetItem(cpage, PageGetItemId(cpage, offno)); distance = DatumGetFloat8(FunctionCall2Coll(procinfo, collation, values[0], PointerGetDatum(&list->center))); if (distance < minDistance) { *insertPage = list->insertPage; listInfo->blkno = nextblkno; listInfo->offno = offno; minDistance = distance; } } nextblkno = IvfflatPageGetOpaque(cpage)->nextblkno; UnlockReleaseBuffer(cbuf); } } /* * Insert a tuple into the index */ static void InsertTuple(Relation rel, IndexTuple itup, Relation heapRel, Datum *values) { Buffer buf; Page page; GenericXLogState *state; Size itemsz; BlockNumber insertPage = InvalidBlockNumber; ListInfo listInfo; BlockNumber originalInsertPage; /* Find the insert page - sets the page and list info */ FindInsertPage(rel, values, &insertPage, &listInfo); Assert(BlockNumberIsValid(insertPage)); originalInsertPage = insertPage; itemsz = MAXALIGN(IndexTupleSize(itup)); Assert(itemsz <= BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(IvfflatPageOpaqueData))); /* Find a page to insert the item */ for (;;) { buf = ReadBuffer(rel, insertPage); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); state = GenericXLogStart(rel); page = GenericXLogRegisterBuffer(state, buf, 0); if (PageGetFreeSpace(page) >= itemsz) break; insertPage = IvfflatPageGetOpaque(page)->nextblkno; if (BlockNumberIsValid(insertPage)) { /* Move to next page */ GenericXLogAbort(state); UnlockReleaseBuffer(buf); } else { /* Add a new page */ Buffer newbuf = IvfflatNewBuffer(rel, MAIN_FORKNUM); Page newpage = GenericXLogRegisterBuffer(state, buf, GENERIC_XLOG_FULL_IMAGE); insertPage = BufferGetBlockNumber(newbuf); /* Update previous buffer */ IvfflatPageGetOpaque(page)->nextblkno = insertPage; /* Init page */ PageInit(newpage, BufferGetPageSize(newbuf), sizeof(IvfflatPageOpaqueData)); IvfflatPageGetOpaque(newpage)->nextblkno = InvalidBlockNumber; IvfflatPageGetOpaque(newpage)->page_id = IVFFLAT_PAGE_ID; /* Commit */ MarkBufferDirty(buf); MarkBufferDirty(newbuf); GenericXLogFinish(state); /* Unlock */ UnlockReleaseBuffer(buf); UnlockReleaseBuffer(newbuf); } } /* Add to next offset */ if (PageAddItem(page, (Item) itup, itemsz, InvalidOffsetNumber, false, false) == InvalidOffsetNumber) elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(rel)); IvfflatCommitBuffer(buf, state); /* Update the insert page */ if (insertPage != originalInsertPage) IvfflatUpdateList(rel, state, listInfo, insertPage, originalInsertPage, InvalidBlockNumber, MAIN_FORKNUM); } /* * Insert a tuple into the index */ bool ivfflatinsert(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heap, IndexUniqueCheck checkUnique #if PG_VERSION_NUM >= 140000 ,bool indexUnchanged #endif #if PG_VERSION_NUM >= 100000 ,IndexInfo *indexInfo #endif ) { IndexTuple itup; Datum value; FmgrInfo *normprocinfo; if (isnull[0]) return false; value = values[0]; /* Normalize if needed */ normprocinfo = IvfflatOptionalProcInfo(index, IVFFLAT_NORM_PROC); if (normprocinfo != NULL) { if (!IvfflatNormValue(normprocinfo, index->rd_indcollation[0], &value, NULL)) return false; } itup = index_form_tuple(RelationGetDescr(index), &value, isnull); itup->t_tid = *heap_tid; InsertTuple(index, itup, heap, &value); pfree(itup); /* Clean up if we allocated a new value */ if (value != values[0]) pfree(DatumGetPointer(value)); return false; }