Skip to content

Commit

Permalink
add async_add implementation and async upsert to Pinecone vector store
Browse files Browse the repository at this point in the history
  • Loading branch information
DosticJelena committed Oct 2, 2023
1 parent 2690bfb commit 8f5fc90
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 22 deletions.
2 changes: 1 addition & 1 deletion llama_index/indices/vector_store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ async def _async_add_nodes_to_index(
return

nodes = await self._aget_node_with_embedding(nodes, show_progress)
new_ids = self._vector_store.add(nodes)
new_ids = await self._vector_store.async_add(nodes)

# if the vector store doesn't store text, we need to add the nodes to the
# index struct and document store
Expand Down
36 changes: 27 additions & 9 deletions llama_index/vector_stores/pinecone.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from collections import Counter
from functools import partial
from typing import Any, Callable, Dict, List, Optional, cast

import asyncio
from llama_index.utils import iter_batch
from llama_index.bridge.pydantic import PrivateAttr
from llama_index.schema import BaseNode, MetadataMode, TextNode
from llama_index.vector_stores.types import (
Expand All @@ -30,7 +31,7 @@
SPARSE_VECTOR_KEY = "sparse_values"
METADATA_KEY = "metadata"

DEFAULT_BATCH_SIZE = 100
DEFAULT_BATCH_SIZE = 200

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -172,7 +173,7 @@ def __init__(

if tokenizer is None:
tokenizer = get_default_tokenizer()
self._tokenizer = tokenizer
self._tokenizer = tokenizer # type: ignore

super().__init__(
index_name=index_name,
Expand Down Expand Up @@ -258,14 +259,31 @@ def add(

ids.append(node_id)
entries.append(entry)
self._pinecone_index.upsert(
entries,
namespace=self.namespace,
batch_size=self.batch_size,
**self.insert_kwargs,
)

[
self._pinecone_index.upsert(
vectors=batch,
async_req=True,
)
for batch in iter_batch(entries, self.batch_size)
]

return ids

async def async_add(
self,
nodes: List[BaseNode],
) -> List[str]:
"""Asynchronously add a list of embedding results to the collection.
Args:
nodes (List[BaseNode]): Embedding results to add.
Returns:
List[str]: List of IDs of the added documents.
"""
return await asyncio.to_thread(self.add, nodes) # type: ignore

def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
"""
Delete nodes using with ref_doc_id.
Expand Down
24 changes: 12 additions & 12 deletions tests/indices/vector_store/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@
class MockPineconeIndex:
def __init__(self) -> None:
"""Mock pinecone index."""
self._tuples: List[Dict[str, Any]] = []
self._vectors: List[Dict[str, Any]] = []

def upsert(self, tuples: List[Dict[str, Any]], **kwargs: Any) -> None:
def upsert(self, vectors: List[Dict[str, Any]], **kwargs: Any) -> None:
"""Mock upsert."""
self._tuples.extend(tuples)
self._vectors.extend(vectors)

def delete(self, ids: List[str]) -> None:
"""Mock delete."""
new_tuples = []
for tup in self._tuples:
if tup["id"] not in ids:
new_tuples.append(tup)
self._tuples = new_tuples
new_vectors = []
for vec in self._vectors:
if vec["id"] not in ids:
new_vectors.append(vec)
self._vectors = new_vectors

def query(
self,
Expand All @@ -38,7 +38,7 @@ def query(
) -> Any:
"""Mock query."""
# index_mat is n x k
index_mat = np.array([tup["values"] for tup in self._tuples])
index_mat = np.array([tup["values"] for tup in self._vectors])
query_vec = np.array(vector)[np.newaxis, :]

# compute distances
Expand All @@ -49,10 +49,10 @@ def query(

matches = []
for index in indices:
tup = self._tuples[index]
vec = self._vectors[index]
match = MagicMock()
match.metadata = tup["metadata"]
match.id = tup["id"]
match.metadata = vec["metadata"]
match.id = vec["id"]
matches.append(match)

response = MagicMock()
Expand Down

0 comments on commit 8f5fc90

Please sign in to comment.