High Throughput Iteration of Relational Databases
The use of relational databases as primary data/metadata storage is ubiquitous in today’s distributed world. At Box we use MySQL to represent the file system hierarchy through the relational semantics of the database (an example is shown in the diagram below). This means that to retrieve all files belonging to an enterprise or user, one has to iterate over this hierarchy in the database. Retrieving the child records of a given parent record is a common scenario, and we believe other engineering teams with a similar architecture have faced similar problems. In this blog we share how we solved it.
At Box there are a few scenarios that require retrieving enterprise/user content:
- When a customer onboards Box KeySafe, we have to perform a backfill on the customer’s content to ensure that DEKs (Data Encryption Keys) are re-encrypted with the new customer key.
- In Box Zones, customers can change the default data location of users, which in turn requires us to migrate their content accordingly.
These customer-initiated operations need to execute as fast as possible to provide a great user experience. Databases traditionally do not offer off-the-shelf solutions for these kinds of iterations, so we built our own. In this article we discuss the design of a service for “High throughput iteration of relational databases”, along with some of the scale challenges we faced and how we at Box are addressing them.
As discussed earlier, the overall requirement is to iterate over all the metadata rows of a given enterprise/user and send them to workloads that perform operations on them. These workloads (like encrypting/migrating content) are customer initiated. Our system design must scale horizontally.
To get all the files belonging to an enterprise, we have to look up the enterprise row by querying the enterprise table, then look up all the users that belong to the enterprise by querying the user table, then for each of those users find all the files that belong to the user by querying the file table, and finally, for each file, find all its versions by querying the version table.
Note also that a user can have millions of files and a file can have thousands of versions. Therefore workers have to iterate in chunks and maintain the current state of the iteration.
Generalizable Pattern: We identified a repeatable pattern in what we were trying to do: given an entity X, get all its children by querying the corresponding table, where each child is usually identified by the table’s primary key and the parent is a foreign key. Since these children have to be iterated in chunks, we iterate them in primary-key order and keep track of the last object we iterated. We call that a marker.
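The marker-based chunking described above is essentially keyset pagination. Below is a minimal Python sketch using an in-memory SQLite database; the `file` table layout (integer `fileId` primary key, `userId` parent column), the chunk size and the function names are assumptions for illustration, not Box's schema or code.

```python
import sqlite3


def fetch_children(conn, user_id, marker, limit):
    """Return up to `limit` fileIds of `user_id` that sort strictly after `marker`."""
    rows = conn.execute(
        "SELECT fileId FROM file WHERE userId = ? AND fileId > ? "
        "ORDER BY fileId ASC LIMIT ?",
        (user_id, marker, limit),
    ).fetchall()
    return [row[0] for row in rows]


def iterate_children(conn, user_id, limit=2, marker=0):
    """Yield all fileIds of a user chunk by chunk, carrying a marker between chunks.

    Assumes fileIds are positive integers, so 0 sorts before every real id."""
    while True:
        chunk = fetch_children(conn, user_id, marker, limit)
        yield from chunk
        if len(chunk) < limit:  # a short chunk means there are no more children
            return
        marker = chunk[-1]  # the last child seen becomes the new marker


def make_demo_db():
    """Hypothetical schema: fileId is the primary key, userId the parent."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE file (fileId INTEGER PRIMARY KEY, userId INTEGER)")
    conn.executemany(
        "INSERT INTO file VALUES (?, ?)",
        [(1, 7), (2, 7), (3, 8), (4, 7), (5, 7), (6, 7)],
    )
    return conn


print(list(iterate_children(make_demo_db(), 7)))  # [1, 2, 4, 5, 6]
```

Because each query starts strictly after the marker and orders by the primary key, it can be served by an index covering (userId, fileId) instead of scanning and discarding earlier rows, as an OFFSET-based page would.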
Here we depict the pipeline which extends this common pattern to support the following scenarios.
- Iterate objects (files/versions) of a range of enterprises
- Iterate objects of an enterprise
- Iterate objects of a user
Now let’s add infrastructure pieces to this generalizable pattern. As an example, we will elaborate on Step B of the above diagram.
As shown in the following diagram, we have a stateless worker that reads messages from an input queue and, after processing, writes messages to an output queue. The worker queries the database to retrieve the required set of rows.
Storing the worker context (messages) in a distributed queue and keeping workers stateless allows us to scale workers and makes them robust to intermittent failures, as MQS provides retries. Each message represents a unit of work to be done, and the queue holds many such messages at a time, which lets us parallelize the work and achieve high throughput.
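To make the stateless-worker idea concrete, here is a hedged Python sketch in which `collections.deque` objects stand in for MQS queues and a dict stands in for the file table. The message fields, the chunk size and the helper names are hypothetical. The point it illustrates is that all iteration state (the marker) travels inside the message, so any worker instance can pick up the continuation.

```python
from collections import deque

CHUNK_SIZE = 2  # hypothetical; a real deployment would use a much larger chunk

# Hypothetical stand-in for the file table: userId -> fileIds in ascending order.
FILES_BY_USER = {"u1": [3, 5, 8, 9, 12]}


def process_message(msg, input_q, output_q):
    """Handle one message; the worker keeps no state between messages."""
    remaining = [f for f in FILES_BY_USER.get(msg["userId"], []) if f > msg["marker"]]
    chunk = remaining[:CHUNK_SIZE]
    for file_id in chunk:
        # One output message per child row, for the next stage of the pipeline.
        output_q.append({"userId": msg["userId"], "fileId": file_id})
    if len(chunk) == CHUNK_SIZE:
        # A full chunk means more rows may remain: re-enqueue a continuation
        # carrying the new marker rather than remembering it in the worker.
        input_q.append({"userId": msg["userId"], "marker": chunk[-1]})


def run(user_id):
    """Drain the input queue with a single worker and return the emitted fileIds."""
    input_q = deque([{"userId": user_id, "marker": 0}])
    output_q = deque()
    while input_q:
        process_message(input_q.popleft(), input_q, output_q)
    return [m["fileId"] for m in output_q]


print(run("u1"))  # [3, 5, 8, 9, 12]
```

If a worker crashes before a message is acknowledged, the queue redelivers it and some work is repeated; the deduplication scheme described later in this article addresses that.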
Here is the overall design, which is the extension of the repeatable pattern we discussed in the last section.
Key Architectural Components
- MQS (Message Queue Service): A Box internal service. It is a horizontally scalable, low latency, high throughput, resource isolated, work stealing queuing system with at-least-once execution. It exposes a queue API to enable asynchronous message delivery.
- Credence: Data access layer (Box internal) that provides a uniform, language-independent way to interact with relational data at Box.
The service is a pipeline of workers interconnected by distributed queues. It interacts with the following components:
- Credence (Data access layer): To query databases to retrieve required sets of rows.
- Redis: We will discuss its use case in subsequent sections.
- MQS: Distributed queue platform where our worker queues are hosted.
Here is what each worker does:
Enterprise worker: Given a range of enterprises, gets all the valid enterprises.
User worker: Given an enterprise, gets all its valid users.
File worker: Given a user, gets all its files.
File version worker: Given a file, gets all its file versions.
Each stage of the pipeline does its part of the overall job. For example, to iterate all the files of the enterprise with enterpriseId EID₁, we submit a message to Q1 to iterate all enterprises in the range (EID₁, EID₁+1). The Enterprise worker finds that there is an enterprise EID₁ in that range and submits a message to Q2. The User worker reads that message, gets all users in that enterprise, and submits a message for every user to Q3. The File worker reads messages from Q3, gets all the files of the given user, and submits a message for every file to Q4. The File version worker reads messages from Q4 and gets the file versions for that file. The output of the File/File version workers constitutes all the objects, and it can be fed to the next worker, which performs some operation on these objects.
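The hand-off between the four stages can be simulated in a few lines. In this hedged Python sketch, in-memory dicts replace the enterprise, user, file and version tables (normally queried through Credence) and `deque` objects replace queues Q1 to Q4; all data and function names are hypothetical, and chunking/markers are omitted to keep the fan-out visible. We assume the enterprise range includes its lower bound and excludes its upper bound, so (EID₁, EID₁+1) selects exactly EID₁.

```python
from collections import deque

# Hypothetical in-memory stand-ins for the four tables.
ENTERPRISES = [10, 11]
USERS = {10: ["u1", "u2"], 11: ["u3"]}                    # enterpriseId -> userIds
FILES = {"u1": ["f1", "f2"], "u2": ["f3"], "u3": ["f4"]}  # userId -> fileIds
VERSIONS = {"f1": ["v1", "v2"], "f2": ["v3"], "f3": ["v4"], "f4": ["v5"]}


def enterprise_worker(msg):
    lo, hi = msg  # enterprises with lo <= enterpriseId < hi
    return [eid for eid in ENTERPRISES if lo <= eid < hi]


def user_worker(enterprise_id):
    return USERS.get(enterprise_id, [])


def file_worker(user_id):
    return FILES.get(user_id, [])


def file_version_worker(file_id):
    return VERSIONS.get(file_id, [])


def run_pipeline(lo, hi):
    """Post one message to Q1, drain Q1..Q4, and return every file version found."""
    stages = [enterprise_worker, user_worker, file_worker, file_version_worker]
    queues = [deque() for _ in stages]  # Q1..Q4
    queues[0].append((lo, hi))
    output = []
    while any(queues):
        for i, worker in enumerate(stages):
            if not queues[i]:
                continue
            results = worker(queues[i].popleft())
            if i + 1 < len(stages):
                queues[i + 1].extend(results)  # one message per child object
            else:
                output.extend(results)  # handed to the downstream consumer
    return output


print(sorted(run_pipeline(10, 11)))  # ['v1', 'v2', 'v3', 'v4']
```

In the real service each stage runs as many independent pods, so messages in different queues are processed concurrently rather than in this round-robin loop.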
The service has a REST interface that accepts requests from clients to schedule an iteration by posting the appropriate messages into the corresponding queue.
It is currently deployed in production on a K8s cluster with a fleet of 36 pods (each pod has 2 cores and 4GB of memory). The service scales horizontally, and we achieve an iteration rate of more than 100K objects per second.
We encountered certain challenges once we started running it in production. This section is about how we solved each one of them.
It is important to rate limit the workers to protect downstream components from being overloaded. We used Guava’s RateLimiter, which works great within a single instance/pod. However, we wanted to specify a rate limit for the overall service (the cluster of pods). Our solution was to specify the overall rate in the K8s deployment configuration. Since the deployment configuration knows the number of pods, the per-pod rate limit is calculated there and passed to each pod as an environment variable.
Since all pods have the same CPU/memory, per-pod rate = overall allowed rate / number of pods.
The worker initializes the Guava rate limiter by reading the environment variable. This simple technique worked really well for us: we had no use case for changing the rate dynamically, so we avoided depending on a separate rate limiter service.
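The per-pod split and the environment-variable hand-off can be sketched as follows. This is Python rather than Java, so a small hand-written limiter stands in for Guava's `RateLimiter` (it spaces permits evenly and does not reproduce Guava's bursting or warm-up behavior); the `PER_POD_RATE` variable name and the helper names are hypothetical.

```python
import os
import threading
import time


def per_pod_rate(overall_rate, pod_count):
    """Split a cluster-wide rate evenly across identically sized pods."""
    if pod_count <= 0:
        raise ValueError("pod_count must be positive")
    return overall_rate / pod_count


class SimpleRateLimiter:
    """Hands out permits no faster than `permits_per_second`, evenly spaced.

    Stand-in for Guava's RateLimiter: no bursting, no warm-up period."""

    def __init__(self, permits_per_second):
        self._interval = 1.0 / permits_per_second
        self._next_free = time.monotonic()
        self._lock = threading.Lock()

    def acquire(self):
        """Block until a permit is available; return the seconds spent waiting."""
        with self._lock:
            now = time.monotonic()
            wait = max(0.0, self._next_free - now)
            # Reserve the next slot one interval after this permit's slot.
            self._next_free = max(now, self._next_free) + self._interval
        if wait > 0:
            time.sleep(wait)
        return wait


def limiter_from_env(var_name="PER_POD_RATE"):
    # Hypothetical variable name; the deployment config would set it to
    # overall_rate / pod_count for every pod.
    return SimpleRateLimiter(float(os.environ[var_name]))


print(per_pod_rate(3600, 36))  # 100.0
```

With a hypothetical overall limit of 3,600 operations per second spread over 36 pods, each pod would be configured for 100 permits per second. Changing the pod count means recomputing the value in the deployment configuration, which is acceptable when the rate never needs to change at runtime.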
At Box we have a wide variety of users. Some users have just a handful of files, while others have hundreds of millions. Although the service can parallelize the iteration of users within an enterprise, it iterates the files of a single user sequentially. Hence large users cause a long tail in overall iteration completion time. We classify a user as a large user if it has more than 250K files.
The solution was to identify whether a user is a large user and then pre-split its iteration into smaller ranges.
As noted, any user with more than 250K files is a large user. To identify a large user, we did not want to query the DB directly for a user’s file count, as that is a resource-expensive operation on the DB and we did not want to do it for every user. Instead, we kept track of how many files we had iterated for a user in the message itself. We also introduced startingMarker and endMarker fields to keep an iteration within a range.
When the worker processes a message M1 and finds that filesIteratedCount >= 250K, it generates an additional message M2 for iterating the next chunk of 250K files, and so on for messages M3 and M4. It does this by querying the fileId of the subsequent 250Kth file; the new message iterates through that range. In this way we split and parallelize the iteration of a large user.
SQL query to get the fileId of the subsequent 250Kth row:
SELECT fileId
FROM file
WHERE userId = U1 AND fileId > B’
ORDER BY fileId ASC
LIMIT 1 OFFSET 249999
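The splitting step can be sketched with the same query shape. In this hedged Python example an in-memory SQLite `file` table stands in for the real one, `SPLIT_SIZE = 3` stands in for 250K so the output stays readable, and the message fields mirror the startingMarker/endMarker fields mentioned above; the function names are hypothetical. Each range covers startingMarker < fileId <= endMarker, and the last range is left open-ended. This version computes all split points in one pass; it illustrates the technique rather than reproducing Box's worker logic.

```python
import sqlite3

SPLIT_SIZE = 3  # stands in for the 250K threshold so the demo stays small


def next_split_point(conn, user_id, marker, split_size=SPLIT_SIZE):
    """fileId of the split_size-th file after `marker`, or None if fewer remain."""
    row = conn.execute(
        "SELECT fileId FROM file WHERE userId = ? AND fileId > ? "
        "ORDER BY fileId ASC LIMIT 1 OFFSET ?",
        (user_id, marker, split_size - 1),
    ).fetchone()
    return row[0] if row else None


def split_large_user(conn, user_id, marker, split_size=SPLIT_SIZE):
    """Pre-split the rest of a large user's files into range messages."""
    messages = []
    start = marker
    while True:
        end = next_split_point(conn, user_id, start, split_size)
        # endMarker None means "iterate to the end of this user's files".
        messages.append({"userId": user_id, "startingMarker": start, "endMarker": end})
        if end is None:
            return messages
        start = end


def make_demo_db():
    """Hypothetical data: user 1 owns fileIds 1..10."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE file (fileId INTEGER PRIMARY KEY, userId INTEGER)")
    conn.executemany("INSERT INTO file VALUES (?, ?)", [(i, 1) for i in range(1, 11)])
    return conn


ranges = split_large_user(make_demo_db(), 1, 0)
print([(m["startingMarker"], m["endMarker"]) for m in ranges])
# [(0, 3), (3, 6), (6, 9), (9, None)]
```

Each split-point query has to skip SPLIT_SIZE index entries because of the OFFSET, but it returns a single fileId and runs once per range, which is much cheaper than iterating the whole user before parallelizing.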
The impact of this change is best understood via this short animation, which shows how splitting large users improves overall iteration completion time. Note that the first chunk of files is iterated the same way in both cases, since that is when we determine that a user is large. After that, we start splitting and iterate each of the resulting ranges in parallel. Depending on the size of the user, this improved the rate of iteration by 10-100X.
The distributed queuing system uses at-least-once delivery, which means a message can be delivered and consumed more than once. That, together with client-side retries, can cause duplicate messages. Duplicate messages mean duplicate work, and depending on which message is duplicated, we might end up repeating a lot of work.
Instead of focusing on exact message deduplication, we focused on deduplicating the work. We track every message’s progress in Redis. Using the message fields that do not change during the course of the iteration, we create a unique signature for a message. In this case it would be E1-U1-T1, which is the Key in Redis, and the Value is the marker value, i.e. M1. Before processing any message, the worker checks whether the marker in the message is greater than or equal to the corresponding marker value in Redis. If not, the message is a duplicate and the worker deletes it. In other words, among duplicate messages we only allow the one that is furthest ahead in the iteration.
Here is a sample message and how it is tracked in Redis.
Message
- EnterpriseId: E1
- UserId: U1
- TaskId: T1
- Marker: M1

Redis
- Key: E1-U1-T1
- Value: M1
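The work-level deduplication can be sketched as below. A Python dict stands in for Redis, markers are assumed to be comparable values such as integer fileIds, and the class and method names are hypothetical. A real implementation would need the check and the update to be atomic across workers (for example via a Redis Lua script run with EVAL); this single-process sketch does not attempt that.

```python
class WorkDeduplicator:
    """Drops messages that are behind the progress recorded for their task.

    A dict stands in for Redis (key: message signature, value: marker)."""

    def __init__(self):
        self._progress = {}

    @staticmethod
    def signature(msg):
        # Only fields that never change during the iteration, so every copy
        # of the same logical work maps to the same key.
        return f"{msg['EnterpriseId']}-{msg['UserId']}-{msg['TaskId']}"

    def should_process(self, msg):
        """True unless the store already holds a marker ahead of this message's."""
        stored = self._progress.get(self.signature(msg))
        return stored is None or msg["Marker"] >= stored

    def record_progress(self, msg, new_marker):
        """Advance the stored marker after a chunk; never move it backwards."""
        key = self.signature(msg)
        current = self._progress.get(key)
        self._progress[key] = new_marker if current is None else max(current, new_marker)


dedup = WorkDeduplicator()
msg = {"EnterpriseId": "E1", "UserId": "U1", "TaskId": "T1", "Marker": 100}
print(dedup.signature(msg))       # E1-U1-T1
print(dedup.should_process(msg))  # True (nothing recorded yet)
dedup.record_progress(msg, 200)   # chunk processed, marker advanced
print(dedup.should_process(msg))  # False (a stale duplicate)
```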
One of the key requirements of workloads that operate at the enterprise/user level is tracking both progress and completion. This is even more challenging because of the asynchronous, unordered message delivery semantics of the distributed queues we use. Since we operate at high throughput, most jobs complete in less than an hour, which makes tracking real-time progress non-essential. However, knowing whether a job has completed is still crucial. The file iteration service (producer) is always used in conjunction with another service (consumer). Therefore it is important to know two things: first, whether the producer has produced all the required entities for a given job, and second, whether the consumer has consumed all of them.
This can be framed as a near-real-time pre-aggregation problem, which we solved using Redis.
Let’s start with the first part: identifying whether the producer has produced all entities. During iteration we track (in Redis) occurrences of specific events. Here are the events we track while iterating the content of an enterprise:
- discovered-a-user (Counter): Invoked for every user found within an enterprise.
- discovered-all-users (Boolean): Invoked when all the users in an enterprise are found.
- user-iteration-completed (Counter): Invoked when all the files of a user are found.
We detect discovered-all-users and user-iteration-completed when a SQL query result contains fewer rows than specified in its LIMIT clause. When discovered-all-users is true and the discovered-a-user counter equals the user-iteration-completed counter, the iteration of the enterprise is complete. We track these events per job in Redis, and a cron job periodically checks them and updates the iteration completion status of the job.
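The completion rule above can be expressed directly. In this hedged Python sketch a dataclass holds the three per-job events that would live in Redis (for instance as counters updated with INCRBY and a flag set with SET); the class, field and function names are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class EnterpriseJobEvents:
    """Per-job events; in production these would be Redis keys."""
    users_discovered: int = 0           # discovered-a-user (counter)
    all_users_discovered: bool = False  # discovered-all-users (boolean)
    users_completed: int = 0            # user-iteration-completed (counter)


def is_last_page(rows_returned, limit):
    # Fewer rows than the LIMIT means the query has run out of rows.
    return rows_returned < limit


def on_user_page(events, users_in_page, limit):
    """Called by the user worker after each page of users for the enterprise."""
    events.users_discovered += users_in_page
    if is_last_page(users_in_page, limit):
        events.all_users_discovered = True


def on_file_page(events, files_in_page, limit):
    """Called by the file worker after each page of a single user's files."""
    if is_last_page(files_in_page, limit):
        events.users_completed += 1


def enterprise_iteration_complete(events):
    """The check a periodic cron job would run for each job."""
    return events.all_users_discovered and events.users_discovered == events.users_completed


ev = EnterpriseJobEvents()
on_user_page(ev, 2, limit=2)   # full page: more users may follow
on_user_page(ev, 1, limit=2)   # short page: all 3 users discovered
on_file_page(ev, 2, limit=2)   # user A still has files
on_file_page(ev, 0, limit=2)   # user A done
on_file_page(ev, 1, limit=2)   # user B done
print(enterprise_iteration_complete(ev))  # False (user C not finished)
on_file_page(ev, 1, limit=2)   # user C done
print(enterprise_iteration_complete(ev))  # True
```

Because the rule compares two counters and a flag, it does not depend on the order in which the queue delivers messages.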
Now let’s look at the second part of the problem, which is ensuring that the consumer has processed every entity received from the producer. We use Redis hashes for this: the producer inserts each object’s unique identifier into the hash, and the consumer removes it once processing is done. In the end, the content of the Redis hash tells us whether everything has been processed or what still remains to be processed.
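A hedged sketch of this producer/consumer bookkeeping follows. A dict of per-job dicts stands in for Redis hashes, with comments naming the Redis commands each step would correspond to (HSET, HDEL, HKEYS); the class and method names are hypothetical.

```python
class ConsumptionTracker:
    """Tracks which produced objects the consumer has not processed yet."""

    def __init__(self):
        self._hashes = {}  # job id -> {object id: value}

    def produced(self, job_id, object_id):
        self._hashes.setdefault(job_id, {})[object_id] = "1"  # HSET job_id object_id 1

    def consumed(self, job_id, object_id):
        fields = self._hashes.get(job_id)
        if fields is not None:
            fields.pop(object_id, None)  # HDEL job_id object_id
            if not fields:
                del self._hashes[job_id]  # Redis drops a hash once it is empty

    def remaining(self, job_id):
        return sorted(self._hashes.get(job_id, {}))  # HKEYS job_id

    def all_consumed(self, job_id, producer_done):
        # Done only once the producer has finished and nothing is left in the hash.
        return producer_done and not self._hashes.get(job_id)


tracker = ConsumptionTracker()
for object_id in ("f1", "f2", "f3"):
    tracker.produced("job-1", object_id)
tracker.consumed("job-1", "f2")
print(tracker.remaining("job-1"))           # ['f1', 'f3']
print(tracker.all_consumed("job-1", True))  # False
```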
We know that Redis is not traditionally used for this kind of use case. However, by making the right set of changes on our end and using the advanced feature set of AWS ElastiCache, we were able to use Redis. Here are answers to some common questions.
- What if some of the rows get evicted or the Redis instance restarts?
We use Sets and Hashes to store our data. In either case we include a placeholder sub-row (a set member or hash field) that represents the creation time of the row. Note that Redis evicts data only at the row (key) level, not at the sub-row level. Therefore a row eviction is detected through the missing placeholder sub-row, and the job is retried.
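The placeholder trick works because Redis evicts whole keys and also deletes a hash automatically once its last field is removed, so without a placeholder a fully consumed job and an evicted job would look identical (the key is simply gone). The Python sketch below uses a dict as a stand-in for Redis; the `__created_at__` field name, the helper names and the status strings are hypothetical.

```python
PLACEHOLDER_FIELD = "__created_at__"  # hypothetical placeholder field name


def start_job(store, job_id, created_at):
    # The placeholder keeps the hash alive even after every object is consumed.
    store[job_id] = {PLACEHOLDER_FIELD: created_at}


def job_status(store, job_id, producer_done):
    """Classify a job as 'evicted', 'pending' or 'complete'.

    `store` is a dict standing in for Redis. A missing key or a missing
    placeholder means the tracking data was lost and the job should be retried."""
    entries = store.get(job_id)
    if entries is None or PLACEHOLDER_FIELD not in entries:
        return "evicted"
    pending = [field for field in entries if field != PLACEHOLDER_FIELD]
    if pending or not producer_done:
        return "pending"
    return "complete"


store = {}
start_job(store, "job-1", created_at=1700000000)
store["job-1"]["file-42"] = "1"         # producer adds an object (HSET)
print(job_status(store, "job-1", True))  # pending
del store["job-1"]["file-42"]           # consumer removes it (HDEL)
print(job_status(store, "job-1", True))  # complete
store.clear()                           # simulate the key being evicted
print(job_status(store, "job-1", True))  # evicted
```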
- Wouldn’t tracking every object in a hash overrun memory?
Producer and consumer workers are configured to operate at the same rate. This ensures that entries added to the hash by the producer are consumed within a few seconds, which keeps the memory footprint of Redis within limits.
We started with the unique challenge of iterating relational database rows and solved it using modern distributed technologies, namely distributed queues and K8s. These technologies helped us achieve high throughput. However, they came with their own limitations, such as message duplication and unordered message delivery, which we further addressed using Redis.
This is a prime example of the kind of large-scale infrastructure projects the Storage team at Box is working on. If you are interested in similar problems, we would love to have you on our team. To learn more, please check out our careers page.