close but no cigar
thesteve0 committed Mar 3, 2024
1 parent b521be3 commit 263cd69
Showing 4 changed files with 99 additions and 43 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -326,3 +326,4 @@ cython_debug/
#.idea/

/arxiv_abstracts.zip
*.parquet
62 changes: 26 additions & 36 deletions arxiv-import.py
@@ -4,19 +4,21 @@
### DO NOT USE: this is my failed attempt to get the data into pgvector

import re
import sys

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import psycopg
from pgvector.psycopg import register_vector
from pathlib import Path

# The proper spelling is arxiv, but I messed it up when creating the folder and project

PARQUET_PATH= Path('../arvix_abstracts/arxiv_abstracts')
PARQUET_PATH= Path('./')
DB_NAME= 'arxiv'

conn = psycopg.connect("host=127.0.0.1 user=postgres password='test'", autocommit=True)
conn = psycopg.connect("host=localhost user=postgres password='letmein'", autocommit=True)
cursor = conn.cursor()

cursor.execute("SELECT datname FROM pg_database;")
@@ -31,57 +33,45 @@

#Now close the connection and switch DB
conn.close()
conn = psycopg.connect("host=127.0.0.1 user=postgres password='test' dbname='arxiv' ")

conn = psycopg.connect("host=127.0.0.1 user=postgres password='letmein' dbname='arxiv' ")
cursor = conn.cursor()


cursor.execute('CREATE EXTENSION IF NOT EXISTS vector')
register_vector(conn)

conn.execute('DROP TABLE IF EXISTS documents')
cursor.execute('CREATE TABLE documents (id bigserial PRIMARY KEY, abstract text, embedding vector(768))')
cursor.connection.commit()

#sys.exit()
## Loop through the parquet files
for path in PARQUET_PATH.rglob('*.parquet'):
match = re.search(r'.*(\w{16})\.parquet', str(path))
data_table = pa.parquet.read_table(path, memory_map=True, columns=['abstract', 'embeddings'])
data_pandas = data_table.to_pandas()
#cursor.executemany("INSERT INTO documents (abstract, embedding) VALUES (%s, %s)", data_table.to_pylist())
print("working on: " + str(path))
i = 0
# for idx, row in data_pandas.iterrows():
# abstract = row['abstract']
# embedding = row['embeddings']
# cursor.executemany("INSERT INTO documents (abstract, embedding) VALUES (%s, %s)", (abstract, embedding.tolist()))
# if (i % 1000 == 0):
# print("working on row: " + str(i))
# i = i + 1
# cursor.connection.commit()


with cursor.copy("COPY documents (abstract, embedding) FROM STDIN") as copy:

with cursor.copy("COPY documents (abstract, embedding) FROM STDIN with (FORMAT BINARY)") as copy:
print("working on: " + str(path))
i = 0
for item in data_table.to_pylist():
print("working on row: " + str(i))
abstract = item['abstract']
embedding = item['embeddings']
item_tuple = (abstract, embedding)
copy.write_row(item_tuple)
#for i in range(data_pandas.rows):
for i in range(11):
copy.write_row([data_pandas.abstract[i], data_pandas.embeddings[i]])

#copy.set_types(['text', 'vector'])
# i = 0
# for item in data_table.to_pylist():
# print("working on row: " + str(i))
# abstract = item['abstract']
# embedding = np.asarray(item['embeddings'])
# # item_tuple = (abstract, embedding)
# copy.write_row([abstract, embedding])
i = i + 1
cursor.connection.commit()
#cursor.mogrify("INSERT INTO documents (abstract, embedding) VALUES (%s, %s)", (abstract, embedding.tolist()))
#cursor.execute("INSERT INTO documents (abstract, embedding) VALUES (%s, %s)", (abstract, embedding.tolist()))
#print(" ")
# for row in data:
# abstract = data['abstract']
# embedding = data['embeddings']



#print(item)
# data['embeddings'][1]
# conn.execute('INSERT INTO documents (content, embedding) VALUES (%s, %s)', (content, embedding))
# print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
#print(path)
if i == 11: break
conn.close()
print('finished')
# Should it create the DB and then load the vector extension - yes for now
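For reference, a minimal sketch of the pattern this "failed attempt" script seems to be reaching for, based on the working pgvector_example.py added below: register the vector type with psycopg and declare column types on the binary COPY. The connection string, the documents table, and the 'abstract'/'embeddings' parquet columns with 768-dimensional vectors are taken from the diff; everything else here is an assumption, not the eventual fix in this repository.

from pathlib import Path

import numpy as np
import psycopg
import pyarrow.parquet as pq
from pgvector.psycopg import register_vector

PARQUET_PATH = Path('./')

conn = psycopg.connect("host=localhost user=postgres password='letmein' dbname='arxiv'")
conn.execute('CREATE EXTENSION IF NOT EXISTS vector')
register_vector(conn)  # adapt numpy arrays / lists to the vector type

conn.execute('CREATE TABLE IF NOT EXISTS documents (id bigserial PRIMARY KEY, abstract text, embedding vector(768))')

with conn.cursor() as cursor:
    for path in PARQUET_PATH.rglob('*.parquet'):
        table = pq.read_table(path, memory_map=True, columns=['abstract', 'embeddings'])
        df = table.to_pandas()
        with cursor.copy('COPY documents (abstract, embedding) FROM STDIN WITH (FORMAT BINARY)') as copy:
            # binary COPY needs the target column types spelled out
            copy.set_types(['text', 'vector'])
            for abstract, embedding in zip(df['abstract'], df['embeddings']):
                copy.write_row([abstract, np.asarray(embedding)])
    conn.commit()
conn.close()

The key differences from the loop above are the set_types call and writing every row instead of stopping at range(11).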
69 changes: 69 additions & 0 deletions pgvector_example.py
@@ -0,0 +1,69 @@
import numpy as np
from pgvector.psycopg import register_vector
import psycopg

# generate random data
rows = 100000
dimensions = 128
embeddings = np.random.rand(rows, dimensions)
categories = np.random.randint(100, size=rows).tolist()
queries = np.random.rand(10, dimensions)

DB_NAME= 'pgvector_citus'

#Make the table
conn = psycopg.connect("host=localhost user=postgres password='letmein'", autocommit=True)
cursor = conn.cursor()

cursor.execute("SELECT datname FROM pg_database;")

list_database = cursor.fetchall()

if (DB_NAME,) in list_database:
    cursor.execute("DROP database " + DB_NAME + " with (FORCE);")
    cursor.execute("create database " + DB_NAME + ";")
else:
    cursor.execute("create database " + DB_NAME + ";")

#Now close the connection and switch DB
conn.close()


# enable extensions
conn = psycopg.connect("host=localhost user=postgres password='letmein' dbname='pgvector_citus'", autocommit=True)
conn.execute('CREATE EXTENSION IF NOT EXISTS vector')

# GUC variables set on the session do not propagate to Citus workers
# https://github.com/citusdata/citus/issues/462
# you can either:
# 1. set them on the system, user, or database and reconnect
# 2. set them for a transaction with SET LOCAL
conn.close()

# reconnect for updated GUC variables to take effect
conn = psycopg.connect("host=localhost user=postgres password='letmein' dbname='pgvector_citus'", autocommit=True)
register_vector(conn)

print('Creating distributed table')
conn.execute('DROP TABLE IF EXISTS items')
conn.execute('CREATE TABLE items (id bigserial, embedding vector(%d), category_id bigint, PRIMARY KEY (id, category_id))' % dimensions)
#conn.execute('SET citus.shard_count = 4')
#conn.execute("SELECT create_distributed_table('items', 'category_id')")

print('Loading data in parallel')
with conn.cursor().copy('COPY items (embedding, category_id) FROM STDIN WITH (FORMAT BINARY)') as copy:
    copy.set_types(['vector', 'bigint'])

    for i in range(rows):
        copy.write_row([embeddings[i], categories[i]])

conn.close()

# print('Creating index in parallel')
# conn.execute('CREATE INDEX ON items USING hnsw (embedding vector_l2_ops)')
#
# print('Running distributed queries')
# for query in queries:
# items = conn.execute('SELECT id FROM items ORDER BY embedding <-> %s LIMIT 10', (query,)).fetchall()
# print([r[0] for r in items])
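The commented-out tail above is the read path. A minimal sketch of what running it would look like, assuming the same local connection settings; the hnsw opclass and the <-> operator come from pgvector's documentation rather than anything new in this commit:

import numpy as np
import psycopg
from pgvector.psycopg import register_vector

conn = psycopg.connect("host=localhost user=postgres password='letmein' dbname='pgvector_citus'", autocommit=True)
register_vector(conn)  # lets numpy arrays be passed as query parameters

# build the ANN index once, after the bulk load
conn.execute('CREATE INDEX IF NOT EXISTS items_embedding_idx ON items USING hnsw (embedding vector_l2_ops)')

# 10 nearest neighbours (L2 distance) to a random query vector
query = np.random.rand(128)
result = conn.execute(
    'SELECT id, embedding <-> %s AS distance FROM items ORDER BY distance LIMIT 10',
    (query,),
).fetchall()
for item_id, distance in result:
    print(item_id, distance)

conn.close()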
10 changes: 3 additions & 7 deletions requirements.txt
@@ -1,7 +1,3 @@
numpy
requests>=2.26.0
scikit_learn>=1.0.2
scipy
sentence_transformers>=2.2.0
torch
tqdm
psycopg
pyarrow
pgvector
