diff --git a/vespa/application.py b/vespa/application.py
index 347cb938..81fc5322 100644
--- a/vespa/application.py
+++ b/vespa/application.py
@@ -264,7 +264,7 @@ def get_model_endpoint(self, model_id: Optional[str] = None) -> Optional[Respons
def query(
self,
- body: Optional[Dict] = None, **kwargs
+ body: Optional[Dict] = None, groupname:str=None, **kwargs
) -> VespaQueryResponse:
"""
Send a query request to the Vespa application.
@@ -272,18 +272,19 @@ def query(
Send 'body' containing all the request parameters.
:param body: Dict containing request parameters.
+ :param groupname: The groupname used with streaming search.
param kwargs: Extra Vespa Query API parameters.
:return: The response from the Vespa application.
"""
#Use one connection as this is a single query
with VespaSync(self,pool_maxsize=1, pool_connections=1) as sync_app:
return sync_app.query(
- body=body, **kwargs
+ body=body, groupname=groupname, **kwargs
)
def feed_data_point(
- self, schema: str, data_id: str, fields: Dict, namespace: str = None, **kwargs
+ self, schema: str, data_id: str, fields: Dict, namespace: str = None, groupname:str = None, **kwargs
) -> VespaResponse:
"""
Feed a data point to a Vespa app. Will create a new VespaSync with
@@ -294,6 +295,7 @@ def feed_data_point(
:param data_id: Unique id associated with this data point.
:param fields: Dict containing all the fields required by the `schema`.
:param namespace: The namespace that we are sending data to.
+ :param groupname: The groupname that we are sending data
:return: VespaResponse of the HTTP POST request.
"""
if not namespace:
@@ -302,7 +304,7 @@ def feed_data_point(
# single data point
with VespaSync(app=self, pool_connections=1,pool_maxsize=1) as sync_app:
return sync_app.feed_data_point(
- schema=schema, data_id=data_id, fields=fields, namespace=namespace, **kwargs
+ schema=schema, data_id=data_id, fields=fields, namespace=namespace, groupname=groupname, **kwargs
)
def feed_iterable(self,
@@ -394,15 +396,21 @@ def _submit(doc:dict, sync_session:VespaSync) -> Tuple[str, Union[VespaResponse,
return id, VespaResponse(status_code=499,
json={"id":id, "message":"Missing fields in input dict"},
url="n/a", operation_type=operation_type)
+ groupname = doc.get("groupname", None)
try:
if operation_type == "feed":
- response:VespaResponse = sync_session.feed_data_point(schema=schema, namespace=namespace, data_id=id, fields=fields, **kwargs)
+ response:VespaResponse = sync_session.feed_data_point(
+ schema=schema, namespace=namespace,
+ groupname=groupname, data_id=id, fields=fields, **kwargs)
return (id, response)
elif operation_type == "update":
- response:VespaResponse = sync_session.update_data(schema=schema, namespace=namespace, data_id=id, fields=fields, **kwargs)
+ response:VespaResponse = sync_session.update_data(
+ schema=schema, namespace=namespace,
+ groupname=groupname, data_id=id, fields=fields, **kwargs)
return (id, response)
elif operation_type == "delete":
- response:VespaResponse = sync_session.delete_data(schema=schema, namespace=namespace, data_id=id, **kwargs)
+ response:VespaResponse = sync_session.delete_data(
+ schema=schema, namespace=namespace, data_id=id, groupname=groupname, **kwargs)
return (id, response)
except Exception as e:
return (id, e)
@@ -433,7 +441,7 @@ def _handle_result_callback(future:Future, callback:Callable):
consumer_thread.join()
def delete_data(
- self, schema: str, data_id: str, namespace: str = None, **kwargs
+ self, schema: str, data_id: str, namespace: str = None, groupname:str=None, **kwargs
) -> VespaResponse:
"""
Delete a data point from a Vespa app.
@@ -441,13 +449,14 @@ def delete_data(
:param schema: The schema that we are deleting data from.
:param data_id: Unique id associated with this data point.
:param namespace: The namespace that we are deleting data from. If no namespace is provided the schema is used.
+ :param groupname: The groupname that we are deleting data from.
:param kwargs: Additional arguments to be passed to the HTTP DELETE request https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters
:return: Response of the HTTP DELETE request.
"""
with VespaSync(self, pool_connections=1, pool_maxsize=1) as sync_app:
return sync_app.delete_data(
- schema=schema, data_id=data_id, namespace=namespace, **kwargs
+ schema=schema, data_id=data_id, namespace=namespace, groupname=groupname, **kwargs
)
@@ -475,7 +484,7 @@ def delete_all_docs(
)
def get_data(
- self, schema:str, data_id: str, namespace: str = None, raise_on_not_found:Optional[bool]=False, **kwargs
+ self, schema:str, data_id: str, namespace: str = None, groupname:str=None, raise_on_not_found:Optional[bool]=False, **kwargs
) -> VespaResponse:
"""
Get a data point from a Vespa app.
@@ -484,6 +493,7 @@ def get_data(
:param schema: The schema that we are getting data from. Will attempt to infer schema name if not provided.
:param data_id: Unique id associated with this data point.
:param namespace: The namespace that we are getting data from. If no namespace is provided the schema is used.
+ :param groupname: The groupname that we are getting data from.
:param raise_on_not_found: Raise an exception if the data_id is not found. Default is False.
:param kwargs: Additional arguments to be passed to the HTTP GET request https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters
:return: Response of the HTTP GET request.
@@ -491,7 +501,8 @@ def get_data(
with VespaSync(self,pool_connections=1,pool_maxsize=1) as sync_app:
return sync_app.get_data(
- schema=schema, data_id=data_id, namespace=namespace, raise_on_not_found=raise_on_not_found, **kwargs
+ schema=schema, data_id=data_id, namespace=namespace, groupname=groupname,
+ raise_on_not_found=raise_on_not_found, **kwargs
)
def update_data(
@@ -501,6 +512,7 @@ def update_data(
fields: Dict,
create: bool = False,
namespace: str = None,
+ groupname:str=None,
**kwargs
) -> VespaResponse:
"""
@@ -511,6 +523,7 @@ def update_data(
:param fields: Dict containing all the fields you want to update.
:param create: If true, updates to non-existent documents will create an empty document to update
:param namespace: The namespace that we are updating data. If no namespace is provided the schema is used.
+ :param groupname: The groupname that we are updating data.
:param kwargs: Additional arguments to be passed to the HTTP PUT request. https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters
:return: Response of the HTTP PUT request.
"""
@@ -523,6 +536,7 @@ def update_data(
fields=fields,
create=create,
namespace=namespace,
+ groupname=groupname,
**kwargs
)
@@ -668,7 +682,7 @@ def predict(self, model_id, function_name, encoded_tokens):
return response
def feed_data_point(
- self, schema: str, data_id: str, fields: Dict, namespace: str = None, **kwargs
+ self, schema: str, data_id: str, fields: Dict, namespace: str = None, groupname:str=None, **kwargs
) -> VespaResponse:
"""
Feed a data point to a Vespa app.
@@ -677,12 +691,13 @@ def feed_data_point(
:param data_id: Unique id associated with this data point.
:param fields: Dict containing all the fields required by the `schema`.
:param namespace: The namespace that we are sending data to. If no namespace is provided the schema is used.
+ :param groupname: The group that we are sending data to.
:param kwargs: Additional HTTP request parameters (https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters)
:return: Response of the HTTP POST request.
:raises HTTPError: if one occurred
"""
- path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
+ path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname)
end_point = "{}{}".format(
self.app.end_point, path
)
@@ -699,6 +714,7 @@ def feed_data_point(
def query(
self,
body: Optional[Dict] = None,
+ groupname:str=None,
**kwargs
) -> VespaQueryResponse:
"""
@@ -707,10 +723,14 @@ def query(
Send 'body' containing all the request parameters.
:param body: Dict containing all the request parameters.
+ :param groupname: The groupname used in streaming search
:param kwargs: Additional Valid Vespa HTTP Query Api parameters (https://docs.vespa.ai/en/reference/query-api-reference.html)
:return: Either the request body if debug_request is True or the result from the Vespa application
:raises HTTPError: if one occurred
"""
+
+ if groupname:
+ kwargs["streaming.groupname"] = groupname
response = self.http_session.post(self.app.search_end_point, json=body, params=kwargs)
raise_for_status(response)
return VespaQueryResponse(
@@ -718,7 +738,7 @@ def query(
)
def delete_data(
- self, schema: str, data_id: str, namespace: str = None,
+ self, schema: str, data_id: str, namespace: str = None, groupname:str=None,
**kwargs
) -> VespaResponse:
"""
@@ -732,11 +752,10 @@ def delete_data(
:raises HTTPError: if one occurred
"""
- path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
+ path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname)
end_point = "{}{}".format(
self.app.end_point, path
)
-
response = self.http_session.delete(end_point, params=kwargs)
raise_for_status(response)
return VespaResponse(
@@ -793,7 +812,8 @@ def delete_slice(slice_id):
def get_data(
- self, schema: str, data_id: str, namespace: str = None, raise_on_not_found: Optional[bool]=False, **kwargs
+ self, schema: str, data_id: str, namespace: str = None, groupname:str = None,
+ raise_on_not_found: Optional[bool]=False, **kwargs
) -> VespaResponse:
"""
Get a data point from a Vespa app.
@@ -801,12 +821,13 @@ def get_data(
:param schema: The schema that we are getting data from.
:param data_id: Unique id associated with this data point.
:param namespace: The namespace that we are getting data from.
+ :param groupname: The groupname used to get data
:param raise_on_not_found: Raise an exception if the document is not found.
:param kwargs: Additional HTTP request parameters (https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters)
:return: Response of the HTTP GET request.
:raises HTTPError: if one occurred
"""
- path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
+ path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname)
end_point = "{}{}".format(
self.app.end_point, path
)
@@ -827,6 +848,7 @@ def update_data(
fields: Dict,
create: bool = False,
namespace: str = None,
+ groupname:str = None,
**kwargs
) -> VespaResponse:
"""
@@ -837,12 +859,13 @@ def update_data(
:param fields: Dict containing all the fields you want to update.
:param create: If true, updates to non-existent documents will create an empty document to update
:param namespace: The namespace that we are updating data.
+ :param groupname: The groupname used to update data
:param kwargs: Additional HTTP request parameters (https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters)
:return: Response of the HTTP PUT request.
:raises HTTPError: if one occurred
"""
- path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
+ path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname)
end_point = "{}{}?create={}".format(
self.app.end_point, path, str(create).lower()
)
@@ -914,8 +937,11 @@ async def _wait(f, args, **kwargs):
async def query(
self,
body: Optional[Dict] = None,
+ groupname:str = None,
**kwargs
) -> VespaQueryResponse:
+ if groupname:
+ kwargs["streaming.groupname"] = groupname
r = await self.aiohttp_session.post(self.app.search_end_point, json=body, params=kwargs)
return VespaQueryResponse(
json=await r.json(), status_code=r.status, url=str(r.url)
@@ -923,10 +949,10 @@ async def query(
@retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(3))
async def feed_data_point(
- self, schema: str, data_id: str, fields: Dict, namespace: str = None, **kwargs
+ self, schema: str, data_id: str, fields: Dict, namespace: str = None, groupname:str=None, **kwargs
) -> VespaResponse:
- path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
+ path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname)
end_point = "{}{}".format(
self.app.end_point, path
)
@@ -941,9 +967,9 @@ async def feed_data_point(
@retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(3))
async def delete_data(
- self, schema: str, data_id: str, namespace: str = None, **kwargs
+ self, schema: str, data_id: str, namespace: str = None, groupname:str=None, **kwargs
) -> VespaResponse:
- path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
+ path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname)
end_point = "{}{}".format(
self.app.end_point, path
)
@@ -957,9 +983,9 @@ async def delete_data(
@retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(3))
async def get_data(
- self, schema: str, data_id: str, namespace: str = None, **kwargs
+ self, schema: str, data_id: str, namespace: str = None, groupname:str=None, **kwargs
) -> VespaResponse:
- path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
+ path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname)
end_point = "{}{}".format(
self.app.end_point, path
)
@@ -979,9 +1005,10 @@ async def update_data(
fields: Dict,
create: bool = False,
namespace: str = None,
+ groupname:str=None,
**kwargs
) -> VespaResponse:
- path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
+ path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname)
end_point = "{}{}?create={}".format(
self.app.end_point, path, str(create).lower()
)
diff --git a/vespa/templates/services.xml b/vespa/templates/services.xml
index 9d274174..d94de2d0 100644
--- a/vespa/templates/services.xml
+++ b/vespa/templates/services.xml
@@ -11,6 +11,7 @@
{% if schemas %}
+
{% endif %}
{% if components %}
{% for component in components %}
@@ -36,13 +37,18 @@
1
+ {% set streaming_modes = namespace(total = 0)%}
{% for schema in schemas %}
{% if schema.global_document %}
{% else %}
{% endif %}
+ {% if schema.mode == "streaming" %}{% set streaming_modes.total = 1 + streaming_modes.total %}{% endif %}
{% endfor %}
+ {% if streaming_modes.total > 0 %}
+
+ {% endif %}
diff --git a/vespa/test_integration_docker.py b/vespa/test_integration_docker.py
index c1bee93d..27124f3b 100644
--- a/vespa/test_integration_docker.py
+++ b/vespa/test_integration_docker.py
@@ -808,3 +808,108 @@ def test_execute_async_data_operations(self):
def tearDown(self) -> None:
self.vespa_docker.container.stop(timeout=CONTAINER_STOP_TIMEOUT)
self.vespa_docker.container.remove()
+
+class TestStreamingApplication(unittest.TestCase):
+ def setUp(self) -> None:
+ document = Document(
+ fields=[
+ Field(name="id", type="string", indexing=["attribute", "summary"]),
+ Field(
+ name="title",
+ type="string",
+ indexing=["index", "summary"],
+ index="enable-bm25",
+ ),
+ Field(
+ name="body",
+ type="string",
+ indexing=["index", "summary"],
+ index="enable-bm25",
+ )
+ ]
+ )
+ mail_schema = Schema(
+ name="mail",
+ mode="streaming",
+ document=document,
+ fieldsets=[FieldSet(name="default", fields=["title", "body"])],
+ rank_profiles=[
+ RankProfile(name="default", first_phase="nativeRank(title, body)")
+ ])
+ self.app_package = ApplicationPackage(name="mail", schema=[mail_schema])
+
+ self.vespa_docker = VespaDocker(port=8089)
+ self.app = self.vespa_docker.deploy(application_package=self.app_package)
+
+ def test_streaming(self):
+ docs = [{
+ "id": 1,
+ "groupname": "a@hotmail.com",
+ "fields": {
+ "title": "this is a title",
+ "body": "this is a body"
+ }
+ },
+ {
+ "id": 1,
+ "groupname": "b@hotmail.com",
+ "fields": {
+ "title": "this is a title",
+ "body": "this is a body"
+ }
+ },
+ {
+ "id": 2,
+ "groupname": "b@hotmail.com",
+ "fields": {
+ "title": "this is another title",
+ "body": "this is another body"
+ }
+ }
+ ]
+ self.app.wait_for_application_up(300)
+
+ def callback(response:VespaResponse, id:str):
+ if not response.is_successfull():
+ print("Id " + id + " + failed : " + response.json)
+
+ self.app.feed_iterable(docs, schema="mail", namespace="test", callback=callback)
+ from vespa.io import VespaQueryResponse
+ response:VespaQueryResponse = self.app.query(yql="select * from sources * where title contains 'title'", groupname="a@hotmail.com")
+ self.assertTrue(response.is_successfull())
+ self.assertEqual(response.number_documents_retrieved, 1)
+
+ response:VespaQueryResponse = self.app.query(yql="select * from sources * where title contains 'title'", groupname="b@hotmail.com")
+ self.assertTrue(response.is_successfull())
+ self.assertEqual(response.number_documents_retrieved, 2)
+
+ with pytest.raises(Exception):
+ response:VespaQueryResponse = self.app.query(yql="select * from sources * where title contains 'title'")
+
+ self.app.delete_data(schema="mail", namespace="test", data_id=2, groupname="b@hotmail.com")
+
+ response:VespaQueryResponse = self.app.query(yql="select * from sources * where title contains 'title'", groupname="b@hotmail.com")
+ self.assertTrue(response.is_successfull())
+ self.assertEqual(response.number_documents_retrieved, 1)
+
+ self.app.update_data(schema="mail", namespace="test", data_id=1, groupname="b@hotmail.com", fields={"title": "this is a new foo"})
+ response:VespaQueryResponse = self.app.query(yql="select * from sources * where title contains 'foo'", groupname="b@hotmail.com")
+ self.assertTrue(response.is_successfull())
+ self.assertEqual(response.number_documents_retrieved, 1)
+
+ response = self.app.get_data(schema="mail", namespace="test", data_id=1, groupname="b@hotmail.com")
+ self.assertDictEqual(response.json,
+ {
+ "pathId": "/document/v1/test/mail/group/b@hotmail.com/1",
+ "id": "id:test:mail:g=b@hotmail.com:1",
+ "fields": {
+ "body": "this is a body",
+ "title": "this is a new foo"
+ }
+ }
+ )
+
+ def tearDown(self) -> None:
+ self.vespa_docker.container.stop(timeout=CONTAINER_STOP_TIMEOUT)
+ self.vespa_docker.container.remove()
+
diff --git a/vespa/test_package.py b/vespa/test_package.py
index cdb158a3..2c8fe3c8 100644
--- a/vespa/test_package.py
+++ b/vespa/test_package.py
@@ -688,6 +688,7 @@ def test_services_to_text(self):
' \n'
" \n"
" \n"
+ " \n"
" \n"
' \n'
' 1\n'
@@ -733,9 +734,29 @@ def setUp(self) -> None:
)
])
)
+ self.calendar = Schema(
+ name="calendar",
+ mode="streaming",
+ document=Document(
+ fields=[
+ Field(
+ name="title", type="string", indexing=["attribute", "summary"]
+ )
+ ])
+ )
+ self.event = Schema(
+ name="event",
+ mode="index",
+ document=Document(
+ fields=[
+ Field(
+ name="title", type="string", indexing=["attribute", "summary"]
+ )
+ ])
+ )
self.app_package = ApplicationPackage(
name="testapp",
- schema=[self.mail])
+ schema=[self.mail, self.calendar, self.event])
def test_generated_services_uses_mode_streaming(self):
expected_result = (
@@ -744,11 +765,15 @@ def test_generated_services_uses_mode_streaming(self):
' \n'
" \n"
" \n"
+ " \n"
" \n"
' \n'
' 1\n'
" \n"
' \n'
+ ' \n'
+ ' \n'
+ ' \n'
" \n"
" \n"
' \n'
@@ -873,6 +898,7 @@ def test_services_to_text(self):
' \n'
" \n"
" \n"
+ " \n"
" \n"
' \n'
' 1\n'
@@ -1021,6 +1047,7 @@ def test_services_to_text(self):
' \n'
" \n"
" \n"
+ " \n"
" \n"
' \n'
' 1\n'
@@ -1108,6 +1135,7 @@ def test_services_to_text(self):
' \n'
" \n"
" \n"
+ " \n"
" \n"
' \n'
' 1\n'
@@ -1150,6 +1178,7 @@ def test_services_to_text(self):
' \n'
" \n"
" \n"
+ " \n"
' \n'
' \n'
' \n'
@@ -1200,6 +1229,7 @@ def test_services_to_text(self):
' \n'
" \n"
" \n"
+ " \n"
' \n'
' \n'
' \n'