From 084a95bb7d9374ae398bdcd356fe77b74f7677ed Mon Sep 17 00:00:00 2001
From: DosticJelena
Date: Wed, 20 Sep 2023 09:07:35 +0200
Subject: [PATCH] add async_add implementation and async upsert to Pinecone
 vector store

---
 llama_index/indices/vector_store/base.py |  2 +-
 llama_index/vector_stores/pinecone.py    | 36 ++++++++++++++++++------
 tests/indices/vector_store/utils.py      | 24 ++++++++--------
 3 files changed, 40 insertions(+), 22 deletions(-)

diff --git a/llama_index/indices/vector_store/base.py b/llama_index/indices/vector_store/base.py
index f64c1c5cad751..57c3e355c1c90 100644
--- a/llama_index/indices/vector_store/base.py
+++ b/llama_index/indices/vector_store/base.py
@@ -145,7 +145,7 @@ async def _async_add_nodes_to_index(
             return
 
         nodes = await self._aget_node_with_embedding(nodes, show_progress)
-        new_ids = self._vector_store.add(nodes)
+        new_ids = await self._vector_store.async_add(nodes)
 
         # if the vector store doesn't store text, we need to add the nodes to the
         # index struct and document store
diff --git a/llama_index/vector_stores/pinecone.py b/llama_index/vector_stores/pinecone.py
index c624b0c6b80a6..25266ae5bd7ae 100644
--- a/llama_index/vector_stores/pinecone.py
+++ b/llama_index/vector_stores/pinecone.py
@@ -8,7 +8,8 @@
 from collections import Counter
 from functools import partial
 from typing import Any, Callable, Dict, List, Optional, cast
-
+import asyncio
+from llama_index.utils import iter_batch
 from llama_index.bridge.pydantic import PrivateAttr
 from llama_index.schema import BaseNode, MetadataMode, TextNode
 from llama_index.vector_stores.types import (
@@ -30,7 +31,7 @@
 SPARSE_VECTOR_KEY = "sparse_values"
 METADATA_KEY = "metadata"
 
-DEFAULT_BATCH_SIZE = 100
+DEFAULT_BATCH_SIZE = 200
 
 _logger = logging.getLogger(__name__)
 
@@ -170,7 +171,7 @@ def __init__(
 
         if tokenizer is None and add_sparse_vector:
             tokenizer = get_default_tokenizer()
-        self._tokenizer = tokenizer
+        self._tokenizer = tokenizer  # type: ignore
 
         super().__init__(
             index_name=index_name,
@@ -256,14 +257,31 @@ def add(
             ids.append(node_id)
             entries.append(entry)
 
-        self._pinecone_index.upsert(
-            entries,
-            namespace=self.namespace,
-            batch_size=self.batch_size,
-            **self.insert_kwargs,
-        )
+
+        [
+            self._pinecone_index.upsert(
+                vectors=batch,
+                async_req=True,
+            )
+            for batch in iter_batch(entries, self.batch_size)
+        ]
+
         return ids
 
+    async def async_add(
+        self,
+        nodes: List[BaseNode],
+    ) -> List[str]:
+        """Asynchronously add a list of embedding results to the collection.
+
+        Args:
+            nodes (List[BaseNode]): Embedding results to add.
+
+        Returns:
+            List[str]: List of IDs of the added documents.
+        """
+        return await asyncio.to_thread(self.add, nodes)  # type: ignore
+
     def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
         """
         Delete nodes using with ref_doc_id.
diff --git a/tests/indices/vector_store/utils.py b/tests/indices/vector_store/utils.py
index a8c5044d02e8d..d15de7779db41 100644
--- a/tests/indices/vector_store/utils.py
+++ b/tests/indices/vector_store/utils.py
@@ -12,19 +12,19 @@ class MockPineconeIndex:
 
     def __init__(self) -> None:
         """Mock pinecone index."""
-        self._tuples: List[Dict[str, Any]] = []
+        self._vectors: List[Dict[str, Any]] = []
 
-    def upsert(self, tuples: List[Dict[str, Any]], **kwargs: Any) -> None:
+    def upsert(self, vectors: List[Dict[str, Any]], **kwargs: Any) -> None:
         """Mock upsert."""
-        self._tuples.extend(tuples)
+        self._vectors.extend(vectors)
 
     def delete(self, ids: List[str]) -> None:
         """Mock delete."""
-        new_tuples = []
-        for tup in self._tuples:
-            if tup["id"] not in ids:
-                new_tuples.append(tup)
-        self._tuples = new_tuples
+        new_vectors = []
+        for vec in self._vectors:
+            if vec["id"] not in ids:
+                new_vectors.append(vec)
+        self._vectors = new_vectors
 
     def query(
         self,
@@ -38,7 +38,7 @@ def query(
     ) -> Any:
         """Mock query."""
         # index_mat is n x k
-        index_mat = np.array([tup["values"] for tup in self._tuples])
+        index_mat = np.array([tup["values"] for tup in self._vectors])
         query_vec = np.array(vector)[np.newaxis, :]
 
         # compute distances
@@ -49,10 +49,10 @@ def query(
 
         matches = []
         for index in indices:
-            tup = self._tuples[index]
+            vec = self._vectors[index]
             match = MagicMock()
-            match.metadata = tup["metadata"]
-            match.id = tup["id"]
+            match.metadata = vec["metadata"]
+            match.id = vec["id"]
             matches.append(match)
 
         response = MagicMock()
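
Not part of the patch: the sketch below only illustrates the pattern the diff introduces in PineconeVectorStore, namely a blocking add() that upserts entries batch by batch and an async_add() that offloads that blocking call to a worker thread via asyncio.to_thread (available in Python 3.9+). TinyVectorStore, its _upsert method, and the local iter_batch helper are hypothetical stand-ins for illustration, not the real llama_index or Pinecone APIs.

import asyncio
from typing import Any, Dict, Iterator, List


def iter_batch(items: List[Dict[str, Any]], size: int) -> Iterator[List[Dict[str, Any]]]:
    """Yield successive fixed-size chunks (same role as llama_index.utils.iter_batch)."""
    for start in range(0, len(items), size):
        yield items[start : start + size]


class TinyVectorStore:
    """Hypothetical stand-in for the patched PineconeVectorStore."""

    def __init__(self, batch_size: int = 200) -> None:
        self.batch_size = batch_size
        self._rows: List[Dict[str, Any]] = []

    def _upsert(self, vectors: List[Dict[str, Any]]) -> None:
        # Stand-in for Pinecone's Index.upsert(vectors=batch, async_req=True).
        self._rows.extend(vectors)

    def add(self, entries: List[Dict[str, Any]]) -> List[str]:
        # Blocking path: issue one upsert per batch, as the patched add() does.
        for batch in iter_batch(entries, self.batch_size):
            self._upsert(batch)
        return [entry["id"] for entry in entries]

    async def async_add(self, entries: List[Dict[str, Any]]) -> List[str]:
        # Offload the blocking add() to a worker thread so the event loop
        # stays responsive (asyncio.to_thread requires Python 3.9+).
        return await asyncio.to_thread(self.add, entries)


async def main() -> None:
    store = TinyVectorStore(batch_size=2)
    ids = await store.async_add(
        [{"id": str(i), "values": [0.1 * i, 0.2 * i]} for i in range(5)]
    )
    print(ids)  # ['0', '1', '2', '3', '4']


if __name__ == "__main__":
    asyncio.run(main())

In the actual diff, _async_add_nodes_to_index awaits async_add, so building a VectorStoreIndex asynchronously no longer blocks the event loop on the Pinecone upserts.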