From faeb210e2c3975d5a4d94ff1f60c1cf5caebcd08 Mon Sep 17 00:00:00 2001
From: Steve Pousty
Date: Mon, 4 Mar 2024 16:15:35 -0800
Subject: [PATCH] working

---
 arvix-example-copy.py | 64 +++++++++++++++++++++++++++++++++++++++++++
 arxiv-import.py       | 63 ++++++++++++++----------------------------
 pgvector_example.py   |  2 +-
 3 files changed, 86 insertions(+), 43 deletions(-)
 create mode 100644 arvix-example-copy.py

diff --git a/arvix-example-copy.py b/arvix-example-copy.py
new file mode 100644
index 0000000..f15d76d
--- /dev/null
+++ b/arvix-example-copy.py
@@ -0,0 +1,64 @@
+from pgvector.psycopg import register_vector
+import psycopg
+from pathlib import Path
+import pyarrow as pa
+import pyarrow.parquet as pq
+
+PARQUET_PATH = Path('./')
+DB_NAME = 'arxiv2'
+data_pandas = 'replaced later'
+dimensions = 768
+
+# NOTE: This only works for a single file because we read the file and then copy it.
+# This won't work when there are many parquet files in the directory.
+for path in PARQUET_PATH.glob('*.parquet'):
+    print("working on: " + str(path))
+    data_table = pq.read_table(path, memory_map=True)
+    data_pandas = data_table.to_pandas()
+
+
+
+# Make the table
+conn = psycopg.connect("host=localhost user=postgres password='letmein'", autocommit=True)
+cursor = conn.cursor()
+
+cursor.execute("SELECT datname FROM pg_database;")
+
+list_database = cursor.fetchall()
+
+if ('arxiv2',) in list_database:
+    cursor.execute("DROP database " + DB_NAME + " with (FORCE);")
+    cursor.execute("create database " + DB_NAME + ";")
+else:
+    cursor.execute("create database " + DB_NAME + ";")
+
+# Now close the connection and switch DB
+conn.close()
+
+# enable extensions
+conn = psycopg.connect("host=localhost user=postgres password='letmein' dbname='arxiv2'", autocommit=True)
+conn.execute('CREATE EXTENSION IF NOT EXISTS vector')
+conn.close()
+# reconnect for updated GUC variables to take effect
+conn = psycopg.connect("host=localhost user=postgres password='letmein' dbname='arxiv2'", autocommit=True)
+register_vector(conn)
+
+print('Creating table')
+conn.execute('DROP TABLE IF EXISTS items')
+conn.execute('CREATE TABLE items (id bigserial, embedding vector(%d), abstract text, PRIMARY KEY (id))' % dimensions)
+
+
+print('Loading data')
+with conn.cursor().copy('COPY items (embedding, abstract) FROM STDIN WITH (FORMAT BINARY)') as copy:
+    copy.set_types(['vector', 'text'])
+
+    for i in range(11):
+        copy.write_row([data_pandas.iloc[i]["embeddings"], data_pandas.iloc[i]["abstract"]])
+
+# print('Creating index')
+# conn.execute('CREATE INDEX ON items USING hnsw (embedding vector_l2_ops)')
+#
+# print('Running queries')
+# for query in queries:
+#     items = conn.execute('SELECT id FROM items ORDER BY embedding <-> %s LIMIT 10', (query,)).fetchall()
+#     print([r[0] for r in items])
\ No newline at end of file
diff --git a/arxiv-import.py b/arxiv-import.py
index 1df69c7..81d2390 100644
--- a/arxiv-import.py
+++ b/arxiv-import.py
@@ -1,12 +1,4 @@
-# @TODO
-# This script needs to import the arvix parquets into the postgres instance and creates th index
-
-### DO NOT USE = this is my failed attempt to try to get the data into pgvector
-
 import re
-import sys
-
-import numpy as np
 import pyarrow as pa
 import pyarrow.parquet as pq
 import psycopg
@@ -16,7 +8,7 @@

 # The proper word is arxiv but I messed when creating the folder and project
 PARQUET_PATH= Path('./')
-DB_NAME= 'arxiv'
+DB_NAME= 'lala'

 conn = psycopg.connect("host=localhost user=postgres password='letmein'", autocommit=True)
 cursor = conn.cursor()
@@ -25,54 +17,41 @@

 list_database = cursor.fetchall()

-if ('arxiv',) in list_database:
+if ('lala',) in list_database:
     cursor.execute(("DROP database "+ DB_NAME +" with (FORCE);"))
-    cursor.execute("create database " + DB_NAME + ";");
+    cursor.execute("create database " + DB_NAME + ";")
 else:
-    cursor.execute("create database " + DB_NAME + ";");
+    cursor.execute("create database " + DB_NAME + ";")

 #Now close the connection and switch DB
 conn.close()
-conn = psycopg.connect("host=127.0.0.1 user=postgres password='letmein' dbname='arxiv' ")
-cursor = conn.cursor()
+conn = psycopg.connect("host=localhost user=postgres password='letmein' dbname='lala'", autocommit=True)
+conn.execute('CREATE EXTENSION IF NOT EXISTS vector')
+conn.close()

-cursor.execute('CREATE EXTENSION IF NOT EXISTS vector')
+conn = psycopg.connect("host=localhost user=postgres password='letmein' dbname='lala'", autocommit=True)
 register_vector(conn)

 conn.execute('DROP TABLE IF EXISTS documents')
-cursor.execute('CREATE TABLE documents (id bigserial PRIMARY KEY, abstract text, embedding vector(768))')
-cursor.connection.commit()
-#sys.exit()
-## Loop through the parquet files)
-for path in PARQUET_PATH.rglob('*.parquet'):
+conn.execute('CREATE TABLE documents (id bigserial PRIMARY KEY, abstract text, embedding vector(768))')
+
+for path in PARQUET_PATH.glob('*.parquet'):
     match = re.search('.*(\w{16})\.parquet', str(path))
     data_table = pa.parquet.read_table(path, memory_map=True, columns=['abstract', 'embeddings'])
     data_pandas = data_table.to_pandas()
-    #cursor.executemany("INSERT INTO documents (abstract, embedding) VALUES (%s, %s)", data_table.to_pylist())
+    print("working on: " + str(path))

-    i = 0
-
-
-
-    with cursor.copy("COPY documents (abstract, embedding) FROM STDIN with (FORMAT BINARY)") as copy:
-        print("working on: " + str(path))
-        #for i in range(data_pandas.rows):
-        for i in range(11):
-            copy.write_row([data_pandas.abstract[i], data_pandas.embeddings[i]])
-
-            #copy.set_types(['text', 'vector'])
-            # i = 0
-            # for item in data_table.to_pylist():
-            #     print("working on row: " + str(i))
-            #     abstract = item['abstract']
-            #     embedding = np.asarray(item['embeddings'])
-            #     # item_tuple = (abstract, embedding)
-            #     copy.write_row([abstract, embedding])
-            i = i + 1
-            if i == 11: break
-conn.close()
+
+    with conn.cursor().copy("COPY documents (embedding, abstract) FROM STDIN with (FORMAT BINARY)") as copy:
+        print("working on: " + str(path))
+        copy.set_types(['vector', 'text'])
+        for i in range(0, len(data_pandas)):
+        #for i in range(11):
+            copy.write_row([data_pandas.iloc[i]["embeddings"], data_pandas.iloc[i]["abstract"]])
+
+
 print('finished')

 # Should it create the DB and then load the vector extension - yes for now
 # Table structure

diff --git a/pgvector_example.py b/pgvector_example.py
index 66c9692..9328468 100644
--- a/pgvector_example.py
+++ b/pgvector_example.py
@@ -5,7 +5,7 @@
 # generate random data
 rows = 100000
 dimensions = 128
-embeddings = np.random.rand(rows, dimensions)
+embeddings = np.random.rand(rows, dimensions).astype(np.float32)
 categories = np.random.randint(100, size=rows).tolist()
 queries = np.random.rand(10, dimensions)
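
---
Follow-up sketch (not part of the diff above): the commented-out index/query block
could be adapted to the documents table once arxiv-import.py finishes loading. This
is a minimal sketch assuming the same connection settings and the database created
by the script; query_embedding is a stand-in for a real 768-dimensional vector from
the same model that produced the parquet embeddings.

from pgvector.psycopg import register_vector
import numpy as np
import psycopg

# Reuse the connection settings from arxiv-import.py above.
conn = psycopg.connect("host=localhost user=postgres password='letmein' dbname='lala'", autocommit=True)
register_vector(conn)

# HNSW index so ORDER BY embedding <-> ... can use an approximate index scan
# instead of a sequential scan over every row.
conn.execute('CREATE INDEX ON documents USING hnsw (embedding vector_l2_ops)')

# Placeholder query vector; a real one would come from the embedding model.
query_embedding = np.random.rand(768).astype(np.float32)

# Ten nearest abstracts by L2 distance.
rows = conn.execute(
    'SELECT id, abstract FROM documents ORDER BY embedding <-> %s LIMIT 10',
    (query_embedding,),
).fetchall()
for r in rows:
    print(r[0])
conn.close()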