working
thesteve0 committed Mar 5, 2024
1 parent 263cd69 commit faeb210
Showing 3 changed files with 86 additions and 43 deletions.
64 changes: 64 additions & 0 deletions arvix-example-copy.py
@@ -0,0 +1,64 @@
from pgvector.psycopg import register_vector
import psycopg
from pathlib import Path
import pyarrow as pa
import pyarrow.parquet as pq

PARQUET_PATH = Path('./')
DB_NAME = 'arxiv2'
data_pandas = 'replaced later'  # placeholder; overwritten by the loop below
dimensions = 768

# NOTE: This only works for a single parquet file because we read the file and then copy it
# This won't work when there are many parquet files in the directory (see the sketch after this listing)
for path in PARQUET_PATH.glob('*.parquet'):
    print("working on: " + str(path))
    data_table = pa.parquet.read_table(path, memory_map=True)
    data_pandas = data_table.to_pandas()



# Create the database (dropping any existing copy)
conn = psycopg.connect("host=localhost user=postgres password='letmein'", autocommit=True)
cursor = conn.cursor()

cursor.execute("SELECT datname FROM pg_database;")

list_database = cursor.fetchall()

if (DB_NAME,) in list_database:
    cursor.execute("DROP database " + DB_NAME + " with (FORCE);")
cursor.execute("create database " + DB_NAME + ";")

#Now close the connection and switch DB
conn.close()

# enable extensions
conn = psycopg.connect("host=localhost user=postgres password='letmein' dbname='" + DB_NAME + "'", autocommit=True)
conn.execute('CREATE EXTENSION IF NOT EXISTS vector')
conn.close()
# reconnect so register_vector sees the newly created vector type
conn = psycopg.connect("host=localhost user=postgres password='letmein' dbname='" + DB_NAME + "'", autocommit=True)
register_vector(conn)

print('Creating table')
conn.execute('DROP TABLE IF EXISTS items')
conn.execute('CREATE TABLE items (id bigserial, embedding vector(%d), abstract text, PRIMARY KEY (id))' % dimensions)


print('Loading data')
with conn.cursor().copy('COPY items (embedding, abstract) FROM STDIN WITH (FORMAT BINARY)') as copy:
    copy.set_types(['vector', 'text'])

    for i in range(11):  # NOTE: only loads the first 11 rows
        copy.write_row([data_pandas.iloc[i]["embeddings"], data_pandas.iloc[i]["abstract"]])

# print('Creating index in parallel')
# conn.execute('CREATE INDEX ON items USING hnsw (embedding vector_l2_ops)')
#
# print('Running distributed queries')
# for query in queries:
# items = conn.execute('SELECT id FROM items ORDER BY embedding <-> %s LIMIT 10', (query,)).fetchall()
# print([r[0] for r in items])
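As the NOTE above says, the loop only ends up loading the last parquet file it read, because the COPY runs after the loop finishes. A minimal sketch of the multi-file variant, untested and reusing conn, PARQUET_PATH, the imports, and the items table from the listing above, would move the COPY inside the loop:

# Hypothetical multi-file variant (not part of this commit): copy each
# parquet file's rows as it is read, instead of only the last one.
for path in PARQUET_PATH.glob('*.parquet'):
    print("working on: " + str(path))
    data_table = pa.parquet.read_table(path, memory_map=True, columns=['abstract', 'embeddings'])
    data_pandas = data_table.to_pandas()

    with conn.cursor().copy('COPY items (embedding, abstract) FROM STDIN WITH (FORMAT BINARY)') as copy:
        copy.set_types(['vector', 'text'])
        for i in range(len(data_pandas)):
            copy.write_row([data_pandas.iloc[i]["embeddings"], data_pandas.iloc[i]["abstract"]])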
63 changes: 21 additions & 42 deletions arxiv-import.py
@@ -1,12 +1,4 @@
# @TODO
# This script needs to import the arxiv parquets into the postgres instance and create the index

### DO NOT USE - this is my failed attempt to get the data into pgvector

import re
import sys

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import psycopg
@@ -16,7 +8,7 @@
# The proper word is arxiv but I messed up when creating the folder and project

PARQUET_PATH= Path('./')
DB_NAME= 'arxiv'
DB_NAME= 'lala'

conn = psycopg.connect("host=localhost user=postgres password='letmein'", autocommit=True)
cursor = conn.cursor()
@@ -25,54 +17,41 @@

list_database = cursor.fetchall()

if ('arxiv',) in list_database:
if ('lala',) in list_database:
    cursor.execute(("DROP database "+ DB_NAME +" with (FORCE);"))
    cursor.execute("create database " + DB_NAME + ";");
    cursor.execute("create database " + DB_NAME + ";")
else:
    cursor.execute("create database " + DB_NAME + ";");
    cursor.execute("create database " + DB_NAME + ";")

#Now close the connection and switch DB
conn.close()

conn = psycopg.connect("host=127.0.0.1 user=postgres password='letmein' dbname='arxiv' ")
cursor = conn.cursor()

conn = psycopg.connect("host=localhost user=postgres password='letmein' dbname='lala'", autocommit=True)
conn.execute('CREATE EXTENSION IF NOT EXISTS vector')
conn.close()

cursor.execute('CREATE EXTENSION IF NOT EXISTS vector')
conn = psycopg.connect("host=localhost user=postgres password='letmein' dbname='lala'", autocommit=True)
register_vector(conn)

conn.execute('DROP TABLE IF EXISTS documents')
cursor.execute('CREATE TABLE documents (id bigserial PRIMARY KEY, abstract text, embedding vector(768))')
cursor.connection.commit()
#sys.exit()
## Loop through the parquet files
for path in PARQUET_PATH.rglob('*.parquet'):
conn.execute('CREATE TABLE documents (id bigserial PRIMARY KEY, abstract text, embedding vector(768))')

for path in PARQUET_PATH.glob('*.parquet'):
    match = re.search(r'.*(\w{16})\.parquet', str(path))
    data_table = pa.parquet.read_table(path, memory_map=True, columns=['abstract', 'embeddings'])
    data_pandas = data_table.to_pandas()
    #cursor.executemany("INSERT INTO documents (abstract, embedding) VALUES (%s, %s)", data_table.to_pylist())

    print("working on: " + str(path))
    i = 0

    with cursor.copy("COPY documents (abstract, embedding) FROM STDIN with (FORMAT BINARY)") as copy:
        print("working on: " + str(path))
        #for i in range(data_pandas.rows):
        for i in range(11):
            copy.write_row([data_pandas.abstract[i], data_pandas.embeddings[i]])

            #copy.set_types(['text', 'vector'])
            # i = 0
            # for item in data_table.to_pylist():
            #     print("working on row: " + str(i))
            #     abstract = item['abstract']
            #     embedding = np.asarray(item['embeddings'])
            #     # item_tuple = (abstract, embedding)
            #     copy.write_row([abstract, embedding])
            i = i + 1
            if i == 11: break
    conn.close()

    with conn.cursor().copy("COPY documents (embedding, abstract) FROM STDIN with (FORMAT BINARY)") as copy:
        print("working on: " + str(path))
        copy.set_types(['vector', 'text'])
        for i in range(len(data_pandas)):
        #for i in range(11):
            copy.write_row([data_pandas.iloc[i]["embeddings"], data_pandas.iloc[i]["abstract"]])


print('finished')
# Should it create the DB and then load the vector extension - yes for now
# Table structure
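The commented-out index code in the other file hints at the likely next step once the COPY finishes. A hedged sketch of indexing and a sanity-check query against documents (the hnsw index, operator class, and sample query are assumptions, not part of this commit):

# Hypothetical follow-up (not in this commit): index the embeddings and
# run a quick nearest-neighbor check against the documents table.
import psycopg
from pgvector.psycopg import register_vector

conn = psycopg.connect("host=localhost user=postgres password='letmein' dbname='lala'", autocommit=True)
register_vector(conn)
conn.execute('CREATE INDEX IF NOT EXISTS documents_embedding_idx ON documents USING hnsw (embedding vector_l2_ops)')

# reuse a stored embedding as a stand-in query vector
sample = conn.execute('SELECT embedding FROM documents LIMIT 1').fetchone()[0]
rows = conn.execute('SELECT id FROM documents ORDER BY embedding <-> %s LIMIT 5', (sample,)).fetchall()
print([r[0] for r in rows])
conn.close()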
2 changes: 1 addition & 1 deletion pgvector_example.py
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
# generate random data
rows = 100000
dimensions = 128
embeddings = np.random.rand(rows, dimensions)
embeddings = np.random.rand(rows, dimensions).astype(np.float32)
categories = np.random.randint(100, size=rows).tolist()
queries = np.random.rand(10, dimensions)

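The one-line change here casts the random embeddings to float32 before insert. pgvector stores each vector component as a 4-byte float, so the cast keeps the client-side arrays consistent with what the server will keep; a quick way to see the default dtype that presumably motivated it:

import numpy as np

embeddings = np.random.rand(100000, 128)
print(embeddings.dtype)   # float64 - numpy's default for random floats
embeddings = embeddings.astype(np.float32)
print(embeddings.dtype)   # float32, matching pgvector's 4-byte storage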
