Programming with Apache Twill*, Part II

Terence Yim is a Software Engineer at Cask, responsible for designing and building realtime processing systems on Hadoop/HBase. Prior to Cask, Terence spent over a year at LinkedIn Inc. and seven years at Yahoo!, building high performance large scale distributed systems.

In the "Programming with Weave (now Apache Twill), Part I" blog post, we introduced the basics of writing a distributed application on Hadoop YARN using Twill. In this post, we highlight some of Twill's most important features.

Resources specification

Sometimes you need more than one instance of your application; for example, you might use Twill to run a cluster of Jetty web servers. Different applications also have different requirements for system resources such as CPU and memory. By default, Twill starts one container per TwillRunnable with 1 virtual core and 512MB of memory. You can customize this when starting your application through TwillRunner. For example, to request 5 instances, each with 2 virtual cores and 1GB of memory:

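import org.apache.twill.api.ResourceSpecification;
import org.apache.twill.api.TwillRunner;

TwillRunner twillRunner = ...
twillRunner.prepare(new JettyServerTwillRunnable(),
                    ResourceSpecification.Builder.with()
                                         .setVirtualCores(2)
                                         .setMemory(1, ResourceSpecification.SizeUnit.GIGA)
                                         .setInstances(5)
                                         .build())
           .start();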
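To see what such a request adds up to across the cluster, here is a small plain-Java sketch. The ContainerRequest class is hypothetical and not part of Twill; it simply multiplies the per-container settings by the instance count, which gives 10 virtual cores and 5120MB for the example above. The actual allocation may differ, for example if the scheduler rounds requests up.

```java
// Hypothetical helper (not part of Twill): models the totals implied by a
// resource specification of N identical containers.
final class ContainerRequest {
    final int virtualCores; // per container
    final int memoryMB;     // per container
    final int instances;

    ContainerRequest(int virtualCores, int memoryMB, int instances) {
        this.virtualCores = virtualCores;
        this.memoryMB = memoryMB;
        this.instances = instances;
    }

    // Default described in the text: one container, 1 virtual core, 512MB.
    static ContainerRequest defaults() {
        return new ContainerRequest(1, 512, 1);
    }

    int totalVirtualCores() {
        return virtualCores * instances;
    }

    int totalMemoryMB() {
        return memoryMB * instances;
    }
}
```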

Notice that this specifies virtual cores, not physical CPU cores. How virtual cores map to physical cores is defined in the YARN configuration, and the allowable virtual core values are bounded by yarn.scheduler.minimum-allocation-vcores and yarn.scheduler.maximum-allocation-vcores.

(Read this post to learn how to enable virtual core support in YARN.)
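The sketch below shows one way to think about those two settings. It is a simplified model and an assumption, not YARN's code: a request below the minimum is raised to the minimum, and a request above the maximum is rejected. The VcoreBounds class is hypothetical, and a plain Map stands in for the YARN configuration.

```java
import java.util.Map;

// Simplified, hypothetical model of checking a virtual-core request against
// the scheduler bounds; real YARN behavior may differ by version and scheduler.
final class VcoreBounds {
    static final String MIN_KEY = "yarn.scheduler.minimum-allocation-vcores";
    static final String MAX_KEY = "yarn.scheduler.maximum-allocation-vcores";

    // "conf" stands in for the YARN configuration (e.g. yarn-site.xml values).
    static int normalize(int requested, Map<String, String> conf) {
        int min = Integer.parseInt(conf.get(MIN_KEY));
        int max = Integer.parseInt(conf.get(MAX_KEY));
        if (requested > max) {
            throw new IllegalArgumentException(
                "Requested " + requested + " vcores exceeds " + MAX_KEY + "=" + max);
        }
        // Assumption: requests below the minimum are raised to the minimum.
        return Math.max(requested, min);
    }
}
```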

Multiple runnables

Just as a program can have multiple threads doing different things, your application can have multiple TwillRunnables. All you need to do is implement the TwillApplication interface and specify the runnables that make up your application. For example, if your application contains a Jetty server and a log processing daemon, your TwillApplication will look something like this:

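import org.apache.twill.api.TwillApplication;
import org.apache.twill.api.TwillSpecification;

public class MyTwillApplication implements TwillApplication {

  @Override
  public TwillSpecification configure() {
    return TwillSpecification.Builder.with()
      .setName("MyApplication")
      .withRunnable()
        // The name identifies the runnable in later calls such as
        // withArguments() and changeInstances()
        .add("jetty", new JettyServerTwillRunnable()).noLocalFiles()
        .add("logdaemon", new LogProcessorTwillRunnable()).noLocalFiles()
      // The runnables may be started in any order
      .anyOrder()
      .build();
  }
}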

The call to anyOrder() specifies that the runnables in this application can be started in any order. If there are dependencies between runnables, you can specify the ordering like this:

// Start the log processing daemon first, and the Jetty server once it has started
.withRunnable()
  .add("jetty", new JettyServerTwillRunnable()).noLocalFiles()
  .add("logdaemon", new LogProcessorTwillRunnable()).noLocalFiles()
.withOrder()
  .begin("logdaemon")
  .nextWhenStarted("jetty")
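Conceptually, nextWhenStarted() chains start-up stages: a stage is launched only after the previous one has started. The toy model below expresses that idea with CompletableFuture.thenCompose. OrderedStarter and the simulated start function are hypothetical; this is not how Twill implements ordering internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical model of ordered start-up: each stage begins only after every
// runnable in the previous stage reports that it has started.
final class OrderedStarter {
    // startFn simulates launching a runnable; its future completes once running.
    static List<String> startInOrder(List<List<String>> stages,
                                     Function<String, CompletableFuture<Void>> startFn) {
        List<String> started = new ArrayList<>();
        CompletableFuture<Void> previous = CompletableFuture.completedFuture(null);
        for (List<String> stage : stages) {
            previous = previous.thenCompose(ignored -> {
                // Launch all runnables of this stage together and record each
                // name once its start-up completes.
                CompletableFuture<?>[] launches = stage.stream()
                    .map(name -> startFn.apply(name).thenRun(() -> {
                        synchronized (started) {
                            started.add(name);
                        }
                    }))
                    .toArray(CompletableFuture<?>[]::new);
                return CompletableFuture.allOf(launches);
            });
        }
        previous.join(); // wait until the last stage has started
        return started;
    }
}
```

Even if the first stage takes longer to come up, the second stage is not launched until it completes.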

File localization

One nice feature of YARN is that it can copy HDFS files to a container's working directory on local disk, which is an efficient way to distribute the files that containers need across the cluster. Here is how to do so in Twill:

.withRunnable()
  .add("jetty", new JettyServerTwillRunnable())
    .withLocalFiles()
      // Distribute local file "index.html" to container running the Jetty server
      .add("index.html", new File("index.html"))  

      // Distribute local archive "images.tgz" and expand its contents
      // into the container directory "images"
      .add("images", new File("images.tgz"), true)

      // Distribute HDFS file "site-script.js" to container file named "script.js".
      // "fs" is the Hadoop FileSystem object
      .add("script.js", fs.resolvePath(new Path("site-script.js")).toUri())
    .apply()

In Twill, a file to be localized doesn't need to be on HDFS: it can come from the local file system or even an external URL. Twill also supports automatic archive expansion and file renaming. If no files need to be localized, simply call noLocalFiles() when adding the TwillRunnable.
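A toy model of a localization entry may help show how renaming and archive expansion relate. The LocalizedFile class below is hypothetical and not part of Twill: the entry name, not the source file name, determines where the file lands in the container's working directory; an archive entry becomes a directory of expanded contents; and the URI scheme tells you whether the source is a local file, HDFS, or an external URL.

```java
import java.io.File;
import java.net.URI;
import java.nio.file.Path;

// Hypothetical model of one localization entry (not Twill's internal class).
final class LocalizedFile {
    final String name;    // name inside the container's working directory
    final URI source;     // where the content comes from
    final boolean archive; // expand the archive into a directory called "name"

    LocalizedFile(String name, URI source, boolean archive) {
        this.name = name;
        this.source = source;
        this.archive = archive;
    }

    // Convenience constructor for a file on the client's local disk.
    LocalizedFile(String name, File localFile, boolean archive) {
        this(name, localFile.toURI(), archive);
    }

    // The entry name decides the destination, which is how renaming works.
    Path destination(Path containerWorkDir) {
        return containerWorkDir.resolve(name);
    }

    // An archive entry ends up as a directory holding the expanded contents.
    boolean becomesDirectory() {
        return archive;
    }

    // Classify the source by URI scheme.
    String sourceKind() {
        String scheme = source.getScheme();
        if ("file".equals(scheme)) {
            return "local file";
        }
        if ("hdfs".equals(scheme)) {
            return "HDFS";
        }
        return "external URL (" + scheme + ")";
    }
}
```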

Arguments

As with a standalone application, you may want to pass arguments that alter the behavior of your application. In Twill, you can pass arguments to individual TwillRunnables as well as to the whole TwillApplication. Arguments are passed when launching the application through TwillRunner:

TwillRunner twillRunner = ...
twillRunner.prepare(new MyTwillApplication())
           // Application arguments are visible to all runnables
           .withApplicationArguments("--debug")

           // Runnable arguments are visible only to instances of the given runnable
           .withArguments("jetty", "--threads", "100")
           .withArguments("logdaemon", "--retain-logs", "5")
           .start();

The arguments can be accessed through the TwillContext object passed to the TwillRunnable. Application arguments are retrieved by calling TwillContext.getApplicationArguments(), while runnable arguments are available through TwillContext.getArguments().
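Both calls return the arguments as a String[], so a runnable usually parses them itself. The helper below is a hypothetical sketch, not part of Twill: it pairs "--key value" tokens and treats a bare flag such as "--debug" as "true".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical argument parser for the String[] returned by
// TwillContext.getArguments() or getApplicationArguments().
final class RunnableArgs {
    static Map<String, String> parse(String[] args) {
        Map<String, String> result = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            String token = args[i];
            if (!token.startsWith("--")) {
                throw new IllegalArgumentException("Unexpected argument: " + token);
            }
            String key = token.substring(2);
            // A value follows if the next token exists and is not another option.
            boolean hasValue = i + 1 < args.length && !args[i + 1].startsWith("--");
            if (hasValue) {
                result.put(key, args[++i]);
            } else {
                result.put(key, "true"); // bare flag
            }
        }
        return result;
    }
}
```

Inside the Jetty runnable's initialize(), you might call RunnableArgs.parse(context.getArguments()) and read the thread count with getOrDefault("threads", ...), choosing your own default.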

Service discovery

When you launch an application on YARN, you don't know in advance where its containers will run, and the hosts can change over time due to container or machine failures. Twill has built-in service discovery support: runnables can announce a named service, and you can later discover where its instances are running. For example, you can start each Jetty server instance on a random port and announce its address and port.

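class JettyServerTwillRunnable extends AbstractTwillRunnable {
  @Override
  public void initialize(TwillContext context) {
    // Let AbstractTwillRunnable do its own initialization first
    super.initialize(context);
    // Start Jetty on a random port
    int port = startJetty();
    // Announce this instance's host and port under the service name "jetty"
    context.announce("jetty", port);
  }
}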

You can then build a router layer that routes incoming requests to the Jetty instances. The router will look something like this:

import java.util.Iterator;
import org.apache.twill.api.TwillController;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.ServiceDiscovered;

TwillController controller = ...
ServiceDiscovered jettyService = controller.discoverService("jetty");

// The ServiceDiscovered maintains a live list of service endpoints.
// Every time iterator() is invoked, it returns the latest list of endpoints.
Iterator<Discoverable> itor = jettyService.iterator();
// Pick an endpoint from the list of endpoints.
// ...
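One simple policy for the "pick an endpoint" step is round robin. The sketch below is hypothetical and uses only the JDK: an Iterable<InetSocketAddress> stands in for the live ServiceDiscovered list (in Twill you would first map each Discoverable to its socket address), and the list is re-iterated on every pick so that newly announced or departed instances are taken into account.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical round-robin picker over a live collection of endpoints.
final class RoundRobinPicker {
    private final Iterable<InetSocketAddress> endpoints;
    private final AtomicLong counter = new AtomicLong();

    RoundRobinPicker(Iterable<InetSocketAddress> endpoints) {
        this.endpoints = endpoints;
    }

    // Returns the next endpoint in rotation, or null if none is available.
    InetSocketAddress pick() {
        // Take a fresh snapshot so the latest endpoints are always used.
        List<InetSocketAddress> snapshot = new ArrayList<>();
        for (InetSocketAddress address : endpoints) {
            snapshot.add(address);
        }
        if (snapshot.isEmpty()) {
            return null;
        }
        int index = (int) Math.floorMod(counter.getAndIncrement(), (long) snapshot.size());
        return snapshot.get(index);
    }
}
```

Copying the list on every request is simple but not free; a busier router might cache the snapshot and refresh it only when the endpoint list changes.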

Controlling live applications

As mentioned in Programming with Weave, Part I, you can control a running application using TwillController. You can change the number of instances of a runnable by simply doing this:

TwillController controller = ...
ListenableFuture<Integer> changeComplete = controller.changeInstances("jetty", 10);

You can then either block until the change is completed or observe the completion asynchronously by listening on the future.
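The two styles look like this in a JDK-only sketch. FakeController is hypothetical, and CompletableFuture stands in for Guava's ListenableFuture so the example runs without extra dependencies; with Guava you would register the callback through Futures.addCallback instead of thenAccept.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a TwillController; the simulated change completes
// asynchronously with the new instance count.
final class FakeController {
    CompletableFuture<Integer> changeInstances(String runnable, int newCount) {
        return CompletableFuture.supplyAsync(() -> newCount);
    }

    // Option 1: block the caller until the change completes, with a timeout.
    static int waitForChange(CompletableFuture<Integer> change)
            throws InterruptedException, ExecutionException, TimeoutException {
        return change.get(30, TimeUnit.SECONDS);
    }

    // Option 2: register a callback and return immediately.
    static CompletableFuture<Void> onChange(CompletableFuture<Integer> change,
                                            AtomicInteger observed) {
        return change.thenAccept(observed::set);
    }
}
```

Blocking is simplest in scripts and tests; the callback style keeps a long-running controller or router responsive while instances are added.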

Conclusion

We hope you enjoyed this deep dive into some of Twill's capabilities. In the next post, we will walk through an example application that uses the features highlighted here.

*Apache Twill is currently undergoing incubation at the Apache Software Foundation. Help us make it better by becoming a contributor.
