An in-memory data warehouse with adaptive replication
Introduction
Cloud-based data warehouses have become the de facto standard thanks to the core benefits of the Cloud: on-demand computing, virtually limitless storage, increased computational capacity, and the ability to scale in and out while paying only for the resources used. A traditional Cloud data warehouse architecture can follow the ‘Primary-Replica’ approach, similar to what the figure below depicts.
These warehouses are distributed systems in principle but do not follow any distributed data processing scheme. Each node has its own resources and a Relational Database Management System (RDBMS) as the main query processing engine that handles part of the incoming workload.
The replicas in this setup serve as the main mechanism to cope with the growing workload, mitigate the performance issues that arise from a massive number of concurrent users, and provide reliability as the system’s fail-over mechanism. An incoming query that reads data can be processed by any replica; update queries, however, can be handled only by one of them.
For the replicas to remain fresh, they are fed with transaction log records, so the user sees a ‘near’ real-time version of the original database. The workload is spread among these replicas by a load-balancing algorithm, and a caching mechanism is introduced so that some result sets can be served from the main memory of each replica, increasing query performance by minimizing disk access. When computing demand grows high, elasticity kicks in: another database server is automatically added to the pool, and it is removed when demand drops.
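To make the traditional setup concrete, here is a minimal sketch of round-robin load balancing over replicas, each with its own result cache. All names are illustrative, and the ‘result’ is faked; this is not any particular vendor’s implementation.

```python
# Illustrative sketch: round-robin load balancing over replicas,
# each with an in-memory result cache.
import itertools

class Replica:
    def __init__(self, name: str):
        self.name = name
        self.cache: dict[str, object] = {}   # query text -> result set

    def run(self, sql: str) -> object:
        if sql not in self.cache:             # cache miss: go to disk
            self.cache[sql] = f"result of {sql!r} from {self.name}"
        return self.cache[sql]                # cache hit: served from memory

replicas = [Replica(f"replica-{i}") for i in range(3)]
balancer = itertools.cycle(replicas)          # naive round-robin
print(next(balancer).run("SELECT COUNT(*) FROM sales"))
```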
Undoubtedly, the approach taken in the architecture above is a proven approach that works for most data warehousing needs. However, I believe that there are some major downsides.
- Scaling up the data warehouse requires adding another replica to the pool. Given the substantial Cloud costs, “Is the performance contribution of another replica (after a certain point) really worth it? Is the tradeoff between cost and performance gains worth the effort?”
- The database replicas are naively created as exact copies of the original database. However, not every replica needs to be a complete copy. Most of the time, users query only a small portion of the whole dataset. If we generalize the ‘Pareto principle,’ it is not unreasonable to assume that “Most of the time, 80% of the queries can be answered with 20% of the data.”
Based on the questions above, I tried to envision a system that could tackle these issues while drawing heavy inspiration from the Primary-Replica architecture. The result is Hermes: a lightweight software layer on top of an existing OLAP-focused RDBMS that manages a cluster of in-memory database replicas and orchestrates query processing over them.
In Hermes, the approach to creating replicas goes beyond the current state of the art by recognizing that not all replicas are equal: they differ in the portion of the database they replicate and in their freshness. Instead of naively creating full database replicas, Hermes is based on a partial replication mechanism. The replicas reflect users’ interest in a part of the database and adapt as that interest shifts to yet-untouched parts, while persistent Cloud storage remains in place to prevent data loss.
My goal with this article is not to promote Hermes as a production-ready system. I am aiming to trigger a discussion on whether such an approach could work efficiently in a real environment! So if you are interested, please stay with me and continue reading 🚀.
Overall Architecture
A typical use case for data warehouses is an analytical workload where analysts try to obtain useful insights about an organization. In such cases, the number of read queries is much larger than the database updates. Usually, the queries focus on a limited number of data partitions that reflect their interests. This is exactly the kind of workload we aim for Hermes to handle efficiently.
The major requirement in the design of Hermes is an in-memory column-store database system. Taking advantage of a column store allows our system to focus on analytical workloads, since these kinds of systems can do the following (a short sketch after the list makes this concrete):
- aggregate large volumes of data for a subset of columns
- respond expeditiously by effectively using the CPU caches and SIMD instructions
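As a toy illustration of why this matters (NumPy here is just a stand-in for a columnar engine, not part of Hermes): storing each column as one contiguous array means an aggregation touches only the columns it needs, which is exactly what lets real column stores exploit CPU caches and SIMD.

```python
# Toy "table" stored column-wise: one contiguous array per column.
import numpy as np

table = {
    "year":    np.random.randint(2018, 2024, size=1_000_000),
    "revenue": np.random.rand(1_000_000) * 100,
    "region":  np.random.randint(0, 5, size=1_000_000),
}

# SELECT SUM(revenue) WHERE year = 2023
# reads only two columns; "region" is never touched.
mask = table["year"] == 2023
print(table["revenue"][mask].sum())
```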
Another important aspect of such systems is that we can leverage their power to realize the adaptive replication mechanism for Hermes. Addressing the data per column is a dominant requirement because the system can identify the columns involved in a query, populate the in-memory databases with those columns, and eventually replicate their content based on the demands of the incoming query workload.
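For illustration, here is one way such a layer might extract the referenced columns from an incoming SQL query. Using the sqlglot parser is my own assumption; Hermes could just as well rely on the underlying database’s parser.

```python
# Minimal sketch: extract the columns referenced by a SQL query.
import sqlglot
from sqlglot import exp

def referenced_columns(sql: str) -> set[str]:
    """Return the names of all columns referenced anywhere in the query."""
    ast = sqlglot.parse_one(sql)
    return {col.name for col in ast.find_all(exp.Column)}

print(referenced_columns(
    "SELECT region, SUM(revenue) FROM sales WHERE year = 2023 GROUP BY region"
))
# e.g. {'region', 'revenue', 'year'}
```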
As the figure above illustrates, the architecture of Hermes is logically separated into three layers. The ‘User layer’ comprises all the users who interact with the system; below it sit the ‘Compute layer’ and the ‘Storage layer.’ The compute layer contains Hermes’s distributed infrastructure, capable of handling a bulk workload of queries.
The system’s main components are the ‘Query Dispatcher’ and the ‘Agents.’ The dispatcher serves two purposes. It provides a way for the users to interact with the system, i.e., to forward their workload and receive the output, and it orchestrates query execution by forwarding each workload query to one of the available agents.
For those reasons, the dispatcher opens a bidirectional communication channel with both the agents and the users. The agents that lie below the dispatcher can be arbitrary in number; each is an in-memory database server, ready to accept a query, execute it, and return the results.
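To make the dispatcher-agent exchange concrete, here is a minimal sketch of the messages that might flow over that channel. The message names and fields are assumptions for illustration, not Hermes’s actual wire format.

```python
# Hypothetical message types for the dispatcher <-> agent channel.
from dataclasses import dataclass

@dataclass
class CostRequest:           # dispatcher -> all agents
    query_id: str
    columns: frozenset[str]  # persistent columns the query touches

@dataclass
class CostOffer:             # agent -> dispatcher
    query_id: str
    agent_id: str
    cost: float              # lower is better; max cost = nothing cached

@dataclass
class ExecuteQuery:          # dispatcher -> winning agent
    query_id: str
    sql: str
```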
Query Execution
The key role of the dispatcher is to decide to which agent an incoming query should be forwarded. Execution is tightly bound to both time and cost; however, as Benjamin Franklin said, ‘Remember that time is money,’ so in the end, it all comes down to cost.
Hermes uses a cost-offer mechanism that keeps the dispatcher agnostic of each agent’s internal state while still letting it collaborate smoothly with the agents. Upon receiving a query, the dispatcher extracts all references to persistent columns and forwards that information to the agents. Each agent checks the columns required by the query against its own state and, based on the cost model, responds with an offer reflecting its ability to serve the query.
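The article leaves the cost model open, so here is one plausible agent-side sketch: an agent bids the fraction of the required columns it would still have to fetch from storage, so an empty agent bids the maximum cost of 1 and a fully warmed-up agent bids 0. All names here are hypothetical.

```python
# Hypothetical agent-side cost model: bid the fraction of required
# columns NOT yet held in this agent's memory.
class Agent:
    MAX_COST = 1.0

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.in_memory: set[str] = set()     # columns currently cached

    def make_offer(self, required: set[str]) -> float:
        if not required:
            return 0.0
        missing = required - self.in_memory
        return len(missing) / len(required)  # 1.0 when empty, 0.0 when warm

    def execute(self, required: set[str]) -> None:
        # Fetch any missing columns from persistent storage, then run
        # the query against the in-memory replica.
        self.in_memory |= required
```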
Upon receiving the cost offers, the dispatcher decides which one to accept based on the following logic. If all the agents make an offer with the maximum query cost, the dispatcher forwards the query to an agent that was never picked before, i.e., one that has no data in memory. If all the agents have data loaded, the dispatcher accepts the offer with the lowest cost and forwards the query to that agent for execution. If there are multiple offers with the same minimum cost, or all the agents are empty, the dispatcher selects one of the agents at random.
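Putting the selection rules above into code, a minimal dispatcher sketch (reusing the hypothetical Agent class from the previous snippet) might look like this:

```python
import random

def pick_agent(agents: list[Agent], required: set[str]) -> Agent:
    offers = [(agent, agent.make_offer(required)) for agent in agents]
    # If every agent bids the maximum cost, prefer one that holds no
    # data yet, so partitions spread across agents during warm-up.
    if all(cost >= Agent.MAX_COST for _, cost in offers):
        empty = [agent for agent in agents if not agent.in_memory]
        return random.choice(empty or agents)
    # Otherwise accept the cheapest offer, breaking ties at random.
    best = min(cost for _, cost in offers)
    return random.choice([agent for agent, cost in offers if cost == best])
```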
The scenario described above consolidates three important design decisions:
- Hermes uses a cost model to control the price of the query in the Cloud by forwarding the workload to the most capable agent.
- The dispatcher forwards a query to empty agents if certain conditions are met. This is particularly useful to allow different database partitions to end up on different in-memory database servers during the initialization phase.
- The agents are autonomous entities. Each agent keeps track of its own state regarding the columns it has in memory, without needing a central component for that.
Example
We will go through the example illustrated in the figure below to gain further insight into the system’s behavior. The figure demonstrates the system’s initial state, where Hermes is up and running, and the memory of the agents is initially empty.
In step 1, a query that requires reading data arrives at the dispatcher; from that point, the system must handle it. Query execution is a multi-step process in Hermes. Upon receiving the query, the dispatcher analyzes it and determines the columns involved. It then supplies the agents with information about all the persistent columns the query needs.
In step 2, the agents compute the cost for the given columns and respond with an offer. Since all agents are initially empty, the offers arriving at the dispatcher have the same cost; the dispatcher therefore randomly selects ‘Hermes Agent 3’ with cost ‘1’ (<ha3, 1>) to execute the query, accepting its offer in step 3.
In step 4, the selected agent retrieves the relevant part of the persistent store: it knows it has to fetch all the required columns to support the query execution, so it accesses the storage layer. Once the data is loaded in memory, the replica is created, and ha3 can execute the query, returning the result set in step 5.
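To visualize step 4, suppose the persistent store kept the data in Parquet files on Cloud storage; fetching only the required columns could then look like the sketch below. The file layout and the use of pyarrow are my assumptions, not part of Hermes.

```python
# Hypothetical loading step: pull only the required columns from a
# columnar file in the persistent store into the agent's memory.
import pyarrow.parquet as pq

def load_columns(path: str, required: set[str]) -> dict:
    table = pq.read_table(path, columns=sorted(required))  # column pruning
    return {name: table.column(name) for name in table.column_names}
```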
The dispatcher receives the output and, in step 6, forwards it to the user. After that point, for every subsequent query that uses the same columns, ha3 will provide a lower cost offer since it has already cached part of the underlying database. Thus, the result set can be served directly from memory without reaching the storage layer. In this case, the dispatcher, upon receiving the bids, would pick the offer of ha3, given that it has the smallest cost.
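Tying the earlier sketches together, a tiny simulation reproduces the behavior of this example: the first query is a tie at maximum cost and lands on a random empty agent, and every repeat query over the same columns is then won by the agent that cached them.

```python
# Uses the hypothetical Agent and pick_agent from the sketches above.
agents = [Agent(f"ha{i}") for i in (1, 2, 3)]
cols = {"region", "revenue", "year"}

first = pick_agent(agents, cols)   # all bids are 1.0 -> random empty agent
first.execute(cols)                # that agent now caches the columns

second = pick_agent(agents, cols)  # the cached agent bids 0.0 and wins
assert second is first
```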
There are several other topics that could be considered in the design of Hermes, such as elasticity, fault tolerance, and data updates. However, to keep this article short, I focused solely on the core architecture. Thanks for reading, and I hope you enjoyed it! If you find Hermes an interesting project, please let me know.
I plan to write a follow-up article to dive into the implementation details.
That’s all, folks, for now… Till next time!