-
Notifications
You must be signed in to change notification settings - Fork 111
/
feed_to_vespa.py
executable file
·205 lines (158 loc) · 7.34 KB
/
feed_to_vespa.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
#!/usr/bin/env python3
# Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
import json
import os
import re
import subprocess
import sys
import yaml
import requests
from requests.adapters import HTTPAdapter, Retry
import urllib.parse
def find(json, path, separator = "."):
if len(path) == 0: return json
head, _, rest = path.partition(separator)
return find(json[head], rest) if head in json else None
# extract <id> from form id:open:doc::<id>
def get_document_id(id):
return id[id.rfind(":")+1:]
def get_private_key_path():
private_key_path = "data-plane-private-key.pem"
if not os.path.isfile(private_key_path):
private_key_raw = os.environ['DATA_PLANE_PRIVATE_KEY']
private_key = private_key_raw.replace(" ", "\n")
with open(private_key_path, "w") as f:
f.write("-----BEGIN PRIVATE KEY-----\n" + private_key + "\n-----END PRIVATE KEY-----")
return private_key_path
def get_public_cert_path():
public_cert_path = "data-plane-public-key.pem"
if not os.path.isfile(public_cert_path):
public_cert_raw = os.environ['DATA_PLANE_PUBLIC_KEY']
public_cert = public_cert_raw.replace(" ", "\n")
with open(public_cert_path, "w") as f:
f.write("-----BEGIN CERTIFICATE-----\n" + public_cert + "\n-----END CERTIFICATE-----")
return public_cert_path
def vespa_get(endpoint, operation, options):
url = "{0}/{1}?{2}".format(endpoint, operation, "&".join(options))
return session.get(url).json()
def vespa_delete(endpoint, operation, options):
url = "{0}/{1}?{2}".format(endpoint, operation, "&".join(options))
return session.delete(url).json()
def vespa_visit(endpoint, namespace, doc_type, continuation = None):
options = []
options.append("wantedDocumentCount=500")
options.append("timeout=60s")
if continuation is not None and len(continuation) > 0:
options.append("&continuation={0}".format(continuation))
return vespa_get(endpoint, "document/v1/{0}/{1}/docid".format(namespace,doc_type), options)
def vespa_remove(endpoint, doc_ids, namespace, doc_type):
options = []
for doc_id in doc_ids:
id = get_document_id(doc_id)
vespa_delete(endpoint, "document/v1/{0}/{1}/docid/{2}".format(namespace, doc_type, id), options)
def vespa_feed(endpoint, feed, namespace, doc_type):
if doc_type not in ["paragraph", "term", "doc"]:
raise ValueError(":error:Unknown vespa doc_type: {0}".format(doc_type))
splits = re.split(r'/|\.', endpoint)
app_string = splits[3] + '.' + splits[2]
print("Feeding to app: {0} , endpoint: {1}".format(app_string, endpoint))
process = subprocess.run(['vespa', 'feed', '-a', app_string, '-t', endpoint, feed], capture_output=True)
# Print sderr if not empty
if process.stderr:
print("::group::VespaCLI-Error")
print("::error::Errors reported by VespaCLI:")
print(process.stderr.decode('utf-8'))
print("::endgroup::")
if process.returncode != 0:
print("::error::Errors encountered while feeding Vespa application.")
sys.exit(process.returncode)
return process.stdout.decode('utf-8')
def get_docs(index):
file = open(index, "r", encoding='utf-8')
return json.load(file)
def get_indexed_docids(endpoint, namespace, doc_type):
docids = set()
continuation = ""
while continuation is not None:
json = vespa_visit(endpoint, namespace, doc_type, continuation)
documents = find(json, "documents")
if documents is not None:
ids = [ find(document, "id") for document in documents ]
for id in ids:
# The document id might contain chars that needs to be escaped for the delete/put operation to work
# also for comparison with what is in the feed
docid = get_document_id(id) # return the last part
encoded = urllib.parse.quote(docid) #escape
id = id.replace(docid, encoded)
docids.add(id)
continuation = find(json, "continuation")
return docids
def get_feed_docids(feed, namespace, doc_type):
with open(feed, "r", encoding='utf-8') as f:
feed_json = json.load(f)
if doc_type == "doc":
return set(["id:{0}:doc::".format(namespace) + find(doc, "fields.namespace") + find(doc, "fields.path") for doc in feed_json])
elif doc_type == "term":
return set(["id:{0}:term::".format(namespace) + str(find(doc, "fields.hash")) for doc in feed_json])
elif doc_type == "paragraph":
return set([doc['put'] for doc in feed_json])
def print_header(msg):
print("")
print("*" * 80)
print("* {0}".format(msg))
print("*" * 80)
def read_config(config_file):
with open(config_file, "r") as f:
return yaml.safe_load(f)
def update_endpoint(endpoint, config):
do_remove_index = config["search"]["do_index_removal_before_feed"]
do_feed = config["search"]["do_feed"]
namespace = config["search"]["namespace"]
doc_type = config["search"]["doc_type"]
endpoint_url = endpoint["url"]
endpoint_url = endpoint_url[:-1] if endpoint_url.endswith("/") else endpoint_url
endpoint_indexes = endpoint["indexes"]
print_header("Retrieving already indexed document ids for endpoint {0}".format(endpoint_url))
docids_in_index = get_indexed_docids(endpoint_url, namespace, doc_type)
print("{0} documents found.".format(len(docids_in_index)))
if do_remove_index:
print_header("Removing all indexed documents in {0}".format(endpoint_url))
vespa_remove(endpoint_url, docids_in_index, namespace, doc_type)
print("{0} documents removed.".format(len(docids_in_index)))
if do_feed:
docids_in_feed = set()
print_header("Parsing feed file(s) for document ids")
for index in endpoint_indexes:
assert os.path.exists(index)
docids_in_feed = docids_in_feed.union(get_feed_docids(index, namespace, doc_type))
print("{0} documents found.".format(len(docids_in_feed)))
if len(docids_in_feed) == 0:
return
docids_to_remove = docids_in_index.difference(docids_in_feed)
if len(docids_to_remove) > 0:
print_header("Removing indexed documents not in feed in {0}".format(endpoint_url))
for id in docids_to_remove:
print("To Remove: {0}".format(id))
vespa_remove(endpoint_url, docids_to_remove, namespace, doc_type)
print("{0} documents removed.".format(len(docids_to_remove)))
else:
print("No documents to be removed.")
for index in endpoint_indexes:
print_header("Feeding {0} to {1}...".format(index, endpoint_url))
print(vespa_feed(endpoint_url, index, namespace, doc_type))
print("{0} documents fed.".format(len(docids_in_feed)))
def main():
configuration_file = sys.argv[1]
config = read_config(configuration_file)
global session
session = requests.Session()
retries = Retry(total=10, connect=10,
backoff_factor=0.8,
status_forcelist=[ 500, 503, 504, 429 ]
)
session.mount('https://', HTTPAdapter(max_retries=retries))
session.cert = (get_public_cert_path(), get_private_key_path())
for endpoint in config["search"]["feed_endpoints"]:
update_endpoint(endpoint, config)
if __name__ == "__main__":
main()