Multiple Outputs in CDAP

Ali Anwar

Ali Anwar is a software engineer at Cask, where he is building on the Cask Data Application Platform (CDAP). Prior to Cask, Ali attained his undergraduate degree in Computer Science from the University of California, Berkeley.


In CDAP, a MapReduce program can interact with a CDAP dataset by using it as an input or an output. Before CDAP 3.2.0, users could only have a single dataset as the output of a MapReduce job. We wanted to extend this capability and allow writing to multiple datasets from a MapReduce job, to support the following use cases:

  1. Quarantining invalid records. A user might have a MapReduce job that parses input records. If parsing fails, the user may want to store the failed records in a quarantine dataset while writing the successfully parsed records to a different dataset. The DataCleansing example application demonstrates this use case.
  2. Writing to multiple destinations in a Cask Hydrator batch pipeline. For instance, a user might want to copy the contents of a database into Cassandra for serving and into S3 for backup.
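To make the quarantine use case concrete, here is a minimal, self-contained sketch. The `NamedOutputs` and `RecordRouter` classes are illustrative stand-ins, not the CDAP or Hadoop API; in a real job the named outputs would be datasets backed by OutputFormats:

```java
import java.util.*;

// Illustrative only: a tiny stand-in for "write to a named output".
class NamedOutputs {
    final Map<String, List<String>> outputs = new HashMap<>();

    void write(String name, String record) {
        outputs.computeIfAbsent(name, k -> new ArrayList<>()).add(record);
    }
}

class RecordRouter {
    // Records that parse as integers go to "parsed"; everything else is
    // quarantined instead of failing the whole job.
    static void route(String record, NamedOutputs out) {
        try {
            Integer.parseInt(record.trim());
            out.write("parsed", record);
        } catch (NumberFormatException e) {
            out.write("quarantine", record);
        }
    }
}
```

The key point is that a single map task can direct each record to one of several outputs, rather than being limited to a single destination.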

We added that support in CDAP 3.2.0. This blog post covers the details of how support for multiple output datasets was added.

Issues with MapReduce MultipleOutputs

The most logical choice for the implementation would be to use MapReduce’s MultipleOutputs capability. In Hadoop MapReduce, an OutputFormat defines where and how the output of the MapReduce job will be written. It does this by returning a RecordWriter, which has the responsibility of writing each key-value pair to the location specified and prepared by the OutputFormat. Hadoop ships with a class named MultipleOutputs for the purpose of writing to more than one output, though it has several drawbacks:
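The OutputFormat/RecordWriter contract described above can be modeled with a simplified sketch (illustrative only, not the real org.apache.hadoop.mapreduce classes): the OutputFormat prepares a destination and hands back a RecordWriter that does the per-record writing.

```java
import java.util.*;

// Simplified model of the Hadoop contract: an OutputFormat prepares a
// destination and returns a RecordWriter for the per-record writes.
interface RecordWriter<K, V> {
    void write(K key, V value);
}

interface OutputFormat<K, V> {
    RecordWriter<K, V> getRecordWriter();
}

// A toy in-memory OutputFormat standing in for a file or dataset destination.
class ListOutputFormat implements OutputFormat<String, String> {
    final List<String> sink = new ArrayList<>();

    public RecordWriter<String, String> getRecordWriter() {
        return (k, v) -> sink.add(k + "=" + v);
    }
}
```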

  1. It only works with file-based OutputFormats, which was too restrictive for our use case of supporting multiple kinds of destinations; for instance, Cassandra and CDAP datasets.
  2. It instantiates a separate RecordWriter for each of the outputs the Mapper/Reducer task writes to, but does not delegate the other operations of an OutputFormat to them.

For instance, if there are three OutputFormats configured, it will instantiate three RecordWriters, one from each of the configured OutputFormats. However, it neglects to delegate the other operations of an OutputFormat to those three OutputFormats. At the end of the job, the MapReduce framework calls the OutputFormat’s commitJob method; for FileOutputFormat, this moves the temporary output to its final location. The MultipleOutputs class never calls commitJob on the delegate OutputFormats, so the files are left exactly where the RecordWriters wrote them, which is generally some temporary directory rather than the configured final directory.

These issues led us to look for alternatives. We worked around them by implementing an OutputFormat and an OutputCommitter that delegate each of their calls to the multiple configured outputs, and by generalizing the instantiation of multiple RecordWriters to support non-file-based OutputFormats as well.
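The delegation idea can be sketched in miniature. This is a simplified model, not CDAP's actual MultipleOutputsCommitter: a wrapper committer forwards each lifecycle call to every named delegate, so commitJob actually reaches all configured outputs.

```java
import java.util.*;

// Simplified model: one wrapper committer that forwards commitTask and
// commitJob to every configured delegate committer.
interface Committer {
    void commitTask();
    void commitJob();
}

class DelegatingCommitter implements Committer {
    private final Map<String, Committer> delegates;

    DelegatingCommitter(Map<String, Committer> delegates) {
        this.delegates = delegates;
    }

    public void commitTask() {
        for (Committer c : delegates.values()) c.commitTask();
    }

    public void commitJob() {
        // This is the call Hadoop's MultipleOutputs never makes on its
        // delegates; here every delegate gets it.
        for (Committer c : delegates.values()) c.commitJob();
    }
}
```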

The delegating code is quite straightforward. It consists of an OutputFormat that overrides all of the methods of the OutputFormat class and forwards each operation to its delegates. Below is a code snippet from MultipleOutputsCommitter.java, which calls commitTask on each of the committers it holds a reference to:

@Override
public void commitTask(TaskAttemptContext taskContext) throws IOException {
  for (Map.Entry<String, OutputCommitter> committer : committers.entrySet()) {
    TaskAttemptContext namedTaskContext = MultipleOutputs.getNamedTaskContext(taskContext, committer.getKey());
    if (committer.getValue().needsTaskCommit(namedTaskContext)) {
      committer.getValue().commitTask(namedTaskContext);
    }
  }
}

The OutputFormat that binds this all together, and that is set on the actual Hadoop job, is MultipleOutputsMainOutputWrapper.java. Because it is responsible for returning a single OutputCommitter, it determines all of the named outputs as well as each output’s individual configuration. It then uses these values to create an OutputCommitter for each output and returns an instance of the MultipleOutputsCommitter referenced above.

@Override
public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
  // return a MultipleOutputsCommitter that commits for the root output format as well as all delegate outputformats
  if (committer == null) {
    Map<String, OutputCommitter> committers = new HashMap<>();
    for (String name : MultipleOutputs.getNamedOutputsList(context)) {
      Class<? extends OutputFormat> namedOutputFormatClass =
        MultipleOutputs.getNamedOutputFormatClass(context, name);

      TaskAttemptContext namedContext = MultipleOutputs.getNamedTaskContext(context, name);

      OutputFormat outputFormat = new InstantiatorFactory(false).get(TypeToken.of(namedOutputFormatClass)).create();
      committers.put(name, outputFormat.getOutputCommitter(namedContext));
    }
    committer = new MultipleOutputsCommitter(committers);
  }

  return committer;
}

Multiple Outputs in Action

In Cask’s ETL framework, a.k.a. Cask Hydrator, a user can define a source (to read the data), one or more transformations (to process the data), and one or more sinks (that serve as destinations for the processed records). An ETL Batch pipeline is executed as a MapReduce job. Writing the data to multiple sinks is implemented by having each sink correspond to an OutputFormat, and then configuring the MapReduce job to add each of these OutputFormats as an output. An example of such an ETL pipeline (pictured below) reads the events of a stream, filters these events using JavaScript, and sends the data to a database, a table, and S3 in Parquet format.
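The fan-out behavior of a multi-sink pipeline can be sketched as follows. This is an illustrative model, not the Hydrator API: each processed record is written to every configured sink's writer, here modeled as simple in-memory lists standing in for the sinks' OutputFormats.

```java
import java.util.*;

// Illustrative sketch: a writer that fans each record out to every
// configured sink, mirroring a pipeline with multiple sinks.
class FanOutWriter {
    final Map<String, List<String>> sinks = new LinkedHashMap<>();

    FanOutWriter(String... sinkNames) {
        for (String s : sinkNames) sinks.put(s, new ArrayList<>());
    }

    void write(String record) {
        // Every sink receives a copy of the processed record.
        for (List<String> sink : sinks.values()) sink.add(record);
    }
}
```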


Summary

In this blog post, we saw the limitations of Hadoop’s MultipleOutputs and how we overcame them to implement these use cases in CDAP. With the newly built capabilities, users can now use MapReduce programs in CDAP to write to multiple outputs, whether the output is a Parquet file in S3 or a Table dataset. You can also configure an ETL pipeline with multiple sinks. Download the recently released CDAP 3.2.0 SDK and give it a try!

  • Ramnath

    Hi Ali, Is there a way that we can modify the cdap-etl-batch code that connects to Redshift and writes data into a S3TextBatchSink or S3CSVBatchSink instead of S3ParquetBatchSink or S3AvroBatchSink?

    • Ali Anwar

      Hey Ramnath.
      It would be straightforward to implement an S3TextBatchSink or S3CSVBatchSink plugin as a sink to the pipeline.
      My understanding is that you want to use Redshift as the source. A source plugin would be needed to be implemented for that to be used as the source, since there doesn’t currently exist one.
      Take a look at some example plugins for how to write a source/sink:
      https://github.com/caskdata/hydrator-plugins

      • Ramnath

        Hey Ali,

        Thank you for the quick reply. In order to implement this application, I took a look at the
        Batch Database Table To CDAP HBase Table Application Configuration under cdap-etl-adapter-guide.

        Hence, I created two JSON files, namely config.json and redshift-connector-jdbc-4.1.json. Also, I wrote a Java class S3TextBatchSink.java that extends S3BatchSink. I compiled this into a cdap-etl-lib.jar and replaced the cdap-etl-lib-3.2.1-batch.jar in the artifacts folder under the SDK.

        When I was trying to create an app, I was getting the below error:

        Error: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: No Plugin of type ‘sink’ named ‘S3Text’ was found. Please check that an artifact containing the plugin exists, and that it extends the etl application.

        Let me know if the process I am following here is wrong.
