Skip to content

Commit

Permalink
add pinecone_print_index helper function
Browse files Browse the repository at this point in the history
  • Loading branch information
oscarlaird committed Feb 29, 2024
1 parent d5930b4 commit bb4a2d7
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 75 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ MODULE_big = vector
DATA = $(wildcard sql/*--*.sql)
OBJS = src/hnsw.o src/hnswbuild.o src/hnswinsert.o src/hnswscan.o src/hnswutils.o src/hnswvacuum.o src/ivfbuild.o src/ivfflat.o src/ivfinsert.o src/ivfkmeans.o src/ivfscan.o src/ivfutils.o src/ivfvacuum.o src/vector.o \
src/pinecone/pinecone_api.o src/pinecone/pinecone.o src/cJSON.o src/pinecone/pinecone_helpers.o src/pinecone/pinecone_build.o src/pinecone/pinecone_insert.o src/pinecone/pinecone_scan.o src/pinecone/pinecone_utils.o src/pinecone/pinecone_vacuum.o src/pinecone/pinecone_validate.o
HEADERS = src/vector.h src/cJSON.h src/pinecone/pinecone.h src/pinecone/pinecone_api.h
HEADERS = src/vector.h

TESTS = $(wildcard test/sql/*.sql)
REGRESS = $(patsubst test/sql/%.sql,%,$(TESTS))
REGRESS_OPTS = --inputdir=test --load-extension=$(EXTENSION)

OPTFLAGS = -march=native
OPTFLAGS = -march=native -O0 -fno-strict-aliasing

# Mac ARM doesn't support -march=native
ifeq ($(shell uname -s), Darwin)
Expand Down
3 changes: 3 additions & 0 deletions sql/vector.sql
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ CREATE FUNCTION pinecone_indexes() RETURNS SETOF pinecone_index_stats
CREATE FUNCTION pinecone_delete_unused_indexes() RETURNS int4
AS 'MODULE_PATHNAME' LANGUAGE C VOLATILE STRICT PARALLEL SAFE;

CREATE FUNCTION pinecone_print_index(text) RETURNS int4
AS 'MODULE_PATHNAME' LANGUAGE C VOLATILE STRICT PARALLEL SAFE;

-- aggregates

CREATE AGGREGATE avg(vector) (
Expand Down
12 changes: 9 additions & 3 deletions src/pinecone/pinecone.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
char* pinecone_api_key = NULL;
int pinecone_top_k = 1000;
int pinecone_vectors_per_request = 100;
int pinecone_concurrent_requests = 20;
int pinecone_requests_per_batch = 20;
int pinecone_max_buffer_scan = 10000; // maximum number of tuples to search in the buffer
int pinecone_max_fetched_vectors_for_liveness_check = 10;

// todo: principled batch sizes. Do we ever want the buffer to be bigger than a multi-insert? Possibly if we want to let the buffer fill up when the remote index is down.
static relopt_kind pinecone_relopt_kind;
Expand Down Expand Up @@ -42,8 +43,8 @@ void PineconeInit(void)
100, 1, 1000,
PGC_USERSET,
0, NULL, NULL, NULL);
DefineCustomIntVariable("pinecone.concurrent_requests", "Pinecone concurrent requests", "Pinecone concurrent requests",
&pinecone_concurrent_requests,
DefineCustomIntVariable("pinecone.requests_per_batch", "Pinecone requests per batch", "Pinecone requests per batch",
&pinecone_requests_per_batch,
20, 1, 100,
PGC_USERSET,
0, NULL, NULL, NULL);
Expand All @@ -52,6 +53,11 @@ void PineconeInit(void)
10000, 1, 100000,
PGC_USERSET,
0, NULL, NULL, NULL);
DefineCustomIntVariable("pinecone.max_fetched_vectors_for_liveness_check", "Pinecone max fetched vectors for liveness check", "Pinecone max fetched vectors for liveness check",
&pinecone_max_fetched_vectors_for_liveness_check,
10, 1, 100, // more than 100 is useless and won't fit in the 2048 chars allotted for the URL
PGC_USERSET,
0, NULL, NULL, NULL);
MarkGUCPrefixReserved("pinecone");
}

Expand Down
20 changes: 11 additions & 9 deletions src/pinecone/pinecone.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
// pinecone specific limits

#define PINECONE_NAME_MAX_LENGTH 45
#define PINECONE_HOST_MAX_LENGTH 100

// structs
typedef struct PineconeScanOpaqueData
Expand All @@ -48,7 +49,6 @@ typedef struct PineconeScanOpaqueData

// support functions
FmgrInfo *procinfo;
Oid collation;

// results
cJSON* pinecone_results;
Expand All @@ -61,8 +61,8 @@ extern const char* vector_metric_to_pinecone_metric[VECTOR_METRIC_COUNT];
typedef struct PineconeStaticMetaPageData
{
int dimensions;
char host[100];
char pinecone_index_name[60];
char host[PINECONE_HOST_MAX_LENGTH + 1];
char pinecone_index_name[PINECONE_NAME_MAX_LENGTH + 1];
VectorMetric metric;
} PineconeStaticMetaPageData;
typedef PineconeStaticMetaPageData *PineconeStaticMetaPage;
Expand Down Expand Up @@ -117,9 +117,10 @@ typedef PineconeBufferOpaqueData *PineconeBufferOpaque;
extern char* pinecone_api_key;
extern int pinecone_top_k;
extern int pinecone_vectors_per_request;
extern int pinecone_concurrent_requests;
extern int pinecone_requests_per_batch;
extern int pinecone_max_buffer_scan;
#define PINECONE_BATCH_SIZE pinecone_vectors_per_request * pinecone_concurrent_requests
extern int pinecone_max_fetched_vectors_for_liveness_check;
// Number of vectors in one batch: vectors per request times requests per batch.
// The expansion is parenthesized so the product stays a single operand when the
// macro is used inside a larger expression (e.g. `n / PINECONE_BATCH_SIZE`).
#define PINECONE_BATCH_SIZE (pinecone_vectors_per_request * pinecone_requests_per_batch)

// function declarations

Expand Down Expand Up @@ -155,8 +156,6 @@ bool pinecone_insert(Relation index, Datum *values, bool *isnull, ItemPointer he
#endif
IndexInfo *indexInfo);
void FlushToPinecone(Relation index);
cJSON* get_fetch_ids(PineconeBufferMetaPageData buffer_meta);
void AdvanceLivenessTail(Relation index, cJSON* fetched_ids);

// scan
IndexScanDesc pinecone_beginscan(Relation index, int nkeys, int norderbys);
Expand Down Expand Up @@ -189,15 +188,18 @@ cJSON* index_tuple_get_pinecone_vector(Relation index, IndexTuple itup);
char* pinecone_id_from_heap_tid(ItemPointerData heap_tid);
ItemPointerData pinecone_id_get_heap_tid(char *id);
// read and write meta pages
PineconeStaticMetaPageData GetStaticMetaPageData(Relation index);
PineconeStaticMetaPageData PineconeSnapshotStaticMeta(Relation index);
PineconeBufferMetaPageData PineconeSnapshotBufferMeta(Relation index);
PineconeBufferOpaqueData PineconeSnapshotBufferOpaque(Relation index, BlockNumber blkno);
void set_buffer_meta_page(Relation index, PineconeCheckpoint* ready_checkpoint, PineconeCheckpoint* flush_checkpoint, PineconeCheckpoint* latest_checkpoint, BlockNumber* insert_page, int* n_tuples_since_last_checkpoint);
char* checkpoint_to_string(PineconeCheckpoint checkpoint);
char* buffer_meta_to_string(PineconeBufferMetaPageData buffer_meta);
char* buffer_opaque_to_string(PineconeBufferOpaqueData buffer_opaque);
void print_relation(Relation index);
void pinecone_print_relation(Relation index);

// helpers
Oid get_index_oid_from_name(char* index_name);


// misc.

Expand Down
2 changes: 1 addition & 1 deletion src/pinecone/pinecone_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ CURL* get_pinecone_upsert_handle(const char *api_key, const char *index_host, cJ

CURL* get_pinecone_fetch_handle(const char *api_key, const char *index_host, cJSON* ids, ResponseData* response_data) {
CURL* hnd = curl_easy_init();
char url[400] = "https://"; // we fetch up to 20 vectors and have 12 chars per vector id + &ids= is 17chars/vec
char url[2048] = "https://"; // we fetch up to 100 vectors and have 12 chars per vector id + &ids= is 17chars/vec
strcat(url, index_host); strcat(url, "/vectors/fetch?"); // https://t1-23kshha.svc.apw5-4e34-81fa.pinecone.io/vectors/upsert
cJSON_ArrayForEach(ids, ids) {
strcat(url, "ids=");
Expand Down
61 changes: 39 additions & 22 deletions src/pinecone/pinecone_build.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,57 +143,74 @@ void InitIndexPages(Relation index, VectorMetric metric, int dimensions, char *p
PineconeStaticMetaPage pinecone_static_meta_page;
PineconeBufferMetaPage pinecone_buffer_meta_page;
PineconeBufferOpaque buffer_head_opaque;
PineconeCheckpoint default_checkpoint;
GenericXLogState *state = GenericXLogStart(index);

// init default checkpoint
default_checkpoint.blkno = PINECONE_BUFFER_HEAD_BLKNO;
default_checkpoint.checkpoint_no = 0;
default_checkpoint.is_checkpoint = true;
default_checkpoint.n_preceding_tuples = 0;

// Lock the relation for extension, not really necessary since this is called exactly once in build_index
LockRelationForExtension(index, ExclusiveLock);

// CREATE THE STATIC META PAGE
meta_buf = ReadBufferExtended(index, forkNum, P_NEW, RBM_NORMAL, NULL);
Assert(BufferGetBlockNumber(meta_buf) == PINECONE_STATIC_METAPAGE_BLKNO);
meta_buf = ReadBufferExtended(index, forkNum, P_NEW, RBM_NORMAL, NULL); LockBuffer(meta_buf, BUFFER_LOCK_EXCLUSIVE);
if (BufferGetBlockNumber(meta_buf) != PINECONE_STATIC_METAPAGE_BLKNO) {
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Pinecone static meta page block number mismatch")));
}
meta_page = GenericXLogRegisterBuffer(state, meta_buf, GENERIC_XLOG_FULL_IMAGE);
PageInit(meta_page, BufferGetPageSize(meta_buf), sizeof(PineconeStaticMetaPageData)); // format as a page
PageInit(meta_page, BufferGetPageSize(meta_buf), 0); // format as a page
pinecone_static_meta_page = PineconePageGetStaticMeta(meta_page);
pinecone_static_meta_page->metric = metric;
pinecone_static_meta_page->dimensions = dimensions;
// You must set pd_lower because GenericXLog ignores any changes in the free space between pd_lower and pd_upper
((PageHeader) meta_page)->pd_lower = ((char *) pinecone_static_meta_page - (char *) meta_page) + sizeof(PineconeStaticMetaPageData);

// copy host and pinecone_index_name, checking for length
if (strlcpy(pinecone_static_meta_page->host, host, sizeof((PineconeStaticMetaPage) 0)) >= sizeof((PineconeStaticMetaPage) 0)) {
ereport(ERROR, (errcode(ERRCODE_NAME_TOO_LONG), errmsg("Host name too long"), errhint("The host name is %s... and is %d characters long. The maximum length is %d characters.", host, (int) strlen(host), (int) sizeof(pinecone_static_meta_page->host))));
if (strlcpy(pinecone_static_meta_page->host, host, PINECONE_HOST_MAX_LENGTH) > PINECONE_HOST_MAX_LENGTH) {
ereport(ERROR, (errcode(ERRCODE_NAME_TOO_LONG), errmsg("Host name too long"),
errhint("The host name is %s... and is %d characters long. The maximum length is %d characters.",
host, (int) strlen(host), PINECONE_HOST_MAX_LENGTH)));
}
if (strlcpy(pinecone_static_meta_page->pinecone_index_name, pinecone_index_name, sizeof((PineconeStaticMetaPage) 0)) >= sizeof((PineconeStaticMetaPage) 0)) {
ereport(ERROR, (errcode(ERRCODE_NAME_TOO_LONG), errmsg("Pinecone index name too long"), errhint("The pinecone index name is %s... and is %d characters long. The maximum length is %d characters.", pinecone_index_name, (int) strlen(pinecone_index_name), (int) sizeof(pinecone_static_meta_page->pinecone_index_name))));
if (strlcpy(pinecone_static_meta_page->pinecone_index_name, pinecone_index_name, PINECONE_NAME_MAX_LENGTH) > PINECONE_NAME_MAX_LENGTH) {
ereport(ERROR, (errcode(ERRCODE_NAME_TOO_LONG), errmsg("Pinecone index name too long"),
errhint("The pinecone index name is %s... and is %d characters long. The maximum length is %d characters.",
pinecone_index_name, (int) strlen(pinecone_index_name), PINECONE_NAME_MAX_LENGTH)));
}

// CREATE THE BUFFER HEAD
buffer_head_buf = ReadBufferExtended(index, forkNum, P_NEW, RBM_NORMAL, NULL);
Assert(BufferGetBlockNumber(buffer_head_buf) == PINECONE_BUFFER_HEAD_BLKNO);
buffer_head_page = GenericXLogRegisterBuffer(state, buffer_head_buf, GENERIC_XLOG_FULL_IMAGE);
PineconePageInit(buffer_head_page, BufferGetPageSize(buffer_head_buf));
buffer_head_opaque = PineconePageGetOpaque(buffer_head_page);
buffer_head_opaque->checkpoint.blkno = PINECONE_BUFFER_HEAD_BLKNO;
buffer_head_opaque->checkpoint.checkpoint_no = 0;
buffer_head_opaque->checkpoint.n_preceding_tuples = 0;
buffer_head_opaque->checkpoint.is_checkpoint = true;

// CREATE THE BUFFER META PAGE
buffer_meta_buf = ReadBufferExtended(index, forkNum, P_NEW, RBM_NORMAL, NULL);
buffer_meta_buf = ReadBufferExtended(index, forkNum, P_NEW, RBM_NORMAL, NULL); LockBuffer(buffer_meta_buf, BUFFER_LOCK_EXCLUSIVE);
Assert(BufferGetBlockNumber(buffer_meta_buf) == PINECONE_BUFFER_METAPAGE_BLKNO);
buffer_meta_page = GenericXLogRegisterBuffer(state, buffer_meta_buf, GENERIC_XLOG_FULL_IMAGE);
PageInit(buffer_meta_page, BufferGetPageSize(buffer_meta_buf), sizeof(PineconeBufferMetaPageData)); // format as a page
pinecone_buffer_meta_page = PineconePageGetBufferMeta(buffer_meta_page);
// set head, pinecone_tail, and live_tail to START
pinecone_buffer_meta_page->ready_checkpoint = buffer_head_opaque->checkpoint;
pinecone_buffer_meta_page->flush_checkpoint = buffer_head_opaque->checkpoint;
pinecone_buffer_meta_page->latest_checkpoint = buffer_head_opaque->checkpoint;
pinecone_buffer_meta_page->ready_checkpoint = default_checkpoint;
pinecone_buffer_meta_page->flush_checkpoint = default_checkpoint;
pinecone_buffer_meta_page->latest_checkpoint = default_checkpoint;
pinecone_buffer_meta_page->insert_page = PINECONE_BUFFER_HEAD_BLKNO;
pinecone_buffer_meta_page->n_tuples_since_last_checkpoint = 0;
// adjust pd_lower
((PageHeader) buffer_meta_page)->pd_lower = ((char *) pinecone_buffer_meta_page - (char *) buffer_meta_page) + sizeof(PineconeBufferMetaPageData);

// CREATE THE BUFFER HEAD
buffer_head_buf = ReadBufferExtended(index, forkNum, P_NEW, RBM_NORMAL, NULL); LockBuffer(buffer_head_buf, BUFFER_LOCK_EXCLUSIVE);
Assert(BufferGetBlockNumber(buffer_head_buf) == PINECONE_BUFFER_HEAD_BLKNO);
buffer_head_page = GenericXLogRegisterBuffer(state, buffer_head_buf, GENERIC_XLOG_FULL_IMAGE);
PineconePageInit(buffer_head_page, BufferGetPageSize(buffer_head_buf));
buffer_head_opaque = PineconePageGetOpaque(buffer_head_page);
buffer_head_opaque->checkpoint = default_checkpoint;

// cleanup
GenericXLogFinish(state);
UnlockReleaseBuffer(meta_buf);
UnlockReleaseBuffer(buffer_meta_buf);
UnlockReleaseBuffer(buffer_head_buf);
UnlockRelationForExtension(index, ExclusiveLock);


}


Expand Down
41 changes: 41 additions & 0 deletions src/pinecone/pinecone_helpers.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,45 @@ pinecone_delete_unused_indexes(PG_FUNCTION_ARGS) {
if (unused) pinecone_delete_index(pinecone_api_key, pinecone_index_name);
}
PG_RETURN_INT32(deleted);
}

// I need a way to go from an index name to an index oid: I can do this by querying the pg_class table
Oid get_index_oid_from_name(char* index_name) {
Oid index_oid;
char query[256];
int ret;
sprintf(query, "SELECT oid FROM pg_class WHERE relname = '%s' AND relkind = 'i';", index_name);
SPI_connect();
ret = SPI_execute(query, false, 0);
if (ret == SPI_OK_SELECT && SPI_processed > 0) {
TupleDesc tupdesc = SPI_tuptable->tupdesc;
SPITupleTable *tuptable = SPI_tuptable;
HeapTuple tuple = tuptable->vals[0];
bool isnull;
Datum datum = heap_getattr(tuple, 1, tupdesc, &isnull);
if (!isnull) {
index_oid = DatumGetObjectId(datum);
}
} else {
elog(ERROR, "Failed to execute query");
}
SPI_finish();
return index_oid;
}

/*
 * pinecone_print_index(text) RETURNS int4
 *
 * SQL-callable debugging helper. Takes an index name, resolves it to an OID
 * with get_index_oid_from_name, opens the index under AccessShareLock, emits
 * NOTICE messages with the name, the index OID and the OID of the indexed
 * table, and calls pinecone_print_relation on it.
 *
 * Always returns 0; the result carries no information and exists only to
 * match the int4 return type declared in the SQL definition.
 */
PGDLLEXPORT PG_FUNCTION_INFO_V1(pinecone_print_index);
Datum
pinecone_print_index(PG_FUNCTION_ARGS) {
    char* index_name;
    Oid index_oid;
    Relation index;

    index_name = text_to_cstring(PG_GETARG_TEXT_PP(0));
    elog(NOTICE, "Index name: %s", index_name);

    index_oid = get_index_oid_from_name(index_name);
    elog(NOTICE, "Index oid: %u", index_oid);

    index = index_open(index_oid, AccessShareLock);
    /* indrelid is an Oid (unsigned), so it is printed with %u rather than %d */
    elog(NOTICE, "Index: %u", index->rd_index->indrelid);
    pinecone_print_relation(index);
    index_close(index, AccessShareLock);

    /* the function is declared RETURNS int4, so return a real int4 datum */
    PG_RETURN_INT32(0);
}
8 changes: 8 additions & 0 deletions src/pinecone/pinecone_insert.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ void PineconePageInit(Page page, Size pageSize)
PageInit(page, pageSize, sizeof(PineconeBufferOpaqueData));
opaque = PineconePageGetOpaque(page);
opaque->nextblkno = InvalidBlockNumber;
opaque->prev_checkpoint_blkno = InvalidBlockNumber;
opaque->checkpoint.is_checkpoint = false;
// checkpoint
// ItemPointerSetInvalid
Expand Down Expand Up @@ -86,6 +87,9 @@ bool AppendBufferTuple(Relation index, Datum *values, bool *isnull, ItemPointer
// add item to insert page
if (!full && !create_checkpoint) {
PageAddItem(insert_page, (Item) itup, itemsz, InvalidOffsetNumber, false, false);
// log the number of items on this page MaxOffsetNumber
elog(DEBUG1, "No new page! Page has %d items", PageGetMaxOffsetNumber(insert_page));

// release insert_page
GenericXLogFinish(state);
UnlockReleaseBuffer(insert_buf);
Expand Down Expand Up @@ -168,9 +172,13 @@ bool pinecone_insert(Relation index, Datum *values, bool *isnull, ItemPointer he

// if there are enough tuples in the buffer, advance the pinecone tail
if (checkpoint_created) {
elog(DEBUG1, "Checkpoint created. Flushing to Pinecone");
FlushToPinecone(index);
pinecone_print_relation(index);
}

// log the state of the relation for debugging

return false;
}

Expand Down
Loading

0 comments on commit bb4a2d7

Please sign in to comment.