2. File parsing
This document provides a detailed overview of the second step of the deduplication service: parsing the file, detecting duplicates and reporting back to the user.
The code that handles this part can be found in the dedupe.DedupeTask module.
As mentioned previously, asynchronous tasks are tricky in the sense that it is difficult to see whether the process is running smoothly. To mitigate this as much as possible, this whole part is filled with error-catching code. There is also a general method that parses the errors, prepares a detailed report of what happened and sends notifications to the user and the administrator. This is wrapped in the _err method.
def _err(self, err_code=500, err_message="", err_explain=""):
    """Return a custom error message along with the error code."""
    self.error(err_code)
    resp = {
        "status": "error",
        "error": err_message,
        "message": err_explain
    }
    logging.error(err_message)
    logging.error(err_explain)
    self.response.headers['Content-Type'] = "application/json"
    self.response.write(json.dumps(resp)+"\n")
    # Send email to admin with details
    mail.send_mail(
        sender=EMAIL_SENDER,
        to=ADMIN,
        subject="[VN-dedupe] Dedupe error",
        body="""Hey,
Something went wrong with a deduplication request. Here are the details:
File UUID: %s
Date-time: %s
Error: %s
Error explain: %s
User: %s
Content-Type: %s
Action: %s
Duplicates: %s
""" % (self.request_namespace, datetime.now(), err_message, err_explain,
       self.email, self.content_type, self.action, self.duplicates))
    # Send email to user
    msg = """Hello,
This is a notification email to inform you that something went wrong with the
de-duplication of your file. The application's administrators have been
notified. If you want to contact them directly, please send an email to %s or
visit the following webpage to submit an issue:
https://www.github.com/VertNet/dedupe/issues
Sorry for the inconvenience, and thanks for your understanding.
""" % ADMIN
    self.send_email_notification(msg)
    # Log the error
    params = dict(
        latlon=self.latlon, country=self.country, status="error",
        user_agent=self.user_agent, warnings=self.warnings,
        error=err_message, email=self.email, action=self.action,
        duplicates=self.duplicates,
        loc=self.headers[self.loc], sci=self.headers[self.sci],
        dat=self.headers[self.dat], col=self.headers[self.col],
        id_field=self.id_field, namespace=self.request_namespace,
        content_type=self.content_type, file_size=self.file_size
    )
    taskqueue.add(
        url='/service/v0/log',
        payload=json.dumps(params),
        queue_name=QUEUE_NAME
    )
    logging.info("Logging enqueued")
    return
The last lines, starting from the # Log the error comment, refer to the activity logging mechanism.
Email sending is also a critical part of this process, so the code for doing it has been encapsulated in its own function, the send_email_notification method.
NOTE: the first line of this function checks for a special case, "success", which selects a set of standard values used when sending a "finished successfully" message.
def send_email_notification(self, msg):
    """Send email to user with success/failure notification."""
    if msg == "success":
        # Assign email subject
        subject = EMAIL_SUCCESS_SUBJECT
        # Build explanation note for email
        if self.action == "flag":
            action_description = ACTION_FLAG
        elif self.action == "remove":
            action_description = ACTION_REMOVE
        # Create email body
        body = EMAIL_SUCCESS_BODY.format(
            self.file_url,
            action_description,
            json.dumps(self.report, sort_keys=True, indent=4))
    else:
        subject = EMAIL_ERROR_SUBJECT
        body = msg
    # Send email
    sender = EMAIL_SENDER
    to = self.email
    mail.send_mail(sender=sender, to=to, subject=subject, body=body)
    return
All uppercase variables refer to custom pieces of text stored in the config module.
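For reference, such constants might look like the following. This is only a hypothetical sketch of the config module: the names match the ones used in the code above, but the actual texts and values may differ.
# Hypothetical excerpt of the config module (assumed values, not verbatim)
EMAIL_SENDER = "VertNet Dedupe <dedupe@example.org>"  # assumed address
ADMIN = "admin@example.org"  # assumed address
EMAIL_SUCCESS_SUBJECT = "[VN-dedupe] Deduplication finished"
EMAIL_ERROR_SUBJECT = "[VN-dedupe] Dedupe error"
EMAIL_SUCCESS_BODY = """Hello,
Your file has been processed and is available at {0}
{1}
Report:
{2}
"""
ACTION_FLAG = "Duplicate records have been flagged in the result file."
ACTION_REMOVE = "Duplicate records have been removed from the result file."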
The overall process can be summarized as follows:
- Retrieve all parameters and switch to individual namespace
- Get file from Cloud Storage
- Initialize response
- Create response file
- Read file rows and check for duplicates
- Close file and prepare report for user
All the code for this part is wrapped in the post method, unless otherwise stated.
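As a rough orientation, the skeleton of that method could be sketched like this (a simplified outline of the steps detailed in the rest of this page, not the verbatim source):
def post(self):
    # 1. Retrieve all parameters and switch to the individual namespace
    #    (see "Initialize variables from request" below)
    namespace_manager.set_namespace(self.request_namespace)
    # 2. Get the file from Cloud Storage and build a CSV reader
    self.file = gcs.open(self.file_name)
    self.reader = csv.reader(self.file, delimiter=self.delimiter)
    # 3. Initialize the response: warnings, counters, "id" field position
    # 4. Create the response file in GCS and write the headers
    #    (skipped when the action is "report")
    # 5. Read the file rows and check for duplicates
    for row in self.reader:
        pass  # check_strict_dupes, check_partial_dupes and handle_row (below)
    # 6. Close the file, notify the user and enqueue the activity log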
All the parameters that were set up in the previous step must be collected from the request object:
# Initialize variables from request
self.latlon = self.request.get("latlon", None)
self.country = self.request.get("country", None)
self.user_agent = self.request.get("user_agent", None)
self.email = self.request.get("email", None)
self.request_namespace = self.request.get("request_namespace", None)
self.request_namespace = str(self.request_namespace)
self.previous_namespace = self.request.get("previous_namespace", None)
self.content_type = self.request.get("content_type", None)
self.delimiter = str(self.request.get("delimiter", None))
self.extension = self.request.get("extension", None)
self.action = self.request.get("action", None)
self.duplicates = self.request.get("duplicates", None)
self.file_path = str(self.request.get("file_path", None))
self.file_name = str(self.request.get("file_name", None))
self.headers = json.loads(self.request.get("headers", None))
self.headers_lower = [x.lower() for x in self.headers]
self.loc = int(self.request.get("loc", None))
self.sci = int(self.request.get("sci", None))
self.dat = int(self.request.get("dat", None))
self.col = int(self.request.get("col", None))
self.id_field = self.request.get("id_field", None)
When the self.duplicates variable is set to all, a simple transformation will create a list with all the actual available duplicate types:
# Transform "all" in list of elements for duplicate types
if self.duplicates == "all":
    self.duplicates = [x for x in ALLOWED_DUPLICATES if x != "all"]
Then, the namespace is changed:
# Switch to request namespace
namespace_manager.set_namespace(self.request_namespace)
logging.info("Switched to namespace %s" % self.request_namespace)
The previous part dealt with storing the file in Google Cloud Storage, so now it is time to grab it back. But first, for the sake of logging, the file size is extracted:
# Store file size for logging
self.file_size = gcs.stat(self.file_name).st_size
logging.info("File size: %s" % self.file_size)
And then, the actual file is grabbed:
# Get file from GCS
try:
    self.file = gcs.open(self.file_name)
except Exception, e:
    self._err(500, "Could not open uploaded file", e)
    return
self.reader = csv.reader(self.file, delimiter=self.delimiter)
The last line creates a CSV reader object from the file. This object will be in charge of iterating through the list of records.
A bunch of things must be prepared before actually parsing the file.
First, a placeholder for the warnings is created as an empty list:
# Initialize warnings
self.warnings = []
Then, some counts for the final report:
# Initialize report values
self.records = 0
self.strict_duplicates = 0
self.duplicate_order = set()
self.partial_duplicates = 0
self.partial_duplicates_order = set()
Finally, the id field (if any) must be located and its position (index) stored for future reference:
# Calculating "id" field position, if exists
if self.id_field is not None:
    self.idx = self.headers_lower.index(self.id_field.lower())
    logging.info("Using %s as 'id' field" % self.id_field)
    logging.info("'id' field in position %s" % self.idx)
    self.duplicate_ids = set()
    self.partial_duplicate_ids = set()
The response file (the file that will be delivered to the user) will be, depending on the selected action, either a copy of the original file in which some new fields have been created (to identify rows as duplicates) or a subset of the original file with all duplicates removed. In the case of a report action, no file will be created.
In the other two cases, the response file has to be created as a new file in Google Cloud Storage, in the same bucket (named after the namespace identifier):
# Create response file in GCS
if self.action != "report":
    self.file_name = "/".join(["", BUCKET, self.request_namespace])
    self.file_name = "%s/modif.%s" % (self.file_path, self.extension)
    # Open file
    try:
        self.f = gcs.open(self.file_name, 'w',
                          content_type=self.content_type)
        logging.info("Created GCS file in %s" % self.file_name)
    except Exception, e:
        self._err(500, "Could not open result file", e)
Then, the field names (original headers) must be added. In the case of a flag action, three additional fields are appended:
- isDuplicate: boolean; tells whether the record is a duplicate or not.
- duplicateType: codified duplicate type (no duplicate, partial duplicate or strict duplicate, so far). The actual list can be found in the config module.
- duplicateOf: a list of all the ids of the records this record is a duplicate of.
For more information on the duplicate type, please check the wiki page for Action types.
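The codes used for duplicateType (referenced below as NO_DUPE, PARTIAL_DUPE and STRICT_DUPE) are defined in the config module. A plausible encoding, consistent with the checks in the code below but assumed rather than taken from the source, would be:
# Hypothetical duplicate type codes (assumed values; see the config module)
NO_DUPE = 0        # the record is not a duplicate
PARTIAL_DUPE = 1   # duplicate on location, scientific name, date and collector
STRICT_DUPE = 2    # the whole row is an exact duplicate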
# Write headers
if self.action == "flag":
    self.headers += ["isDuplicate", "duplicateType", "duplicateOf"]
try:
    self.f.write(str(self.delimiter.join(self.headers)))
    self.f.write("\n")
    logging.info("Successfully wrote headers in file")
except Exception, e:
    self._err(500, "Could not write headers in result file", e)
Now, for the meaty part: the CSV reader iterates over all rows, applying the following three functions: check_strict_dupes, check_partial_dupes and handle_row.
First, it prepares the basic information for the row:
for row in self.reader:
    self.records += 1
    self.is_dupe = NO_DUPE
    self.dupe_ref = None
... adding 1 to the count of records and establishing default, "not a duplicate" values for is_dupe and dupe_ref.
After this, it will first check whether the record is a strict duplicate (if requested). If it turns out to be a strict duplicate, there is no point in checking it further. Only if the record is not a strict duplicate (and, of course, if partial duplicates are to be found) will it apply the second function.
The last function, handle_row, will perform certain common operations on the row, depending on the result of the checks.
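Tying the pieces together, the row loop might look roughly like this. It is a simplified sketch based on the descriptions on this page, not the verbatim source; the only assumption is that the two check functions return 1 on a match and 0 otherwise, as their code below shows.
for row in self.reader:
    # Basic information for the row
    self.records += 1
    self.is_dupe = NO_DUPE
    self.dupe_ref = None
    # Check for strict duplicates first, if requested
    found = 0
    if "strict" in self.duplicates:
        found = self.check_strict_dupes(row)
    # Check for partial duplicates only if the row is not a strict duplicate
    if found == 0 and "partial" in self.duplicates:
        self.check_partial_dupes(row)
    # Common per-row handling (writing the row to the response file)
    self.handle_row(row)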
The check for strict duplicates is performed only if strict is in the list of duplicate types to be searched (i.e., when self.duplicates was either strict or all). The code handling this check is wrapped in the check_strict_dupes function.
Strict duplicates are checked based on their hash checksums. This was found to be the most efficient way of detecting information duplication. Since we are looking for strict duplicates, the hash is created for the whole row.
# Calculate md5 hash
k = hashlib.md5(str(row)).hexdigest()
Then, the hash is checked against the list of hashes stored so far in memcache (in the specific namespace):
# Check if hash exists in memcache
dupe = memcache.get(k, namespace=self.request_namespace)
Memcache was also found to be the most efficient way of rapidly storing and retrieving small chunks of data.
Now, the rest of the function depends on whether or not there was a match.
If there was a match, the rest of the function deals with codifying the variables and preparing the response:
if dupe is not None:
    self.is_dupe = STRICT_DUPE
    self.strict_duplicates += 1
    self.dupe_ref = dupe
    self.duplicate_order.add((dupe, self.records))
    if self.id_field is not None:
        self.duplicate_ids.add(row[self.idx])
    return 1
Otherwise, simply store the hash in the memcache for further reference:
else:
    memcache.set(k, self.records, namespace=self.request_namespace)
    return 0
The check for partial duplicates is performed only if partial is in the list of duplicate types to be searched (i.e., when self.duplicates was either partial or all). The code handling this check is wrapped in the check_partial_dupes function.
Partial duplicates are not checked via hash checksums. Since they are based on such a small set of fields, tests have shown that the concatenation of the values of those fields can be used directly as the key for the matching step. So the first thing is to create a string with these values:
pk = "|".join([row[self.loc], row[self.sci], row[self.col], row[self.dat]])
And then check for duplicates in memcache:
pdupe = memcache.get(pk, namespace=self.request_namespace)
After this, the processing is the same as with strict duplicates.
# If exists, PARTIAL_DUPE
if pdupe is not None:
    self.is_dupe = PARTIAL_DUPE
    self.partial_duplicates += 1
    self.dupe_ref = pdupe
    self.partial_duplicates_order.add((pdupe, self.records))
    if self.id_field is not None:
        self.partial_duplicate_ids.add(row[self.idx])
    return 1
# Otherwise, store key in memcache
else:
    memcache.set(pk, self.records,
                 namespace=self.request_namespace)
    return 0
This step, wrapped in the handle_row function, occurs with every row. It basically creates a (modified or not) copy of the original row and writes it to the response file.
First, if the action is report, or if the action is remove and the record is a duplicate, nothing is done (the record will not be written to the file):
if (self.action == "remove" and self.is_dupe != NO_DUPE) or self.action == "report":
    pass
If the action is flag, or if the action is remove and the row is not a duplicate, the row is written to the new file. In addition, if the action is flag, the three new fields are appended at the end of the row:
else:
    # If action is flag, add three flag fields to row
    if self.action == "flag":
        row.append(bool(self.is_dupe))
        row.append(self.is_dupe if self.is_dupe > 0 else None)
        row.append(self.dupe_ref)
Now, to properly convert the row list into a CSV line, a small workaround is needed: first, an empty StringIO object is created and used to build a csv.writer; this writer writes the new row into the StringIO object rather than into the response file; the last step is to write the contents of that object (si.getvalue()) to the response file.
try:  # Workaround to handle proper conversion
    si = StringIO()
    cw = csv.writer(si, delimiter=self.delimiter)
    cw.writerow(row)
    self.f.write(si.getvalue())
except Exception, e:
    logging.warning("Something went wrong writing a row\n"
                    "f: %s\nrow: %s\nerror: %s" %
                    (self.file_name, row, e))
    self.warnings.append("Could not write record %s in new file" %
                         self.records)
Unless the action is report (in which case there is no response file), the response file needs to be closed:
# Close file when finished parsing records
if self.action != "report":
    try:
        self.f.close()
        self.file_url = "https://storage.googleapis.com%s" %\
            self.file_name
        logging.info("Successfully created file %s" % self.file_name)
    except Exception, e:
        self._err(500, "Could not close result file", e)
And thus the response file is ready. Now all that is left is to prepare a response to the user.
NOTE: at this point, the extensive logging of results is incomplete. The user will only receive a generic "success" email with links to the dedupe'd file and a long, unformatted JSON document.
# Send notification to user
self.send_email_notification("success")
And, to avoid conflicts, return to the default namespace:
# Return to default namespace
namespace_manager.set_namespace(self.previous_namespace)
All usage of this tool is monitored in VertNet usage logs. A new entry will be created with data on the request. A Datastore model of a log entry can be found in the models module, and the code for transforming usage information into LogEntry entities and storing it in the Datastore can be found in the DedupeLog module.
# Add entry to log
params = dict(
    latlon=self.latlon, country=self.country, status="success",
    user_agent=self.user_agent, warnings=self.warnings, error=None,
    email=self.email, action=self.action, duplicates=self.duplicates,
    loc=self.headers[self.loc], sci=self.headers[self.sci],
    dat=self.headers[self.dat], col=self.headers[self.col],
    id_field=self.id_field, namespace=self.request_namespace,
    content_type=self.content_type, file_size=self.file_size,
    records=self.records, fields=len(self.headers),
    strict_duplicates=self.strict_duplicates, api_version=API_VERSION,
    partial_duplicates=self.partial_duplicates
)
taskqueue.add(
    url='/service/v0/log',
    payload=json.dumps(params),
    queue_name=QUEUE_NAME
)
logging.info("Logging enqueued")
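As a rough orientation, a LogEntry model matching the params dictionary above might look something like the following. This is a hypothetical sketch, not the actual models module, and the real property names and types may differ.
from google.appengine.ext import ndb

class LogEntry(ndb.Model):
    """Hypothetical Datastore model for a usage log entry (sketch only)."""
    latlon = ndb.StringProperty()
    country = ndb.StringProperty()
    status = ndb.StringProperty()  # "success" or "error"
    user_agent = ndb.StringProperty()
    warnings = ndb.StringProperty(repeated=True)
    error = ndb.StringProperty()
    email = ndb.StringProperty()
    action = ndb.StringProperty()  # "report", "flag" or "remove"
    duplicates = ndb.StringProperty(repeated=True)
    loc = ndb.StringProperty()
    sci = ndb.StringProperty()
    dat = ndb.StringProperty()
    col = ndb.StringProperty()
    id_field = ndb.StringProperty()
    namespace = ndb.StringProperty()
    content_type = ndb.StringProperty()
    file_size = ndb.IntegerProperty()
    records = ndb.IntegerProperty()
    fields = ndb.IntegerProperty()
    strict_duplicates = ndb.IntegerProperty()
    partial_duplicates = ndb.IntegerProperty()
    api_version = ndb.StringProperty()
    created = ndb.DateTimeProperty(auto_now_add=True)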
This repository is part of the VertNet project.
For more information, please check out the project's home page and GitHub organization page.