HawtDispatch

HawtDispatch: The libdispatch style API for Java and Scala

HawtDispatch is a small (less than 100k) thread pooling and NIO handling framework API modeled after the libdispatch API that Apple created to power the Grand Central Dispatch (GCD) technology in OS X. It lets you more easily develop multi-threaded applications that scale to take advantage of all the processing cores on your machine. At the same time, its development model simplifies solving many of the problems that plague multi-threaded NIO development.

Features

  • Java 1.5 API
  • Scala 2.8 API
  • Thread Pooling
  • Delayed Task Execution
  • Priority Task Execution
  • NIO Handling

Usage Guide

The DispatchQueue

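The most important objects in the HawtDispatch API are the DispatchQueue objects. They are Executor objects which execute submitted Runnable objects at a later time. They come in 2 flavors: the concurrent global queue, obtained with the getGlobalQueue method, and serial queues, created with the createQueue method.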

Handy imports

The examples in this document assume that you have added the following imports:

In Java

import org.fusesource.hawtdispatch.*;
import static org.fusesource.hawtdispatch.Dispatch.*;

In Scala

import _root_.org.fusesource.hawtdispatch._;
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._;

Submitting Runnable Objects

Once you have a reference to a queue object you can use it to perform some asynchronous processing. The Scala queue object is enriched with several helpers to make enqueuing async tasks easier. Example:

In Java

queue.execute(new Runnable(){
  public void run() {
    System.out.println("Hi!");
  }
});

In Scala

queue {
  System.out.println("Hi!");
}
// or
queue << ^{
  System.out.println("Hi!");
}
// or
^{
  System.out.println("Hi!");
} >>: queue

The ^{ .... } block syntax in the Scala example is borrowed from GCD. It produces a regular Java Runnable object.

Dispatch Sources

A Dispatch Source is used to trigger the execution of a task on a queue based on an external event. Dispatch Sources are usually used to integrate with external IO events from NIO, but you can also use a custom Dispatch Source to coalesce multiple application-generated events into a single event which triggers an async task.

Dispatch sources are initially created in a suspended state. Once a source is created and you have configured its event handler, you should call the DispatchSource.resume method so that the handler is executed on the specified queue. If you later want to stop processing events for a period of time, call the DispatchSource.suspend method.
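The suspend/resume lifecycle can be illustrated with a small JDK-only sketch. SuspendableSource is a hypothetical stand-in, not a HawtDispatch class, and the choice to deliver a held-back event when resume is called is an assumption of this sketch rather than documented HawtDispatch behavior.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a dispatch source; not part of HawtDispatch.
class SuspendableSource {
    private final Executor queue;
    private final Runnable handler;
    private boolean suspended = true; // sources start out suspended
    private boolean pending = false;  // an event arrived while suspended

    SuspendableSource(Executor queue, Runnable handler) {
        this.queue = queue;
        this.handler = handler;
    }

    // Called when the external event occurs.
    synchronized void fire() {
        if (suspended) {
            pending = true;          // remember it until resume() is called
        } else {
            queue.execute(handler);
        }
    }

    synchronized void resume() {
        suspended = false;
        if (pending) {               // assumption: held events fire on resume
            pending = false;
            queue.execute(handler);
        }
    }

    synchronized void suspend() {
        suspended = true;
    }

    // Demo helper: an inline executor keeps the result deterministic.
    static int demo() {
        final AtomicInteger runs = new AtomicInteger();
        Executor inline = new Executor() {
            public void execute(Runnable r) { r.run(); }
        };
        SuspendableSource source = new SuspendableSource(inline, new Runnable() {
            public void run() { runs.incrementAndGet(); }
        });
        source.fire();    // held back: the source is still suspended
        source.resume();  // the held event is delivered now
        source.fire();    // delivered immediately
        source.suspend();
        source.fire();    // held back again
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("handler runs: " + demo());
    }
}
```

Forgetting to call resume is the usual reason a freshly created source appears to do nothing.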

NIO Dispatch Source

NIO integration is accomplished via a DispatchSource object which is created using the Dispatch.createSource method. You supply it the SelectableChannel and the operations you're interested in receiving events for, like OP_READ or OP_WRITE. When that NIO event is raised, it will execute a runnable callback you configure on a dispatch queue you specify. HawtDispatch takes care of setting up and managing the NIO selectors and selector keys for you.

Example:

In Java

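// also needs java.io.IOException, java.nio.ByteBuffer,
// java.nio.channels.SelectionKey and java.nio.channels.SocketChannel
final SocketChannel channel = ...
DispatchQueue queue = createQueue();
DispatchSource source = createSource(channel, SelectionKey.OP_READ, queue);
source.setEventHandler(new Runnable(){
  public void run() {
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    try {
      while( channel.read(buffer) > 0 ) {
        // just dump it to the console
        System.out.write(buffer.array(), buffer.arrayOffset(), buffer.position());
        buffer.clear(); // reuse the buffer for the next read
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
});
source.resume();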

In Scala

val channel:SocketChannel = ...
val queue = createQueue
val source = createSource(channel, SelectionKey.OP_READ, queue)
source.setEventHandler(^{
  val buffer = ByteBuffer.allocate(1024)
  while( channel.read(buffer) > 0 ) {
    // just dump it to the console
    System.out.write(buffer.array(), buffer.arrayOffset(), buffer.position())
    buffer.clear() // reuse the buffer for the next read
  }
});
source.resume
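For comparison, the following sketch shows the kind of selector bookkeeping that has to be written by hand when using plain java.nio. It uses only JDK classes; the ManualSelectorDemo class and its roundTrip helper are hypothetical names chosen for illustration, and the code says nothing about how HawtDispatch implements its selector handling internally.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical demo class: registers a channel with a Selector by hand.
class ManualSelectorDemo {

    // Writes a short message into a pipe, waits for OP_READ on the other
    // end, and returns whatever was read back.
    static String roundTrip(String message) {
        try {
            Pipe pipe = Pipe.open();
            Selector selector = Selector.open();
            try {
                // A channel must be non-blocking before it can be registered.
                pipe.source().configureBlocking(false);
                pipe.source().register(selector, SelectionKey.OP_READ);

                pipe.sink().write(ByteBuffer.wrap(message.getBytes("UTF-8")));

                StringBuilder received = new StringBuilder();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                if (selector.select(1000) > 0) {
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isReadable()) {
                            while (pipe.source().read(buffer) > 0) {
                                received.append(new String(buffer.array(),
                                        buffer.arrayOffset(), buffer.position(), "UTF-8"));
                                buffer.clear();
                            }
                        }
                    }
                    // Selected keys must be cleared manually after handling.
                    selector.selectedKeys().clear();
                }
                return received.toString();
            } finally {
                selector.close();
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("Hi!"));
    }
}
```

The sketch assumes a short ASCII message that fits in a single buffer; a real reader would also have to handle partial reads and multi-byte characters split across buffers.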

Custom Dispatch Source

A Custom Dispatch Source is used to coalesce multiple application generated events into a single event which triggers an async task. By using a Custom Dispatch Source you reduce the amount of cross thread contention since multiple events generated by one thread are passed to the processing thread as a single batch.

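When you create a custom dispatch source, you provide it an aggregator which controls how events are coalesced. The supplied aggregators are available from the EventAggregators class; for example, EventAggregators.INTEGER_ADD merges integer events by adding them together.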

Event producers can call the merge(event) method on the custom dispatch source to supply it data. Calling the merge method will cause the event handler Runnable configured on the dispatch source to be executed. When it is executed, you should use the custom dispatch source's getData() method to access the merged event. The getData() method should only be called from the configured event handler.

In Java

final Semaphore done = new Semaphore(1-(1000*1000));

DispatchQueue queue = createQueue();
final CustomDispatchSource<Integer, Integer> source = createSource(EventAggregators.INTEGER_ADD, queue);
source.setEventHandler(new Runnable() {
  public void run() {
    int count = source.getData();
    System.out.println("got: " + count);
    done.release(count);
  }
});
source.resume();

// Produce 1,000,000 concurrent merge events
for (int i = 0; i < 1000; i++) {
  getGlobalQueue().execute(new Runnable() {
    public void run() {
      for (int j = 0; j < 1000; j++) {
        source.merge(1);
      }
    }
  });
}

// Wait for all the events to arrive.
done.acquire();

In Scala

val done = new Semaphore(1 - (1000 * 1000))

val queue = createQueue()
val source = createSource(EventAggregators.INTEGER_ADD, queue)
source.setEventHandler(^{
  val count = source.getData()
  println("got: " + count)
  done.release(count.intValue)
});
source.resume();

// Produce 1,000,000 concurrent merge events
for (i <- 0 until 1000) {
  globalQueue {
    for (j <- 0 until 1000) {
      source.merge(1)
    }
  }
}

// Wait for all the events to arrive.
done.acquire()

On an 8 core machine you would see output similar to:

got: 167000
got: 103000
got: 103000
got: 163000
got: 119000
got: 109000
got: 111000
got: 125000
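The coalescing idea behind these batched results can be sketched with nothing but the JDK. The CoalescingSource class below is a hypothetical illustration that mimics the merge/getData shape for positive integer events; it is not how HawtDispatch is implemented, and the single-thread executor only stands in for a serial dispatch queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of integer-add coalescing; not a HawtDispatch class.
class CoalescingSource {
    private final AtomicInteger pending = new AtomicInteger();
    private final Executor queue;
    private volatile Runnable handler;

    CoalescingSource(Executor queue) { this.queue = queue; }

    void setEventHandler(Runnable handler) { this.handler = handler; }

    // Only the producer that moves the total away from zero schedules the
    // handler; every other merge just adds to the batch already in flight.
    // Assumes value > 0.
    void merge(int value) {
        if (pending.getAndAdd(value) == 0) {
            queue.execute(handler);
        }
    }

    // Takes the whole batch atomically; call only from the event handler.
    int getData() { return pending.getAndSet(0); }

    static int demo(int producers, final int eventsPerProducer) {
        ExecutorService queue = Executors.newSingleThreadExecutor();
        ExecutorService pool = Executors.newFixedThreadPool(producers);
        final int expected = producers * eventsPerProducer;
        final AtomicInteger total = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(1);
        final CoalescingSource source = new CoalescingSource(queue);
        source.setEventHandler(new Runnable() {
            public void run() {
                int batch = source.getData();
                if (total.addAndGet(batch) == expected) done.countDown();
            }
        });
        for (int i = 0; i < producers; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    for (int j = 0; j < eventsPerProducer; j++) source.merge(1);
                }
            });
        }
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
            queue.shutdown();
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println("total: " + demo(4, 1000));
    }
}
```

The number and size of the batches vary from run to run, but their sum always equals the number of merged events, which is why the handler can safely release the semaphore by the batch size.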

Restrictions on Executed Runnables

All runnable actions executed asynchronously by HawtDispatch should be non-blocking and avoid waiting on any synchronized objects. If a blocking call has to be performed, it should be done asynchronously in a new thread not managed by HawtDispatch.
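One way to follow this rule is to hand the blocking call to a separate thread pool and post the result back to the queue when it is ready. The sketch below uses only JDK executors: the single-thread executor is a stand-in for a dispatch queue, and BlockingOffload and slowLookup are hypothetical names for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: keep blocking work off the queue's thread.
class BlockingOffload {

    // Stand-in for a blocking call such as a database query.
    static String slowLookup(String key) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return key.toUpperCase();
    }

    static String demo(final String key) {
        // Stand-in for a serial dispatch queue.
        final ExecutorService queue = Executors.newSingleThreadExecutor();
        // Threads that the dispatch framework does not manage.
        final ExecutorService blockingPool = Executors.newCachedThreadPool();
        final String[] result = new String[1];
        final CountDownLatch done = new CountDownLatch(1);

        queue.execute(new Runnable() {
            public void run() {
                // Do not block here; pass the slow call to the other pool.
                blockingPool.execute(new Runnable() {
                    public void run() {
                        final String value = slowLookup(key);
                        // Hop back onto the queue with the result.
                        queue.execute(new Runnable() {
                            public void run() {
                                result[0] = value;
                                done.countDown();
                            }
                        });
                    }
                });
            }
        });

        // The demo's caller waits here; it is not running on the queue.
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            blockingPool.shutdown();
            queue.shutdown();
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(demo("hi"));
    }
}
```

The queue's thread is only occupied for the two short hand-off tasks, so other work submitted to it is not delayed by the slow call.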

Common Patterns

Protecting Mutable State

A common pattern is to use a serial queue to synchronize access to the mutable state of an object. Example:

  class MyCounter {
    val queue = createQueue()
    var counter = 0;
    
    def add(value:Int) = ^{
      counter += value
    } >>: queue
    
    def sub(value:Int) = ^{
      counter -= value
    } >>: queue
  }
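The same pattern can be sketched in Java using only the JDK. Here a single-thread executor stands in for the serial queue, and QueueGuardedCounter is a hypothetical name; with HawtDispatch you would use a queue from createQueue instead.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: all access to 'counter' goes through one serial queue.
class QueueGuardedCounter {
    // Stand-in for a serial dispatch queue.
    private final ExecutorService queue = Executors.newSingleThreadExecutor();
    private int counter = 0; // only touched by tasks running on 'queue'

    void add(final int value) {
        queue.execute(new Runnable() {
            public void run() { counter += value; }
        });
    }

    void sub(final int value) {
        queue.execute(new Runnable() {
            public void run() { counter -= value; }
        });
    }

    // Reads the value through the queue too, then shuts the queue down.
    int getAndShutdown() {
        try {
            return queue.submit(new Callable<Integer>() {
                public Integer call() { return counter; }
            }).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            queue.shutdown();
        }
    }

    static int demo() {
        final QueueGuardedCounter c = new QueueGuardedCounter();
        Thread[] callers = new Thread[4];
        for (int i = 0; i < callers.length; i++) {
            callers[i] = new Thread(new Runnable() {
                public void run() {
                    for (int j = 0; j < 1000; j++) {
                        c.add(2);
                        c.sub(1);
                    }
                }
            });
            callers[i].start();
        }
        try {
            for (Thread t : callers) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return c.getAndShutdown();
    }

    public static void main(String[] args) {
        System.out.println("counter: " + demo());
    }
}
```

No locks or volatile fields are needed on counter because only one task runs on the queue at a time and every read and write is itself a task.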

Asynchronous Cleanup

On many occasions there is a resource associated with concurrent processing, and it needs to be released/cleaned up once the concurrent processing has completed.

This can be easily done by configuring a disposer callback on the dispatch queue or dispatch source. These objects support reference counting through the retain and release method calls. They are initially created with a retain count of 1. Once the retain count reaches zero, the disposer runnable is executed on the associated queue.

The following example simulates a case where multiple concurrent tasks use a shared resource, and the resource is closed once they finish executing.

val stream:PrintStream = ...
val queue = createQueue()
queue.setDisposer(^{ stream.close })

for( i <- 1 to 10 ) {
  queue.retain
  getGlobalQueue << ^ {
    // Concurrently compute some values and send them to
    // the stream.
    val value = "Hello "+i
    queue << ^{ stream.println(value) }
    // the stream is closed once the last release executes.
    queue.release
  }
}
queue.release

It's also important to note that enqueued runnable objects increment the retain counter. The following version of the above example also closes the stream only after all the values have been sent to it:

val stream:PrintStream = ...
val queue = createQueue()
queue.setDisposer(^{ stream.close })
for( i <- 1 to 10 ) {
  val value = "Hello "+i
  queue << ^{ stream.println(value) }
}
queue.release
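The retain/release bookkeeping can be approximated in plain Java as follows. RefCountedResource is a hypothetical class written for this illustration, the executors are JDK stand-ins for the dispatch queues, and the sketch models only explicit retain and release calls; it does not model enqueued runnables retaining the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a retain count with a disposer callback.
class RefCountedResource {
    private final AtomicInteger retained = new AtomicInteger(1); // starts at 1
    private final Executor queue;
    private final Runnable disposer;

    RefCountedResource(Executor queue, Runnable disposer) {
        this.queue = queue;
        this.disposer = disposer;
    }

    void retain() { retained.incrementAndGet(); }

    // The release that brings the count to zero schedules the disposer.
    void release() {
        if (retained.decrementAndGet() == 0) {
            queue.execute(disposer);
        }
    }

    static List<String> demo() {
        final ExecutorService queue = Executors.newSingleThreadExecutor();
        ExecutorService global = Executors.newFixedThreadPool(4);
        final List<String> log = new ArrayList<String>(); // written on 'queue' only
        final CountDownLatch disposed = new CountDownLatch(1);
        final RefCountedResource resource = new RefCountedResource(queue, new Runnable() {
            public void run() {
                log.add("closed");
                disposed.countDown();
            }
        });
        for (int i = 1; i <= 10; i++) {
            final String value = "Hello " + i;
            resource.retain(); // retain before handing work to another thread
            global.execute(new Runnable() {
                public void run() {
                    queue.execute(new Runnable() {
                        public void run() { log.add(value); }
                    });
                    resource.release(); // release after the write is enqueued
                }
            });
        }
        resource.release(); // drop the creator's initial reference
        try {
            disposed.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            global.shutdown();
            queue.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because each task enqueues its write before releasing, and the queue runs tasks in submission order, the disposer always runs after the last write.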

References