Sunday, January 26, 2014

What does it take to improve batch process performance 4,500 times?

The subject is a bit atypical for this blog, but colleagues have asked me multiple times about the details of this project, so here it is for others to comment on and take ideas from.

Our team recently showed how to achieve a 4,500-fold performance improvement in a key component of our legacy batch processing system. The component is responsible for classifying records, and it was under constant strain. It had a messaging subsystem and a deduplication strategy which ensured that only records that had changed that day were reclassified. Over the years, the classification data drifted out of sync, but it was impossible to reclassify all records, because the system was only able to process a day's worth of changes, about 2 million records. Additionally, any rule change that would result in updating a significant portion of the records had to be carefully planned and throttled. The new system we put in place does not concern itself with the daily changeset. It is able to reclassify the entire dataset of 30 million records in 5 minutes.

To achieve this, we focused on understanding the computation that was taking place. Our classifier is essentially doing a map-reduce operation, with about 7 input records sharing the same key field producing one output record. There are approximately 30 million input records. The system is I/O bound, with the reduce step requiring little computational power. Knowing these facts gave us pause, because there is nothing daunting about processing 30 million records, and yet our system was unable to cope with a tenth of that. At this scale, we did not feel it necessary to introduce any special technology, and we stayed within our Oracle relational database.

Efficient mapping was accomplished by keeping an index on the key field and using an order by clause in the select statement. This allowed us to start processing records before the query completed. Additionally, we noticed that characters in certain positions of the key were well distributed, so we used the underscore wildcard of the like clause in our select statement and multithreaded the job. The select statement for each thread looks like this:

select * from table where key like '___X____' order by key
where X is a different character for each thread and the keys are distributed evenly across the threads. The source code reads one record at a time from the result set, and when the key changes between records, it outputs the classification for all records with the previous key. See the partial pseudo-code below.
recordsToClassify = new List()
currentKey = 'NOT_INITIALIZED'
while (results.next()) {
  currentRecord = readRecord(results)
  // a key change means all records for the previous key have been read
  if (currentRecord.key != currentKey && !recordsToClassify.isEmpty()) {
    classify(recordsToClassify)
    recordsToClassify.clear()
  }
  currentKey = currentRecord.key
  recordsToClassify.add(currentRecord)
}
classify(recordsToClassify)  // classify the final group of records
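
To make the mechanics more concrete, here is a minimal sketch of how one worker thread might set up its query with plain JDBC. The table name, column names, fetch size, and the ClassifierWorker class itself are illustrative assumptions rather than our actual code; the grouping loop is the one shown above.

import java.sql.*;

// Illustrative worker thread; table, columns, and class names are assumptions.
public class ClassifierWorker implements Runnable {
  private final String url, user, password;
  private final char partitionChar; // the "X" this thread is responsible for

  public ClassifierWorker(String url, String user, String password, char partitionChar) {
    this.url = url;
    this.user = user;
    this.password = password;
    this.partitionChar = partitionChar;
  }

  public void run() {
    String sql = "select * from records where key like ? order by key";
    try (Connection conn = DriverManager.getConnection(url, user, password);
         PreparedStatement stmt = conn.prepareStatement(sql)) {
      stmt.setFetchSize(1000); // let the driver stream rows in larger batches
      stmt.setString(1, "___" + partitionChar + "____"); // fix one well-distributed key position
      try (ResultSet results = stmt.executeQuery()) {
        processByKey(results); // the grouping loop shown above
      }
    } catch (SQLException e) {
      throw new RuntimeException(e);
    }
  }

  private void processByKey(ResultSet results) throws SQLException {
    // collect rows until the key changes, then classify the group (see the loop above)
  }
}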

Finally, on the output end of things, since we did not have to deal with adds and drops, we could truncate the classification table before writing data to it.
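
As a rough illustration of that output path (the table, its columns, and the 1,000-row batch size are assumptions, not our actual schema), truncating once up front and then inserting in batches avoids per-row round trips:

import java.sql.*;
import java.util.List;

// Illustrative output path; the real schema has more fields.
class ClassificationWriter {

  // a minimal holder for one output row
  static class ClassifiedRecord {
    final String key, category;
    ClassifiedRecord(String key, String category) { this.key = key; this.category = category; }
  }

  // run once, before any worker starts writing
  static void truncateOutput(Connection conn) throws SQLException {
    try (Statement stmt = conn.createStatement()) {
      stmt.execute("truncate table classification"); // no adds or drops to reconcile, so start empty
    }
  }

  static void write(Connection conn, List<ClassifiedRecord> rows) throws SQLException {
    String insert = "insert into classification (key, category) values (?, ?)";
    try (PreparedStatement ps = conn.prepareStatement(insert)) {
      int pending = 0;
      for (ClassifiedRecord row : rows) {
        ps.setString(1, row.key);
        ps.setString(2, row.category);
        ps.addBatch();
        if (++pending % 1000 == 0) {
          ps.executeBatch(); // flush every 1,000 rows to keep memory bounded
        }
      }
      ps.executeBatch(); // flush the remainder
    }
  }
}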

It is difficult to gauge how much each part of this solution contributes to the performance improvement, since the new and old systems are so different. However, the important performance differentiators are as follows:

  • Batch processing. Asking the database to retrieve all records at once is a lot more efficient than running millions of queries, each for a small subset of records.
  • Processing while the query was running. Under the hood, the database retrieves records in batches, and ResultSet.next() is JDBC's abstraction that allows reading records as they become available, without waiting for the query to complete. In addition to the performance improvement opportunities, this also ensures that the system does not run out of memory while reading a large dataset. Plain JDBC seems to be the tool of choice here, as just about every framework makes getting at this functionality difficult, if not impossible.
  • No distractions: logging, deduplication of changes. A system that does a complete resync in 5 minutes does not need careful bookkeeping.
  • Inserts are much faster than updates. The new system writes into an empty table, while the old one had to modify existing records.
  • Recognizing the opportunity for distributed processing. Our map-reduce job was partitioned by key, and selecting keys on a specific character through the 'like' clause with underscores is very efficient. Since the keys were already indexed, each query just walks the index and retrieves records in index order; a sketch of such a driver follows this list.
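
For completeness, the driver for that partitioning might look roughly like the sketch below. The set of partition characters, the pool size, and the connection details are assumptions, and ClassifierWorker is the hypothetical worker sketched earlier.

import java.util.concurrent.*;

// Illustrative driver: one worker per well-distributed key character; all names are assumptions.
public class ClassifierDriver {
  public static void main(String[] args) throws InterruptedException {
    String url = args[0], user = args[1], password = args[2];
    String partitionChars = "0123456789"; // assumed values of the well-distributed key position
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (char c : partitionChars.toCharArray()) {
      pool.submit(new ClassifierWorker(url, user, password, c));
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.HOURS); // the full reclassification takes only minutes
  }
}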

One final point to ponder is how simple the end result turned out to be. Having been there at the front lines, I can attest that achieving this simplicity wasn't easy. Even after we thought we understood what the old system was doing, we iterated over several prototypes. Each prototype was simpler than its predecessor, and each time it seemed we had reached the limits of optimization. While many optimizations trade complexity for performance, this is an example of an optimization achieved mainly through a reduction of complexity.
