Commit

Merge pull request #347 from vespa-engine/tgm/collect-vespa-feature-batch

Use query batch to speed up vespa feature collection
thigm85 authored Jun 8, 2022
2 parents 4d007aa + 18ae240 commit 273b54b
Showing 3 changed files with 259 additions and 2 deletions.
188 changes: 188 additions & 0 deletions vespa/application.py
@@ -1,5 +1,6 @@
# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

import os
import sys
import ssl
import aiohttp
@@ -340,6 +341,7 @@ async def _query_batch_async(
**kwargs,
)

@retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(3))
def query_batch(
self,
body_batch: Optional[List[Dict]] = None,
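
A note on the decorator added above: @retry comes from the tenacity library, so query_batch is re-run on failure, up to three attempts, with exponentially growing waits in between. A minimal sketch of the same pattern (illustrative function name; assumes the tenacity package is installed):

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(3))
def fetch_with_retries():
    # re-run on any exception, waiting exponentially longer between attempts;
    # after the third failed attempt the exception is re-raised
    ...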
@@ -1007,6 +1009,192 @@ def collect_training_data(
training_data = DataFrame.from_records(training_data)
return training_data

def collect_vespa_features(
self,
labeled_data: Union[List[Dict], DataFrame],
id_field: str,
query_model: QueryModel,
number_additional_docs: int,
fields: List[str],
keep_features: Optional[List[str]] = None,
relevant_score: int = 1,
default_score: int = 0,
**kwargs,
) -> DataFrame:
"""
Collect Vespa features based on a set of labelled data.
labeled_data can be a DataFrame or a List of Dict:
>>> labeled_data_df = DataFrame(
... data={
... "qid": [0, 0, 1, 1],
... "query": ["Intrauterine virus infections and congenital heart disease", "Intrauterine virus infections and congenital heart disease", "Clinical and immunologic studies in identical twins discordant for systemic lupus erythematosus", "Clinical and immunologic studies in identical twins discordant for systemic lupus erythematosus"],
... "doc_id": [0, 3, 1, 5],
... "relevance": [1,1,1,1]
... }
... )
>>> labeled_data = [
... {
... "query_id": 0,
... "query": "Intrauterine virus infections and congenital heart disease",
... "relevant_docs": [{"id": 0, "score": 1}, {"id": 3, "score": 1}]
... },
... {
... "query_id": 1,
... "query": "Clinical and immunologic studies in identical twins discordant for systemic lupus erythematosus",
... "relevant_docs": [{"id": 1, "score": 1}, {"id": 5, "score": 1}]
... }
... ]
:param labeled_data: Labelled data containing query, query_id and relevant ids. See details about data format.
:param id_field: The Vespa field representing the document id.
:param query_model: Query model.
:param number_additional_docs: Number of additional documents to retrieve for each relevant document.
:param fields: List of Vespa fields to collect, e.g. ["rankfeatures", "summaryfeatures"]
:param keep_features: List containing the names of the features that should be returned. Defaults to None,
which returns all the features contained in the 'fields' argument.
:param relevant_score: Score to assign to relevant documents. Defaults to 1.
:param default_score: Score to assign to the additional documents that are not relevant. Defaults to 0.
:param kwargs: Extra keyword arguments to be included in the Vespa Query.
:return: DataFrame containing the document id (document_id), query id (query_id), relevance label (label),
and the Vespa rank features returned by the rank profile used by the query model.
"""

if isinstance(labeled_data, DataFrame):
labeled_data = parse_labeled_data(df=labeled_data)

flat_data = [
(
data["query_id"],
data["query"],
relevant_doc["id"],
relevant_doc.get("score", relevant_score),
)
for data in labeled_data
for relevant_doc in data["relevant_docs"]
]
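# For the docstring example above, flat_data becomes:
# [(0, "Intrauterine virus infections ...", 0, 1),
#  (0, "Intrauterine virus infections ...", 3, 1),
#  (1, "Clinical and immunologic studies ...", 1, 1),
#  (1, "Clinical and immunologic studies ...", 5, 1)]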

queries = [x[1] for x in flat_data]
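# one recall tuple per query: each query is restricted to match only its own
# relevant document, so each response carries that document's rank features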
relevant_search = self.query_batch(
query_batch=queries,
query_model=query_model,
recall_batch=[(id_field, [x[2]]) for x in flat_data],
**kwargs,
)
result = []
for ((query_id, query, relevant_id, relevant_score), query_result) in zip(
flat_data, relevant_search
):
result.extend(
self.annotate_data(
hits=query_result.hits,
query_id=query_id,
id_field=id_field,
relevant_id=relevant_id,
fields=fields,
relevant_score=relevant_score,
default_score=default_score,
)
)
if number_additional_docs > 0:
additional_hits_result = self.query_batch(
query_batch=queries,
query_model=query_model,
hits=number_additional_docs,
**kwargs,
)
for ((query_id, query, relevant_id, relevant_score), query_result) in zip(
flat_data, additional_hits_result
):
result.extend(
self.annotate_data(
hits=query_result.hits,
query_id=query_id,
id_field=id_field,
relevant_id=relevant_id,
fields=fields,
relevant_score=relevant_score,
default_score=default_score,
)
)
df = DataFrame.from_records(result)
df = df.drop_duplicates(["document_id", "query_id", "label"])
df = df.sort_values("query_id")
if keep_features:
df = df[["document_id", "query_id", "label"] + keep_features]
return df
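
For reference, a usage sketch for the new method, mirroring the integration tests added below (endpoint and query model taken from test_integration_running_instance.py; import paths are assumed from the pyvespa of this period):

from vespa.application import Vespa
from vespa.query import QueryModel, OR, Ranking

app = Vespa(url="https://api.cord19.vespa.ai")
query_model = QueryModel(
    match_phase=OR(),
    rank_profile=Ranking(name="bm25", list_features=True),
)
rank_features = app.collect_vespa_features(
    labeled_data=labeled_data,  # format as in the docstring example above
    id_field="id",
    query_model=query_model,
    number_additional_docs=2,
    fields=["rankfeatures"],
)
# columns: document_id, query_id, label, plus one column per rank feature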

def store_vespa_features(
self,
output_file_path: str,
labeled_data: Union[List[Dict], DataFrame],
id_field: str,
query_model: QueryModel,
number_additional_docs: int,
fields: List[str],
keep_features: Optional[List[str]] = None,
relevant_score: int = 1,
default_score: int = 0,
batch_size: int = 1000,
**kwargs,
):
"""
Retrieve Vespa rank features and store them in a .csv file.
:param output_file_path: Path of the .csv output file. It will create the file if it does not exist and
append the Vespa features to a pre-existing file.
:param labeled_data: Labelled data containing query, query_id and relevant ids. See details about data format.
:param id_field: The Vespa field representing the document id.
:param query_model: Query model.
:param number_additional_docs: Number of additional documents to retrieve for each relevant document.
:param fields: List of Vespa fields to collect, e.g. ["rankfeatures", "summaryfeatures"]
:param keep_features: List containing the names of the features that should be returned. Defaults to None,
which returns all the features contained in the 'fields' argument.
:param relevant_score: Score to assign to relevant documents. Defaults to 1.
:param default_score: Score to assign to the additional documents that are not relevant. Defaults to 0.
:param batch_size: The size of the batch of labeled data points to be processed.
:param kwargs: Extra keyword arguments to be included in the Vespa Query.
:return: 0 upon success.
"""

if isinstance(labeled_data, DataFrame):
labeled_data = parse_labeled_data(df=labeled_data)

mini_batches = [
labeled_data[i : i + batch_size]
for i in range(0, len(labeled_data), batch_size)
]
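# e.g. 2,500 labeled data points with batch_size=1000 yield
# mini-batches of sizes 1000, 1000 and 500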
for idx, mini_batch in enumerate(mini_batches):
vespa_features = self.collect_vespa_features(
labeled_data=mini_batch,
id_field=id_field,
query_model=query_model,
number_additional_docs=number_additional_docs,
fields=fields,
keep_features=keep_features,
relevant_score=relevant_score,
default_score=default_score,
**kwargs,
)
if os.path.isfile(output_file_path):
vespa_features.to_csv(
path_or_buf=output_file_path, header=False, index=False, mode="a"
)
else:
vespa_features.to_csv(
path_or_buf=output_file_path, header=True, index=False, mode="w"
)
print(
"Rows collected: {}.\nBatch progress: {}/{}.".format(
vespa_features.shape[0],
idx + 1,
len(mini_batches),
)
)
return 0
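
A corresponding sketch for the batched CSV export (output path illustrative; app, labeled_data and query_model as in the sketch above):

app.store_vespa_features(
    output_file_path="vespa_features.csv",
    labeled_data=labeled_data,
    id_field="id",
    query_model=query_model,
    number_additional_docs=2,
    fields=["rankfeatures"],
    batch_size=1000,
)
# each mini-batch of labeled data is collected and appended to the .csv file;
# the header row is written only when the file does not yet exist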

def evaluate_query(
self,
eval_metrics: List[EvalMetric],
51 changes: 50 additions & 1 deletion vespa/test_integration_docker.py
@@ -4,7 +4,7 @@
import re
import asyncio
import json
from pandas import DataFrame
from pandas import DataFrame, read_csv
from vespa.package import (
HNSW,
Document,
@@ -1203,6 +1203,51 @@ def test_check_duplicated_features(self):
document_ids = [x["document_id"] for x in data]
self.assertEqual(len(document_ids), len(set(document_ids)))

def test_store_vespa_features(self):
schema = "cord19"
docs = [
{"id": fields["id"], "fields": fields} for fields in self.fields_to_send
]
self.app.feed_batch(
schema=schema,
batch=docs,
asynchronous=True,
connections=120,
total_timeout=50,
)
labeled_data = [
{
"query_id": 0,
"query": "give me title 1",
"relevant_docs": [{"id": "1", "score": 1}],
},
{
"query_id": 1,
"query": "give me title 3",
"relevant_docs": [{"id": "3", "score": 1}],
},
]

self.app.store_vespa_features(
output_file_path=os.path.join(
os.environ["RESOURCES_DIR"], "vespa_features.csv"
),
labeled_data=labeled_data,
id_field="id",
query_model=QueryModel(
match_phase=OR(), rank_profile=Ranking(name="bm25", list_features=True)
),
number_additional_docs=2,
fields=["rankfeatures", "summaryfeatures"],
)
rank_features = read_csv(
os.path.join(os.environ["RESOURCES_DIR"], "vespa_features.csv")
)
# more than the two relevant docs, since number_additional_docs=2 retrieves extra hits
self.assertTrue(rank_features.shape[0] > 2)
# at least one feature besides document_id, query_id and label
self.assertTrue(rank_features.shape[1] > 3)

def test_model_endpoints_when_no_model_is_available(self):
self.get_model_endpoints_when_no_model_is_available(
app=self.app,
@@ -1274,6 +1319,10 @@ def test_bert_model_input_and_output(self):
def tearDown(self) -> None:
self.vespa_docker.container.stop()
self.vespa_docker.container.remove()
try:
os.remove(os.path.join(os.environ["RESOURCES_DIR"], "vespa_features.csv"))
except OSError:
pass


class TestQaApplication(TestApplicationCommon):
22 changes: 21 additions & 1 deletion vespa/test_integration_running_instance.py
@@ -154,7 +154,6 @@ def test_workflow(self):
)
self.assertEqual(evaluation.shape, (2, 7))


def test_collect_training_data(self):
app = Vespa(url="https://api.cord19.vespa.ai")
query_model = QueryModel(
@@ -172,6 +171,27 @@
"relevant_docs": [{"id": 1, "score": 1}, {"id": 5, "score": 1}],
},
]
rank_features = app.collect_vespa_features(
labeled_data=labeled_data,
id_field="id",
query_model=query_model,
number_additional_docs=2,
fields=["rankfeatures"],
)
self.assertTrue(rank_features.shape[0] > 4)
# It should have at least one rank feature in addition to document_id, query_id and label
self.assertTrue(rank_features.shape[1] > 3)
rank_features = app.collect_vespa_features(
labeled_data=labeled_data,
id_field="id",
query_model=query_model,
number_additional_docs=2,
fields=["rankfeatures"],
keep_features=["textSimilarity(title).score"]
)
self.assertTrue(rank_features.shape[0] > 4)
# It should have exactly the single kept feature in addition to document_id, query_id and label
self.assertTrue(rank_features.shape[1] == 4)
training_data_batch = app.collect_training_data(
labeled_data=labeled_data,
id_field="id",
