Deduplication II - File parsing

  1. Introduction
  2. Custom errors
  3. Sending email notifications
  4. General workflow
    1. Retrieve all parameters and switch namespace
    2. Get file from Cloud Storage
    3. Initialize response
    4. Create response file
    5. Read file rows and check for duplicates
      1. Checking strict duplicates
      2. Checking partial duplicates
      3. Handling the new row
    6. Close file and prepare report for user
  5. Logging the usage

Introduction

This document provides a detailed overview of the second step of the deduplication service: parsing the file, detecting duplicates and reporting back to the user.

The code that handles this part can be found in the dedupe.DedupeTask module.

Custom errors

As mentioned previously, asynchronous tasks are tricky in the sense that it is hard to tell whether the process is running smoothly. To mitigate this, this part of the code is heavily instrumented with error catching. There is also a general method that parses errors, prepares a detailed report of what happened and sends notifications to the user and the administrator. This logic is wrapped in the _err method.

    def _err(self, err_code=500, err_message="", err_explain=""):
        """Return a custom error message along with the error code."""
        self.error(err_code)
        resp = {
            "status": "error",
            "error": err_message,
            "message": err_explain
        }
        logging.error(err_message)
        logging.error(err_explain)
        self.response.headers['Content-Type'] = "application/json"
        self.response.write(json.dumps(resp)+"\n")

        # Send email to admin with details
        mail.send_mail(
            sender=EMAIL_SENDER,
            to=ADMIN,
            subject="[VN-dedupe] Dedupe error",
            body="""Hey,

Something went wrong with a deduplication request. Here are the details:

File UUID:     %s
Date-time:     %s
Error:         %s
Error explain: %s
User:          %s
Content-Type:  %s
Action:        %s
Duplicates:    %s

""" % (self.request_namespace, datetime.now(), err_message, err_explain,
                self.email, self.content_type, self.action, self.duplicates))

        # Send email to user
        msg = """Hello,

This is a notification email to inform you that something went wrong with the
de-duplication of your file. The application's administrators have been
notified. If you want to contact them directly, please send an email to %s or
visit the following webpage to submit an issue:

https://www.github.com/VertNet/dedupe/issues

Sorry for the inconvenience, and thanks for your understanding.
""" % ADMIN
        self.send_email_notification(msg)

        # Log the error
        params = dict(
            latlon=self.latlon, country=self.country, status="error",
            user_agent=self.user_agent, warnings=self.warnings,
            error=err_message, email=self.email, action=self.action,
            duplicates=self.duplicates,
            loc=self.headers[self.loc], sci=self.headers[self.sci],
            dat=self.headers[self.dat], col=self.headers[self.col],
            id_field=self.id_field, namespace=self.request_namespace,
            content_type=self.content_type, file_size=self.file_size
        )
        taskqueue.add(
            url='/service/v0/log',
            payload=json.dumps(params),
            queue_name=QUEUE_NAME
        )
        logging.info("Logging enqueued")
        return

The last lines, starting from # Log the error, refer to the activity logging mechanism.

Sending email notifications

Email sending is also a critical part of this process, so the code for doing this has been encapsulated in its own function. The code is wrapped in the send_email_notification method.

NOTE: the first conditional in this function checks for a special value, "success", which triggers a set of standard values used when sending a "finished successfully" message.

    def send_email_notification(self, msg):
        """Send email to user with success/failure notification."""

        if msg == "success":

            # Assign email subject
            subject = EMAIL_SUCCESS_SUBJECT

            # Build explanation note for email
            if self.action == "flag":
                action_description = ACTION_FLAG
            elif self.action == "remove":
                action_description = ACTION_REMOVE

            # Create email body
            body = EMAIL_SUCCESS_BODY.format(
                self.file_url,
                action_description,
                json.dumps(self.report, sort_keys=True, indent=4))

        else:
            subject = EMAIL_ERROR_SUBJECT
            body = msg

        # Send email
        sender = EMAIL_SENDER
        to = self.email
        mail.send_mail(sender=sender, to=to, subject=subject, body=body)

        return

All uppercase variables refer to custom pieces of text stored in the config module.
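
For reference, these constants might look roughly like the following in the config module (the exact texts below are assumptions, not taken from the source):

    # Hypothetical values in config.py (assumption)
    EMAIL_SENDER = "VertNet dedupe <dedupe@example.org>"
    EMAIL_SUCCESS_SUBJECT = "[VN-dedupe] Your file has been processed"
    EMAIL_ERROR_SUBJECT = "[VN-dedupe] Deduplication error"
    # EMAIL_SUCCESS_BODY takes the file URL, the action description and the
    # JSON report as positional arguments to str.format()
    EMAIL_SUCCESS_BODY = "Hello,\n\nYour file is ready at {0}\n\n{1}\n\nReport:\n{2}\n"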

General workflow

The overall process can be summarized as follows:

  1. Retrieve all parameters and switch to individual namespace
  2. Get file from Cloud Storage
  3. Initialize response
  4. Create response file
  5. Read file rows and check for duplicates
  6. Close file and prepare report for user

All the code for this part is wrapped in the post method, unless otherwise stated.

Retrieve all parameters and switch namespace

All the parameters that were set up in the previous step must be collected from the request object:

        # Initialize variables from request
        self.latlon = self.request.get("latlon", None)
        self.country = self.request.get("country", None)
        self.user_agent = self.request.get("user_agent", None)
        self.email = self.request.get("email", None)
        self.request_namespace = self.request.get("request_namespace", None)
        self.request_namespace = str(self.request_namespace)
        self.previous_namespace = self.request.get("previous_namespace", None)
        self.content_type = self.request.get("content_type", None)
        self.delimiter = str(self.request.get("delimiter", None))
        self.extension = self.request.get("extension", None)
        self.action = self.request.get("action", None)
        self.duplicates = self.request.get("duplicates", None)
        self.file_path = str(self.request.get("file_path", None))
        self.file_name = str(self.request.get("file_name", None))
        self.headers = json.loads(self.request.get("headers", None))
        self.headers_lower = [x.lower() for x in self.headers]
        self.loc = int(self.request.get("loc", None))
        self.sci = int(self.request.get("sci", None))
        self.dat = int(self.request.get("dat", None))
        self.col = int(self.request.get("col", None))
        self.id_field = self.request.get("id_field", None)

When the self.duplicates variable is set to all, a simple transformation will create a list with all the actual available duplicate types:

        # Transform "all" in list of elements for duplicate types
        if self.duplicates == "all":
            self.duplicates = [x for x in ALLOWED_DUPLICATES if x != "all"]
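
Assuming ALLOWED_DUPLICATES is defined in the config module roughly as follows (an assumption), the transformation above turns "all" into the list ["strict", "partial"]:

    # Hypothetical definition in config.py (assumption)
    ALLOWED_DUPLICATES = ["all", "strict", "partial"]
    # With that definition, self.duplicates becomes ["strict", "partial"]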

Then, the namespace is changed:

        # Switch to request namespace
        namespace_manager.set_namespace(self.request_namespace)
        logging.info("Switched to namespace %s" % self.request_namespace)

Get file from Cloud Storage

The previous part dealt with storing the file in Google Cloud Storage, so now it is time to grab it back. But first, for the sake of logging, the file size is extracted:

        # Store file size for logging
        self.file_size = gcs.stat(self.file_name).st_size
        logging.info("File size: %s" % self.file_size)

And then, the actual file is grabbed:

        # Get file from GCS
        try:
            self.file = gcs.open(self.file_name)
        except Exception, e:
            self._err(500, "Could not open uploaded file", e)
            return
        self.reader = csv.reader(self.file, delimiter=self.delimiter)

The last line creates a CSV reader object from the file. This object will be in charge of iterating through the list of records.

Initialize response

A bunch of things must be prepared before actually parsing the file.

First, a placeholder for the warnings is created as an empty list:

        # Initialize warnings
        self.warnings = []

Then, some counts for the final report:

        # Initialize report values
        self.records = 0
        self.strict_duplicates = 0
        self.duplicate_order = set()
        self.partial_duplicates = 0
        self.partial_duplicates_order = set()

Finally, the id field (if any) must be located and its position (index) stored for future reference:

        # Calculating "id" field position, if exists
        if self.id_field is not None:
            self.idx = self.headers_lower.index(self.id_field.lower())
            logging.info("Using %s as 'id' field" % self.id_field)
            logging.info("'id' field in position %s" % self.idx)
            self.duplicate_ids = set()
            self.partial_duplicate_ids = set()

Create response file

Depending on the selected action, the response file (the file that will be delivered to the user) will be either a copy of the original file with some new fields added (to identify rows as duplicates) or a subset of the original file with all duplicates removed. In the case of a report action, no file is created.

In any case, the response file has to be created as a new file in Google Cloud Storage, in the same bucket (named as the namespace identifier):

        # Create response file in GCS
        if self.action != "report":
            self.file_name = "/".join(["", BUCKET, self.request_namespace])
            self.file_name = "%s/modif.%s" % (self.file_path, self.extension)

            # Open file
            try:
                self.f = gcs.open(self.file_name, 'w',
                                  content_type=self.content_type)
                logging.info("Created GCS file in %s" % self.file_name)
            except Exception, e:
                self._err(500, "Could not open result file", e)

Then, the field names (original headers) must be added. In case of a flag action, three additional fields are appended:

  • isDuplicate: boolean; tells if the record is a duplicate or not.
  • duplicateType: codified list of duplicate type (no duplicate, partial duplicate or strict duplicate, so far). The actual list can be found in the config module.
  • duplicateOf: a list of all the ids of which this record is a duplicate.

For more information on the duplicate type, please check the wiki page for Action types.

            # Write headers
            if self.action == "flag":
                self.headers += ["isDuplicate", "duplicateType", "duplicateOf"]
            try:
                self.f.write(str(self.delimiter.join(self.headers)))
                self.f.write("\n")
                logging.info("Successfully wrote headers in file")
            except Exception, e:
                self._err(500, "Could not write headers in result file", e)

Read file rows and check for duplicates

Now, for the meaty part: the CSV reader iterates over all rows, applying the following three functions to each of them: check_strict_dupes, check_partial_dupes and handle_row.

First, it prepares the basic information for the row:

        for row in self.reader:
            self.records += 1
            self.is_dupe = NO_DUPE
            self.dupe_ref = None

... incrementing the record count and setting default "not a duplicate" values for is_dupe (NO_DUPE) and dupe_ref (None).
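
NO_DUPE, together with the STRICT_DUPE and PARTIAL_DUPE codes used further below, comes from the config module. Their exact values are not shown in this page; given how they are used (a falsy value for non-duplicates, positive values otherwise), a plausible sketch would be:

    # Hypothetical duplicate-type codes in config.py (assumption)
    NO_DUPE = 0        # not a duplicate; bool(NO_DUPE) evaluates to False
    PARTIAL_DUPE = 1   # duplicate on the partial-match fields only
    STRICT_DUPE = 2    # exact, whole-row duplicate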

After this, it first checks whether the record is a strict duplicate (if requested). If it turns out to be a strict duplicate, there is no point in checking it further. Only if the record is not a strict duplicate (and, of course, if partial duplicates are to be found) will the second function be applied.

The last function, handle_row, performs some common operations on the row, depending on the result of the checks.
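
The dispatching logic itself is not reproduced in this page; a plausible sketch of how it could look inside the row loop, assuming both check methods take the row and return 1 on a match and 0 otherwise (as their bodies below suggest), is:

    # Inside "for row in self.reader:", after initializing is_dupe and dupe_ref
    found_strict = 0
    if "strict" in self.duplicates:
        found_strict = self.check_strict_dupes(row)
    # Only look for partial duplicates if the row is not already a strict one
    if found_strict == 0 and "partial" in self.duplicates:
        self.check_partial_dupes(row)
    # Write (or skip) the row according to the selected action
    self.handle_row(row)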

Checking strict duplicates

This check runs only if strict is in the list of duplicates to be searched (i.e., when self.duplicates was either strict or all). The code handling it is wrapped in the check_strict_dupes function.

Strict duplicates are checked based on their hash checksums. This was found to be the most efficient way of detecting information duplication. Since we are looking for strict duplicates, the hash is created for the whole row.

        # Calculate md5 hash
        k = hashlib.md5(str(row)).hexdigest()
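
As a quick standalone illustration of why the digest works as a strict-duplicate key (the rows below are hypothetical):

    import hashlib

    row_a = ["Puma concolor", "USA", "2010-05-01", "MVZ"]
    row_b = ["Puma concolor", "USA", "2010-05-01", "MVZ"]
    row_c = ["Puma concolor", "USA", "2010-05-02", "MVZ"]

    # Identical rows produce identical digests...
    assert hashlib.md5(str(row_a)).hexdigest() == hashlib.md5(str(row_b)).hexdigest()
    # ...while a difference in any field produces a different digest
    assert hashlib.md5(str(row_a)).hexdigest() != hashlib.md5(str(row_c)).hexdigest()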

Then, the hash is checked against the hashes stored so far in memcache (in the request-specific namespace):

        # Check if hash exists in memcache
        dupe = memcache.get(k, namespace=self.request_namespace)

Memcache was also found to be the most efficient way of rapidly storing and retrieving small chunks of data.

Now, the rest of the function depends on whether or not there was a match.

If there was a match, the rest of the function deals with codifying the variables and preparing the response:

        if dupe is not None:
            self.is_dupe = STRICT_DUPE
            self.strict_duplicates += 1
            self.dupe_ref = dupe
            self.duplicate_order.add((dupe, self.records))
            if self.id_field is not None:
                self.duplicate_ids.add(row[self.idx])
            return 1

Otherwise, the record number is simply stored in memcache under the hash key, for future reference:

        else:
            memcache.set(k, self.records, namespace=self.request_namespace)
            return 0

Checking partial duplicates

This check runs only if partial is in the list of duplicates to be searched (i.e., when self.duplicates was either partial or all). The code handling it is wrapped in the check_partial_dupes function.

Partial duplicates are not checked via hash checksums. Since they are based on such a small set of fields, tests have shown that the concatenation of the values of these fields can be used directly as the key for the matching step. So the first thing is to create a string with these values:

        pk = "|".join([row[self.loc], row[self.sci], row[self.col], row[self.dat]])

And then check for duplicates in memcache:

        pdupe = memcache.get(pk, namespace=self.request_namespace)

After this, the processing is the same as with strict duplicates.

        # If exists, PARTIAL_DUPE
        if pdupe is not None:
            self.is_dupe = PARTIAL_DUPE
            self.partial_duplicates += 1
            self.dupe_ref = pdupe
            self.partial_duplicates_order.add((pdupe, self.records))
            if self.id_field is not None:
                self.partial_duplicate_ids.add(row[self.idx])
            return 1
        # Otherwise, store key in memcache
        else:
            memcache.set(pk, self.records,
                         namespace=self.request_namespace)
            return 0

Handling the new row

This step, wrapped in the handle_row function, runs for every row. It basically writes a copy of the original row (modified or not, depending on the action) to the response file.

First, if the action is report, or if the action is remove and the record is a duplicate, nothing is done (the record will not be written to the file):

        if (self.action == "remove" and self.is_dupe != NO_DUPE) or self.action == "report":
            pass

If the action is flag, or if the action is remove and the row is not a duplicate, the row is written to the new file. In addition, if the action is flag, the three new fields are appended at the end of the row:

        else:
            # If action is flag, add three flag fields to row
            if self.action == "flag":
                row.append(bool(self.is_dupe))
                row.append(self.is_dupe if self.is_dupe > 0 else None)
                row.append(self.dupe_ref)

Now, to properly convert the row list into a CSV-formatted line, a small workaround is needed: first, an empty StringIO object is created and used to build a csv.writer; this writer writes the new row into the StringIO buffer rather than directly into the response file; the last step is to write the buffer's contents (si.getvalue()) to the response file.

            try:  # Workaround to handle proper conversion
                si = StringIO()
                cw = csv.writer(si, delimiter=self.delimiter)
                cw.writerow(row)
                self.f.write(si.getvalue())
            except Exception, e:
                logging.warning("Something went wrong writing a row\n"
                                "f: %s\nrow: %s\nerror: %s" %
                                (self.file_name, row, e))
                self.warnings.append("Could not write record %s in new file" %
                                     self.records)
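
One benefit of going through csv.writer instead of joining the values by hand is that the writer takes care of quoting values that contain the delimiter or embedded newlines. A minimal standalone sketch (hypothetical values):

    import csv
    from StringIO import StringIO

    row = ["1", "Berkeley, CA", "Puma concolor"]
    si = StringIO()
    cw = csv.writer(si, delimiter=",")
    cw.writerow(row)
    print si.getvalue()   # 1,"Berkeley, CA",Puma concolor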

Close file and prepare report for user

Unless the action is report (in which case, there is no response file), the response file needs to be closed:

        # Close file when finished parsing records
        if self.action != "report":
            try:
                self.f.close()
                self.file_url = "https://storage.googleapis.com%s" %\
                                self.file_name
                logging.info("Successfully created file %s" % self.file_name)
            except Exception, e:
                self._err(500, "Could not close result file", e)

And thus the response file is ready. Now all that is left is to prepare a response to the user.

NOTE: at this point, the extensive reporting of results is still incomplete. The user will only receive a generic "success" email with a link to the dedupe'd file and a long, unformatted JSON document.

        # Send notification to user
        self.send_email_notification("success")

And, to avoid conflicts, the handler switches back to the previous namespace:

        # Return to default namespace
        namespace_manager.set_namespace(self.previous_namespace)

Logging the usage

All usage of this tool is monitored in VertNet usage logs. A new entry will be created with data on the request. A Datastore model of a log entry can be found in the models module, and the code for transforming usage information into LogEntry entities and storing it in the Datastore can be found in the DedupeLog module.

        # Add entry to log
        params = dict(
            latlon=self.latlon, country=self.country, status="success",
            user_agent=self.user_agent, warnings=self.warnings, error=None,
            email=self.email, action=self.action, duplicates=self.duplicates,
            loc=self.headers[self.loc], sci=self.headers[self.sci],
            dat=self.headers[self.dat], col=self.headers[self.col],
            id_field=self.id_field, namespace=self.request_namespace,
            content_type=self.content_type, file_size=self.file_size,
            records=self.records, fields=len(self.headers),
            strict_duplicates=self.strict_duplicates, api_version=API_VERSION,
            partial_duplicates=self.partial_duplicates
        )
        taskqueue.add(
            url='/service/v0/log',
            payload=json.dumps(params),
            queue_name=QUEUE_NAME
        )
        logging.info("Logging enqueued")