Data-driven job scheduling in Hadoop

Julien Guery

Julien Guery was a software engineer at Cask where he built software fueling the next generation of Big Data applications. Prior to Cask, Julien was a graduate student at Telecom Bretagne in France where he studied information technologies.


Triggering the processing of data in Hadoop as soon as enough new data is available helps optimize many incremental data processing use cases, but it is not trivial to implement. Scheduling a job (such as MapReduce or Spark) to run as soon as a certain amount of unprocessed data is available, for instance in a set of HDFS files or in an HBase table, requires balancing the speed of delivering results against the resources used to compute them.

The direct solution, where a scheduler polls the source of data for its state or size at regular intervals, has a number of issues (a sketch of this naive approach follows the list):

  • The scheduler has to know how to deal with different types of sources. Adding a new source type—say, a Hive table—would require updating the scheduler to add support for it.
  • Computing the state of the source can be complex. The source of data can have a complex structure, such as a hierarchy of directories on HDFS with many files that represent partitions of a data set. Data size does not always increase: sometimes it shrinks when a data retention policy scrubs old files, which makes keeping track of progress more complicated.
  • Computing the state of the source can be resource intensive. For instance, finding out the size of newly written data in an HBase table may require scanning through the actual data or collecting statistics on the table’s files in HDFS.
  • Infrequent polling for the source state may cause unwanted delays in processing the data. Because computing the state is complex and resource intensive, it is common to configure polling at longer intervals. Depending on the nature of the source, this can mean that data is processed with ever greater delays, or that many polls are redundant and unnecessarily eat up resources.
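
To make the problem concrete, here is a minimal sketch of such a polling scheduler. The SourceStateProvider interface, the threshold, and the polling loop are hypothetical, introduced only for illustration; they are not part of any system discussed in this post.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical interface: every source type needs its own implementation,
// which is exactly the coupling problem described in the first bullet above.
interface SourceStateProvider {
  long unprocessedBytes() throws Exception; // may be expensive to compute
}

public class NaivePollingScheduler {
  private final SourceStateProvider source;
  private final long thresholdBytes;
  private final long pollIntervalSeconds;
  private final Runnable job;

  public NaivePollingScheduler(SourceStateProvider source, long thresholdBytes,
                               long pollIntervalSeconds, Runnable job) {
    this.source = source;
    this.thresholdBytes = thresholdBytes;
    this.pollIntervalSeconds = pollIntervalSeconds;
    this.job = job;
  }

  public void run() throws Exception {
    while (true) {
      // Every poll pays the full cost of computing the source state, and data
      // arriving right after a poll waits up to a full interval to be noticed.
      if (source.unprocessedBytes() >= thresholdBytes) {
        job.run();
      }
      TimeUnit.SECONDS.sleep(pollIntervalSeconds);
    }
  }
}
```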

In this post, we talk about a solution that addresses some of these pain points.

 

Decoupling of source and scheduler

To simplify the scheduler and allow adding new types of sources without changing the scheduler, we want to decouple the two. Moving the tracking of source state into the source itself (or the writer of the data) also makes the tracking more efficient, because the source side has much more insight into the ingested data. To decouple source and scheduler, we introduce a source state management system:

[Diagram: the source reports its state to the state management system, which the scheduler polls]

The source reports its state incrementally, as it writes new data, to a state management system. The scheduler polls for state changes at regular intervals and triggers the job, if needed, based on a schedule configuration. Thus, the scheduler only interacts with the state management system.
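
As an illustration, the contract between the three parties might look like the sketch below. The StateStore and DataDrivenSchedule classes and their method names are hypothetical; they only illustrate the separation of concerns, not the API of any system mentioned in this post.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical state management system: sources report increments as they
// write data, and the scheduler only ever reads the aggregated state.
public class StateStore {
  private final Map<String, Long> bytesWritten = new ConcurrentHashMap<>();

  // Called by the source (or the writer of the data) as it ingests data.
  public void reportWritten(String sourceName, long newBytes) {
    bytesWritten.merge(sourceName, newBytes, Long::sum);
  }

  // Called by the scheduler at its regular polling interval.
  public long getBytesWritten(String sourceName) {
    return bytesWritten.getOrDefault(sourceName, 0L);
  }
}

// The schedule configuration compares the reported state against what has
// already been processed and decides whether to trigger the job.
class DataDrivenSchedule {
  private final StateStore store;
  private final String sourceName;
  private final long triggerBytes;
  private long lastTriggeredAt;

  DataDrivenSchedule(StateStore store, String sourceName, long triggerBytes) {
    this.store = store;
    this.sourceName = sourceName;
    this.triggerBytes = triggerBytes;
  }

  boolean shouldTrigger() {
    long current = store.getBytesWritten(sourceName);
    if (current - lastTriggeredAt >= triggerBytes) {
      lastTriggeredAt = current;
      return true;
    }
    return false;
  }
}
```

Note that the scheduler never talks to the source directly; adding a new source type only means adding a new caller of reportWritten.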

 

Optimizing the data source polling delay by using notifications

Decoupling the sources and the scheduler reduces the dependency between the two, but it still leaves the polling issues, which can be addressed by introducing notifications. In order for the scheduler to trigger a job promptly, even when data arrives at the source in between two polls, the scheduler must be notified as soon as there is a significant state change. For example, a notification can be published every time a certain amount of data is written to the source:

[Diagram: the source publishes a notification to the scheduler when a significant amount of new data has been written]

Upon receiving the notification, the scheduler polls the state of the source right away to confirm that there is enough data to start a job. If there’s enough new data, the scheduler triggers the job.
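
Continuing the earlier sketch, the notification only acts as a hint; the state store remains the source of truth. The interfaces and names below are again hypothetical and only illustrate the flow.

```java
// Hypothetical notification-assisted trigger: the same check runs both on the
// regular polling interval and whenever a notification for the source arrives,
// so a notification simply moves the check forward in time.
public class NotificationAssistedScheduler {

  interface StateStore {
    long getBytesWritten(String sourceName);
  }

  private final StateStore store;
  private final String sourceName;
  private final long triggerBytes;
  private final Runnable job;
  private long lastTriggeredAt;

  public NotificationAssistedScheduler(StateStore store, String sourceName,
                                       long triggerBytes, Runnable job) {
    this.store = store;
    this.sourceName = sourceName;
    this.triggerBytes = triggerBytes;
    this.job = job;
  }

  // Invoked on every poll and on every received notification: confirm against
  // the state store that enough new data exists before triggering the job.
  public synchronized void onPollOrNotification() {
    long current = store.getBytesWritten(sourceName);
    if (current - lastTriggeredAt >= triggerBytes) {
      lastTriggeredAt = current;
      job.run();
    }
  }
}
```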

 

Example: Scheduling a job based on data availability in a CDAP Stream

Let’s take a look at how the open source Cask Data Application Platform (CDAP), running on Hadoop, implements a solution that allows you to schedule a job based on data availability in a Stream. One benefit of CDAP being an integrated platform is that it includes many auxiliary services that help developers to build and run applications and perform system tasks, including scheduling jobs.

[Diagram: a Stream in CDAP emits metrics and notifications; the Scheduler uses both to trigger a Workflow]

In CDAP, the built-in Metrics service is used to maintain statistics for various resources managed by the platform, while the Notification service helps optimize the polling. A Scheduler uses the two to trigger a data processing job within a Workflow container.

CDAP provides an abstraction for elastically scalable real-time and batch data ingestion, called a Stream. A StreamWriter stores data as files on HDFS that can later be consumed for processing either in real time or by a batch job. It reports changes in the statistics of the Stream incrementally by emitting metrics to the Metrics system. Once the change reaches a configured threshold, the Stream sends a notification to the Notification system, which then distributes it to subscribers so they can take action. This allows the scheduler to poll infrequently and still act quickly on changes. Notifications are optional: the Metrics system acts as the “source of truth” in the end. This makes both the Notification and Metrics systems simpler and more efficient at what they do.

With the recently released v2.8.0 of CDAP, configuring the job to run when enough new data has been written to a Stream is straightforward:
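
The snippet below is a reconstruction of that configuration based on the description that follows; the exact class and method names (AbstractApplication, Schedules.createDataSchedule, and the PurchaseHistoryWorkflow from CDAP's PurchaseApp example) are quoted from memory of the CDAP 2.8 API and should be treated as approximate.

```java
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.data.stream.Stream;
import co.cask.cdap.api.schedule.Schedules;

// Approximate reconstruction of the PurchaseApp configuration for CDAP 2.8.
public class PurchaseApp extends AbstractApplication {

  @Override
  public void configure() {
    setName("PurchaseHistory");
    setDescription("Purchase history application");

    // Create the Stream that purchase events are ingested into.
    addStream(new Stream("purchaseStream"));

    // Add the Workflow that processes the ingested purchases.
    addWorkflow(new PurchaseHistoryWorkflow());

    // Run the Workflow whenever one megabyte of new data has been ingested
    // into the Stream (method and enum names are approximate).
    scheduleWorkflow(
        Schedules.createDataSchedule(
            "DataSchedule",
            "Runs when 1 MB of new data is ingested into purchaseStream",
            Schedules.Source.STREAM, "purchaseStream", 1),
        "PurchaseHistoryWorkflow");
  }
}
```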

The code above creates a Stream, adds a Workflow, and configures the Workflow to run when one megabyte of new data has been ingested into the Stream. More information on how to build and run the PurchaseApp example can be found here, with its source code available here.

Summary

Scheduling a job in Hadoop based on the availability of data is a very common use case. At the same time, it is not a trivial feature to build and is best handled with an integrated platform. Check out the solution we’ve built—completely open-source—here at Cask. Also, stay tuned for the follow-up posts in which we cover more details about how we built the Notification and Metrics services within CDAP.

If you found this post interesting, you may also find it exciting to take part in helping us build CDAP or to contribute feedback.

  • Blake

    I feel like this post is misleading. Streams are a very neat concept in CDAP and the glimpse into the architecture behind them is educational but the original problem posed in the post was not addressed. Streams don’t solve the problem of acting on data in HDFS as soon as it’s available. Streams _happen_ to use HDFS to store their data but that’s the only connection to the original problem. For most scenarios (standard HDFS files, Hive tables), Streams do not solve the issue.

    The issue posed in this post is one I’ve been looking into, specifically how to take data from HDFS and get it into the Realtime processing pipeline (Flows). There are examples for loading local files into that system (Flume, etc) but nothing for HDFS. I’ve yet to find a built-in component in CDAP that bridges that gap, likely due to the difficulties mentioned in this post.

    • Andreas Neumann

      You raise a valid point. Streams are only one type of data that is on HDFS. For now, notifications for data availability are only implemented for Streams. In addition to Streams, CDAP provides Datasets to manage other types of data, such as FileSet and PartitionedFileSet. We are planning to add notifications for these Datasets in the future. That will make it possible to trigger processing when a new file is available in a FileSet, or a new Partition in a PartitionedFileSet.

      You also mention another pain point in Hadoop: How to process batch data – possibly large files – in the realtime pipeline. Obviously, this is not truly realtime processing any more, because the data itself does not arrive in realtime in the first place. Still, being able to reuse realtime code for batch data is a very appealing proposition.

      Here is an apparently easy solution: Write a flowlet that can read from a file, and use that as the first flowlet, instead of reading from a stream. Every time there is a new file, start the flow, and stop it as soon as it has processed the entire file. There are some complications, though: A new file may arrive before the previous file is processed – should we now run two concurrent instances of the flow? (That would mix up the order of events between the two files). Or should we queue up the new file to be processed as soon as the previous is done? Then we may not be able to keep up with the incoming data. We still do not understand all the possible implications of processing files with flows. But it is definitely a feature that we want to and will implement in the future.

  • Amit Gupta

    Hadoop cluster lab setup on rest for learning purpose, if anyone interested, please look at
    Lab setup
