Scalable Distributed Transactional Queues on HBase

Terence Yim is a Software Engineer at Cask, responsible for designing and building real-time processing systems on Hadoop/HBase. Prior to Cask, Terence worked at both LinkedIn and Yahoo!, building high-performance, large-scale distributed systems.

A real-time stream processing framework usually involves two fundamental constructs: processors and queues. A processor reads events from a queue, executes user code to process them, and optionally writes events to another queue for additional downstream processors to consume. Queues are provided and managed by the framework; they transfer data and act as a buffer between processors, so that the processors can operate and scale independently. For example, a web server access log analytics application can look like this:

 

[Figure queue_perf: processors and queues in the web server access log analytics example]
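Conceptually, each processor runs a simple read-process-write loop against the framework-managed queues. The sketch below illustrates that loop with hypothetical Queue and Event types; these are not CDAP or HBase APIs, just an illustration of the pattern.

// Hypothetical types for illustration only; not part of CDAP or HBase.
interface Event { byte[] payload(); }

interface Queue {
  Event dequeue() throws InterruptedException;  // blocks until an event is available
  void enqueue(Event event);
}

// A processor reads from an input queue, runs user code, and optionally
// writes derived events to an output queue for downstream processors.
final class Processor implements Runnable {
  private final Queue input;
  private final Queue output;  // may be null for a terminal processor
  private final java.util.function.Function<Event, Event> userCode;

  Processor(Queue input, Queue output, java.util.function.Function<Event, Event> userCode) {
    this.input = input;
    this.output = output;
    this.userCode = userCode;
  }

  @Override
  public void run() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        Event in = input.dequeue();
        Event out = userCode.apply(in);       // user-supplied processing logic
        if (output != null && out != null) {
          output.enqueue(out);                // hand off to the next stage
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();     // exit cleanly on shutdown
    }
  }
}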

 

One key differentiator among various frameworks is the queue semantics, which commonly varies along these lines:

  • Delivery Guarantee: At-least-once, at-most-once, exactly-once.
  • Fault Tolerance: Failures are transparent to the user and recovery is automatic.
  • Durability: Data can survive failures and restarts.
  • Scalability: Characteristics and limitations when adding more producers/consumers.
  • Performance: Throughput and latency for queue operations.

In the open-source Cask Data Application Platform (CDAP), we wanted to provide a real-time stream processing framework that is dynamically scalable, strongly consistent, and offers an exactly-once delivery guarantee. With such strong guarantees, developers are free to perform any form of data manipulation without worrying about inconsistency, potential reprocessing, or failure. This helps developers build their big data applications even if they do not have a strong distributed systems background. Moreover, it is always possible to relax these strong guarantees and trade them for higher performance if needed; that is much easier than going the other way around.

Queue Scalability

The basic operations that can be performed on a queue are enqueue and dequeue. Producers write data to the head of the queue (enqueue) and consumers read data from the tail of the queue (dequeue). We say a queue is scalable when the overall enqueue rate increases as you add producers and the overall dequeue rate increases as you add consumers. Ideally, the scaling is linear, meaning that doubling the number of producers/consumers doubles the rate of enqueue/dequeue and is bounded only by the size of the cluster. In order to support linear scalability for producers, the queue needs to be backed by a storage system that scales linearly with the number of concurrent writers. For consumers to be linearly scalable, the queue can be partitioned such that each consumer only processes a subset of the queue data.

Another aspect of queue scalability is that it should scale horizontally: the upper bound of queue performance can be raised by adding more nodes to the cluster. This is important because it ensures the queue keeps working regardless of cluster size and can keep up with growth in data volume.

Partitioned HBase Queue

We chose Apache HBase as the storage layer for the queue. It is designed and optimized for strong row-level consistency and horizontal scalability. It provides very good concurrent write performance, and its support for ordered scans fits well with partitioned consumers. We use HBase Coprocessors for efficient scan filtering and queue cleanup. In order to get exactly-once semantics on the queue, we use Tephra’s transaction support for HBase.

Producers and consumers operate independently. Each producer enqueues by performing batch HBase Puts and each consumer dequeues by performing HBase Scans. The number of producers and the number of consumers are not linked, and they can scale separately.
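As a rough sketch of that pattern (ignoring transactions and the real row key layout for now), an enqueue is a batch of HBase Puts and a dequeue is a prefix Scan over the consumer's rows. The table name, column names, and key layout below are illustrative assumptions, not the actual CDAP schema.

// A minimal sketch of the enqueue/dequeue pattern using the plain HBase client API.
// Table name, column names, and row key layout are illustrative; transactions omitted.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class QueueSketch {
  private static final byte[] FAMILY = Bytes.toBytes("q");
  private static final byte[] DATA_COL = Bytes.toBytes("d");

  // Producer side: one Put per enqueued event, sent as a single batch.
  static void enqueue(Table table, List<byte[]> rowKeys, List<byte[]> payloads) throws IOException {
    List<Put> puts = new ArrayList<>();
    for (int i = 0; i < rowKeys.size(); i++) {
      Put put = new Put(rowKeys.get(i));
      put.addColumn(FAMILY, DATA_COL, payloads.get(i));
      puts.add(put);
    }
    table.put(puts);  // one batched round trip for the whole batch
  }

  // Consumer side: a prefix Scan over the rows belonging to this consumer's partition.
  static void dequeue(Table table, byte[] partitionPrefix) throws IOException {
    Scan scan = new Scan();
    scan.setRowPrefixFilter(partitionPrefix);
    try (ResultScanner scanner = table.getScanner(scan)) {
      for (Result result : scanner) {
        byte[] payload = result.getValue(FAMILY, DATA_COL);
        // hand the payload to the processor's user code here
      }
    }
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(TableName.valueOf("queue"))) {
      // enqueue(...) / dequeue(...) would be called here
    }
  }
}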

The queue has a notion of consumer groups. A consumer group is a collection of consumers partitioned by the same key such that each event published to the queue is consumed by exactly one consumer within the group. The use of consumer groups allows you to partition the same queue with different keys and to scale independently based on the operational characteristics of the data. Using the access log analytics example above, the producer and consumer groups might look like this:

 

[Figure queue_groups: producers and consumer groups for the access log analytics example]

 

There are two producers running for the Log Parser and they are writing to the queue concurrently. On the consuming side, there are two consumer groups: the Unique User Counter group has two consumers, using UserID as the partitioning key, while the Page View Counter group contains three consumers, using PageID as the partitioning key.
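For every event it enqueues, the producer picks one target consumer in each consumer group by hashing that group's partitioning key modulo the group size (this is the scheme Terence describes in the comments below). A minimal sketch, with made-up group IDs and the group sizes from the figure:

// Sketch: choosing one consumer per consumer group for an event.
// Group IDs below are made up; group sizes match the example above.
public class PartitionChoiceSketch {
  static final class ConsumerGroup {
    final int groupId;
    final int size;  // number of consumers in the group
    ConsumerGroup(int groupId, int size) { this.groupId = groupId; this.size = size; }
  }

  // Returns the consumer ID within the group that should receive the event.
  static int chooseConsumer(ConsumerGroup group, String partitioningKey) {
    return Math.abs(partitioningKey.hashCode() % group.size);
  }

  public static void main(String[] args) {
    ConsumerGroup uniqueUserCounter = new ConsumerGroup(16, 2);  // partitioned by UserID
    ConsumerGroup pageViewCounter = new ConsumerGroup(17, 3);    // partitioned by PageID

    String userId = "user-42";
    String pageId = "/index.html";

    // The producer writes one row per consumer group, keyed by (group ID, consumer ID).
    int userConsumer = chooseConsumer(uniqueUserCounter, userId);
    int pageConsumer = chooseConsumer(pageViewCounter, pageId);
    System.out.println("Unique User Counter -> consumer " + userConsumer
        + ", Page View Counter -> consumer " + pageConsumer);
  }
}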

Queue row key format

Since an event emitted by a producer can be consumed by one or more consumer groups, we write the event to one or more rows in an HBase table, with one row designated for each consumer group. The event payload and metadata are stored in separate columns, while the row key follows this format:

 

[Figure queue_row_key: layout of the queue row key]

 

The two interesting parts of the row key are the Partition ID and the Entry ID. The Partition ID determines the row key prefix for a given consumer. This allows a consumer to read only the data it needs to process, using a prefix scan on the table during dequeue. The Partition ID consists of two parts: a Consumer Group ID and a Consumer ID. The producer computes one Partition ID per consumer group and writes to those rows on enqueue.

The Entry ID in the row key contains the transaction information. It consists of the producer’s transaction write pointer issued by Tephra and a monotonically increasing counter. The counter is generated locally by the producer and is needed to make the row key unique for each event, since a producer can enqueue more than one event within the same transaction.
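Putting those pieces together, a single queue row key can be assembled roughly as below. The field widths and ordering are assumptions for illustration; the actual CDAP layout (including the salt discussed later) differs in its details.

// Sketch: assembling a queue row key from the Partition ID
// (consumer group ID + consumer ID) and the Entry ID
// (transaction write pointer + per-transaction counter).
// Field widths are illustrative assumptions.
import java.nio.ByteBuffer;

public class RowKeySketch {
  static byte[] rowKey(byte[] queueName, int groupId, int consumerId,
                       long writePointer, int counter) {
    ByteBuffer buffer = ByteBuffer.allocate(queueName.length + 4 + 4 + 8 + 4);
    buffer.put(queueName);        // queue name prefix (one table can hold many queues)
    buffer.putInt(groupId);       // Partition ID, part 1: consumer group ID
    buffer.putInt(consumerId);    // Partition ID, part 2: consumer ID within the group
    buffer.putLong(writePointer); // Entry ID, part 1: producer's transaction write pointer
    buffer.putInt(counter);       // Entry ID, part 2: counter local to the producer/transaction
    return buffer.array();
  }
}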

On dequeue, the consumer uses the transaction write pointer to determine whether that queue entry has been committed and hence can be consumed. The row key is always unique because it includes both the transaction write pointer and the counter, which lets producers operate independently and never encounter write conflicts.
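In code, the dequeue-side check might look like the sketch below: extract the write pointer from the row key (using the layout assumed in the previous sketch) and ask the consumer's current Tephra transaction whether that writer is visible. The package name changed when Tephra moved to Apache, so older releases use co.cask.tephra instead of org.apache.tephra.

// Sketch: deciding whether a scanned queue entry can be consumed.
// Assumes the row key layout from the previous sketch.
import java.nio.ByteBuffer;
import org.apache.tephra.Transaction;  // co.cask.tephra in pre-Apache releases

public class VisibilitySketch {
  // The write pointer sits after the queue name, group ID (4 bytes) and consumer ID (4 bytes).
  static long extractWritePointer(byte[] rowKey, int queueNameLength) {
    return ByteBuffer.wrap(rowKey, queueNameLength + 4 + 4, 8).getLong();
  }

  static boolean canConsume(Transaction tx, byte[] rowKey, int queueNameLength) {
    long writePointer = extractWritePointer(rowKey, queueNameLength);
    // Visible only if the producer's transaction committed and is not excluded
    // or still in progress from this consumer's point of view.
    return tx.isVisible(writePointer);
  }
}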

In order to generate the Partition ID, a producer needs to know the size and the partitioning key of each consumer group. The consumer group information is recorded transactionally when the application starts, as well as whenever the group size changes.

Changing producers and consumers

It is straightforward to increase or decrease the number of producers, since each producer operates independently: adding or removing producer processes will do the job. However, when the size of a consumer group needs to change, coordination is required to update the consumer group information correctly. The steps can be summarized by this diagram:

 

[Figure queue_scale: steps for changing the size of a consumer group]

 

 

Pausing and resuming are fast operations, as they are coordinated using Apache ZooKeeper and executed in parallel. For example, with the web access log analytics application mentioned above, changing the consumer group information may look something like this:

 

[Figure queue_state: consumer group state changes for the access log analytics example]
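The exact resize protocol is internal to CDAP, but based on the description above its general shape is: pause the consumers of the queue, record the new consumer group information transactionally, then resume consumption. The sketch below captures just that shape using purely hypothetical Coordinator and GroupInfoStore interfaces; it is not the real CDAP or ZooKeeper API.

// Hypothetical sketch of a consumer group resize. Coordinator and GroupInfoStore
// are made-up interfaces used only to illustrate the ordering of the steps.
import java.util.Map;

public class ResizeSketch {
  interface Coordinator {
    void pauseConsumers(String queueName) throws Exception;   // e.g. signalled through ZooKeeper
    void resumeConsumers(String queueName) throws Exception;
  }

  interface GroupInfoStore {
    // Records the new (group ID -> group size) mapping transactionally.
    void updateGroups(String queueName, Map<Integer, Integer> groupSizes) throws Exception;
  }

  static void resize(Coordinator coordinator, GroupInfoStore store,
                     String queueName, Map<Integer, Integer> newGroupSizes) throws Exception {
    coordinator.pauseConsumers(queueName);          // consumers finish in-flight work and stop
    try {
      store.updateGroups(queueName, newGroupSizes); // transactional metadata update
    } finally {
      coordinator.resumeConsumers(queueName);       // consumers reload group info and continue
    }
  }
}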

 

With this queue design, the enqueue and dequeue performance is on par with batch HBase Puts and HBase Scans respectively, with some overhead for talking to the Tephra server. That overhead can be greatly reduced by batching multiple events in the same transaction.
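For example, a producer can wrap a whole batch of Puts in a single Tephra transaction, paying the Tephra round trips once per batch instead of once per event. The sketch below uses Tephra's TransactionAwareHTable and TransactionContext; exact class and package names vary a little between Tephra releases, so treat it as an outline rather than copy-paste code.

// Sketch: enqueueing a batch of events in one Tephra transaction so the
// transaction overhead is paid once per batch. Class and package names may
// differ slightly between Tephra releases (org.apache.tephra vs. co.cask.tephra).
import java.util.List;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.tephra.TransactionContext;
import org.apache.tephra.TransactionSystemClient;
import org.apache.tephra.hbase.TransactionAwareHTable;

public class TransactionalEnqueueSketch {
  static void enqueueBatch(TransactionSystemClient txClient, HTable queueTable,
                           List<Put> puts) throws Exception {
    // Wrap the HBase table so its writes participate in the transaction.
    TransactionAwareHTable txTable = new TransactionAwareHTable(queueTable);
    TransactionContext txContext = new TransactionContext(txClient, txTable);

    txContext.start();      // obtain a write pointer from the Tephra server
    try {
      txTable.put(puts);    // every event in the batch shares the same write pointer
      txContext.finish();   // commit: a second round trip, amortized over the batch
    } catch (Exception e) {
      txContext.abort();    // roll back the whole batch
      throw e;
    }
  }
}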

Finally, to prevent “hotspotting,” we pre-split the HBase table based on the cluster size and apply salting to the row key to better distribute writes, which would otherwise be sequential due to the monotonically increasing transaction write pointer.
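One common way to do this (and an assumption about the exact scheme used here) is to prepend a one-byte salt derived from the rest of the row key and to create the table pre-split at the salt boundaries, so each salt bucket starts in its own region. A consumer then runs one prefix scan per salt bucket for its partition.

// Sketch: salting row keys and pre-splitting the queue table.
// The single-byte salt and the number of buckets are illustrative choices.
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;

public class SaltingSketch {
  static final int NUM_BUCKETS = 16;  // roughly tied to cluster size

  // Prepend a salt byte so writes with monotonically increasing write pointers
  // spread across NUM_BUCKETS regions instead of hammering a single region.
  static byte[] saltedKey(byte[] rowKey) {
    byte salt = (byte) ((Arrays.hashCode(rowKey) & 0x7fffffff) % NUM_BUCKETS);
    byte[] salted = new byte[rowKey.length + 1];
    salted[0] = salt;
    System.arraycopy(rowKey, 0, salted, 1, rowKey.length);
    return salted;
  }

  // Pre-split the table on the salt boundaries so each bucket starts in its own region.
  static void createPreSplitTable(Admin admin, TableName tableName) throws IOException {
    byte[][] splitKeys = new byte[NUM_BUCKETS - 1][];
    for (int i = 1; i < NUM_BUCKETS; i++) {
      splitKeys[i - 1] = new byte[] { (byte) i };
    }
    HTableDescriptor descriptor = new HTableDescriptor(tableName);
    descriptor.addFamily(new HColumnDescriptor("q"));
    admin.createTable(descriptor, splitKeys);
  }
}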

Performance Numbers

We’ve tested the performance on a small ten-node HBase cluster and the results are impressive. Using a 1 KB payload with a batch size of 500 events, we achieved a throughput of 100K events per second produced and consumed, running with three producers and ten consumers. We also observed that throughput increases linearly when we add more producers and consumers: for example, it increased to 200K events per second when we doubled the number of producers and consumers.

With the help of HBase and a combination of best practices, we successfully built a linearly scalable, distributed transactional queue system and used it to provide a real-time stream processing framework in CDAP: dynamically scalable, strongly consistent, and with an exactly-once delivery guarantee.

 

  • twiptbb

    Would it be possible to construct a consolidation consumer — say a “Consumer A” that could dequeue across both Consumer 1 and Consumer 2 ?

    • Terence Yim

      Are “Consumer 1” and “Consumer 2” reading from the same queue? It’s always possible to create a higher level consumer that reads from the same queue with different partition IDs and/or with different consumer groups.

  • manasvigupta

    Just curious – Any particular reason for not choosing Apache Kafka as distributed queue ?

    From my limited knowledge, this setup seems to be quite close to Apache Kafka (including the use of consumer groups, creating partitions by IDs, etc.). Are there any obvious differences that I am missing?

    Thanks in advance.

    • Terence Yim

      We didn’t pick Kafka because we wanted to support “exactly-once” delivery and Kafka only has “at-least-once”. It is possible to use Kafka as the queue data store while maintaining the transactional queue states outside of Kafka to achieve exactly-once delivery. However, this complicates the enqueue logic and also creates overhead when dequeuing since it needs to fetch and check across two systems.

      We also wanted both the enqueue and dequeue to operate within the same transaction as the data processing. Think of it as “read->process->write” in an all or nothing manner. This makes development of the processing pipeline easier, as the developer doesn’t need to worry about handling failure or reprocessing, and it is not limited to performing idempotent data operations only.

      • alexbaranau

        > It is possible to use Kafka as the queue data store while maintaining the transactional queue states outside of Kafka to achieve exactly-once delivery.

        When Kafka is the source for realtime processing, this is how CDAP makes consuming from Kafka transactional, i.e. with an exactly-once guarantee.

        • Terence Yim

          Yes, although the writing to Kafka is non-transactional and usually external to CDAP.

  • Adrian Muraru

    Terence, off-topic question: what tool did you use to draw the diagrams? They look pretty cool. Thanks

    • Terence Yim

      I used a tool called SimpleDiagrams to draw them.

  • Renato Marroquín

    Hi Terence, cool blog post!

    I have some doubts about it though.

    1) Could you give us a tangible example of what a partition Id would look like? Because you say
    “..to generate the Partition ID, a producer needs to know the size and the partitioning key of each consumer group” But in the figure the PartitionID = ConsumerGroup+Consumer

    2) When talking about changing consumers/producers, you mention “instances”, what are these instances? do you mean consumers?
    3) What is the mapping between tables and queues? Is this a 1-to-1 mapping? Could you explain a little bit about this please?
    Many thanks Terence!

    • Terence Yim

      Hi Renato,

      Thanks for your interest in this blog post. Please find my answer below.

      1. One queue can have multiple consumer groups. Within each group, there can be multiple consumers. To uniquely identify a consumer of a queue, we need both the consumer group ID and the consumer ID (within that group), which together we call the “Partition ID” from the producer’s perspective. From the example in the post, “Unique User Counter” is a consumer group. Let’s say its consumer group ID is “16” (0x10 in hex). If there are 3 consumers within that group, they will have Partition IDs “0x10 00000000”, “0x10 00000001” and “0x10 00000002” respectively.

      2. Yes, instances mean consumers in that case. Sorry for the confusion.

      3. An HBase table can have one or more queues. Each queue entry row in the table is prefixed with the queue name. Inside the CDAP real time processing engine, we use one table per processing DAG, which can have multiple queues inside (basically there is one queue per edge in the DAG). You can learn more about it in http://tinyurl.com/p8o3dwx

      • Renato Marroquín

        Hi Terence,

        Thank you very much for your responses!
        1. Great! I get it now. And what about data partitioning? e.g. partitioning by userId or by webUrl? I guess that is dealt with directly by the processors, but the queues don’t rely on this data partitioning, right?
        I think the other ones are clear now, and it makes sense having the queues be the edges of the graph, as they are the results of each computation, or nodes in the DAG (is this the case?). Thanks again Terence!

        • Terence Yim

          Hi Renato,

          The producer computes a hash value based on the key provided by the caller (the one who tries to enqueue). It then computes one target partition ID per consumer group by “consumer group ID” + (hash value % consumer group size).

  • suds

    can you share hbase configurations used during this test?

    • Terence Yim

      The HBase cluster we used for the perf test was running with 9 RegionServers. We didn’t change much from the default config, except the following parameters.

      hbase.hregion.memstore.flush.size = 536870912
      hbase.hstore.blockingStoreFiles = 32
      hbase.regionserver.global.memstore.upperLimit = 0.5
      hbase.regionserver.handler.count = 100
      hfile.block.cache.size = 0.3

  • santhosh kumar

    Thanks Yim for this post. I have started learning HBase recently, so please correct me if my understanding of this design is wrong. On a high level, when you say producers and consumers, are they clients that write and read data from HBase? It would be like [Prod] —> [HBase] —> [Cons]. Consumers consume data by identifying the ‘row_key’ format.

    • Terence Yim

      The terms producer and consumer refer to the queue operations. Producers are the ones who enqueue (put stuff into the queue), while consumers are the ones who dequeue (take stuff from the queue). From the HBase operation point of view, a producer performs only write operations, while a consumer performs both read and write operations (reading the data and maintaining its own state).

      • santhosh kumar

        From the HBase point of view, when you say producer, it writes data into HBase tables, and consumers read this data and update their state in the same tables. So my question is: are these producers and consumers processes in your application that manage the HBase tables? Sorry for asking again!!

        • Terence Yim

          Yes. The producer/consumer are part of an internal library in CDAP that provides the transactional queue system.
