HawtDispatch

The libdispatch style API for Java and Scala

Overview

HawtDispatch is a small ( less than 100k ) thread pooling and NIO event notification framework API modeled after the libdispatch API that Apple created to power the Grand Central Dispatch (GCD) technology in OS X. It allows you to easily develop multi-threaded applications without having to deal with the problems that traditionally plague multi-threaded application development.

Features

The DispatchQueue

The most important objects in the HawtDispatch API, are the DispatchQueue objects. They are Executor objects which will execute submitted runnable objects at a later time. They come in 2 flavors:

In Java

DispatchQueue queue = getGlobalQueue(HIGH);

In Scala

val queue = getGlobalQueue(HIGH)

In Java

DispatchQueue queue = createQueue("My queue");

In Scala

val queue = createQueue("My Queue")

Handy imports

The examples in this document assume that you have added the following imports:

In Java

import org.fusesource.hawtdispatch.*;
import static org.fusesource.hawtdispatch.Dispatch.*;

In Scala

import _root_.org.fusesource.hawtdispatch._

Submitting Runnable Objects

Once you have a reference to a queue object you can use it to perform some asynchronous processing. The Scala queue object is enriched with several helpers to make enqueuing async tasks easier. Example:

In Java

queue.execute(new Runnable(){
  public void run() {
    System.out.println("Hi!");
  }
});

in Scala

queue {
  System.out.println("Hi!");
}
// or
queue.execute(^{
  System.out.println("Hi!");
})

The ^{ .... } block syntax in the Scala example is browed from the GCD. It produces a regular Java Runnable object.

Dispatch Sources

A Dispatch Source is used trigger the execution of task on a queue based on an external event. They are usually used to integrate with external IO events from NIO, but you can also use a custom Dispatch Source to coalesce multiple application generated events into a single event which triggers an async task.

Dispatch sources are initially created in a suspended state. Once its' created and you have configured it's event handler, you should call the DispatchSource.resume method so that it is executed on the specified queue. If you later want to stop processing events for a period of time, call the DispatchSource.suspend method.

NIO Dispatch Source

NIO integration is accomplished via a DispatchSource object which is created using the Dispatch.createSource method. You supply it the SelectableChannel and the operations your interested in receiving events for like OP_READ or OP_WRITE and when it's that NIO event is raised, it will execute a runnable callback you configure on a dispatch queue you specify. HawtDispatch takes care of setting up and managing the NIO selectors and selector keys for you.

Example:

In Java

SelectableChannel channel = ...
DispatchQueue queue = createQueue()
DispatchSource source = createSource(channel, OP_READ, queue);
source.setEventHandler(new Runnable(){
  public void run() {
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int count;
    while( (c=channel.read(buffer)) > 0 ) {
      // just dump it to the console
      System.out.write(buffer.array(), buffer.offset(), buffer.position());
    }
  }
});
source.resume();

In Scala

val channel:SelectableChannel = ...
val queue = createQueue
val source = createSource(channel, OP_READ, queue)
source.onEvent {
  val buffer = ByteBuffer.allocate(1024)
  var count=0
  while( (c=channel.read(buffer)) > 0 ) {
    // just dump it to the console
    System.out.write(buffer.array(), buffer.offset(), buffer.position())
  }
}
source.resume

Custom Dispatch Source

A Custom Dispatch Source is used to coalesce multiple application generated events into a single event which triggers an async task. By using a Custom Dispatch Source you reduce the amount of cross thread contention since multiple events generated by one thread are passed to the processing thread as a single batch.

When you create a custom dispatch source, you provide it an aggregator which controls how events are coalesced. The supplied aggregators are:

Event producers can call the merge(event) method on the custom dispatch source to supply it data. Calling the merge method will cause the event handler Runnable configured on the dispatch source to be executed. When it is executed, it you should use the custom dispatch source getData() method to access the merged event. The getData() should only be called from the configured event handler.

In Java

final Semaphore done = new Semaphore(1-(1000*1000));

DispatchQueue queue = createQueue();
final CustomDispatchSource<Integer, Integer> source = createSource(EventAggregators.INTEGER_ADD, queue);
source.setEventHandler(new Runnable() {
  public void run() {
    int count = source.getData();
    System.out.println("got: " + count);
    done.release(count);
  }
});
source.resume();

// Produce 1,000,000 concurrent merge events
for (int i = 0; i < 1000; i++) {
  getGlobalQueue().execute(new Runnable() {
    public void run() {
      for (int j = 0; j < 1000; j++) {
        source.merge(1);
      }
    }
  });
}

// Wait for all the event to arrive.
done.acquire();

In Scala

val done = new Semaphore(1 - (1000 * 1000))

val queue = createQueue()
val source = createSource(EventAggregators.INTEGER_ADD, queue)
source.onEvent {
  val count = source.getData()
  println("got: " + count)
  done.release(count.intValue)
}
source.resume();

// Produce 1,000,000 concurrent merge events
for (i <- 0 until 1000) {
  globalQueue {
    for (j <- 0 until 1000) {
      source.merge(1)
    }
  }
}

// Wait for all the event to arrive.
done.acquire()

On an 8 core machine you would see output similar to:


got: 167000
got: 103000
got: 103000
got: 163000
got: 119000
got: 109000
got: 111000
got: 125000

Restrictions on Executed Runnables

All runnable actions executed asynchronously by HawtDispatch should be non-blocking and avoid waiting on any synchronized objects. If a blocking call has to performed, it should be done asynchronously in a new thread not managed by HawtDispatch.

Common Patterns

Protecting Mutable State

A common pattern that shows up to use a serial queue to synchronize access to the mutable state of an object. Example:


  class MyCounter {
    val queue = createQueue()
    var counter = 0;

    def add(value:Int) = queue {
      counter += value
    } 

    def sub(value:Int) = queue {
      counter -= value
    }
  }

Scala Only Features

When you use the HawtDispatch Scala interface you gain several additional features such are the ability to capture asynchronous results in in Futures or the ability to use Scala continuation.

Using Futures

With futures you can run an asynchronous computation and return a Future instance which you can use to obtain the result of the future computation.

Creating the future:


val future:Future[Int] = queue.future {
  var rc=0;
  for( i <- 0 until 100 ) {
    rc = rc * i
  }
  rc
}

Checking to see if the future has completed:


if( future.completed ) {
  println("computation completed")
}

Setting a callback to be invoked once the future completes:


future.onComplete {
  println("computation completed")
}

Getting the result of the future computation.


val rc:Int = future() 

Warning: getting the result of the future blocks until the future completes.

Sometimes you need to work with multiple futures concurrently. Perhaps you want to wait for them all to complete or wait for the first one to complete.

Assuming you have created a few futures such as:


val future1:Future[Int] = ...
val future2:Future[Int] = ...
val future3:Future[Int] = ...

Then you can create a future that gathers and then wait for the results using the Future companion object. Example:


val future = Future.all(List(future1,future2,future3))
future().foreach { result:Int =>
  println(result)
}

To just wait for the first result:


val future:Future[Int] = Future.first(List(future1,future2,future3))
println("first result was: "+future())

To collect the results via a folding function:


val future:Future[Int] = Future.fold(List(future1,future2,future3), 0) { (sum, value)=>
  sum+value
}
println("future sum was: "+future())

Scala Continuations

Since blocking operations are not allowed in HawtDispatch executed tasks, when one dispatch queue task needs to get a result of a computation from another task that is asynchronously executed you typically have to use callback or continuation passing style API. Example:


object Foo {
  val a = createQueue()
  var held:Int = 0;

  def hold(v:Int)(result: (Int)=>Unit) = a {
    val rc = held
    held = v
    result( rc )
  }
}

Notice that the example above executes the computation on dispatch queue a to guard access to the held variable. Next you will find and example of what a caller to that hold method looks like.


object Bar {
  val b = createQueue()
  var sum:Int = 0;

  def test = b {
    Foo.hold(sum+5) { result=>
      b {
        sum += result
        println("sum is at: "+sum)
      }
    }
  }
}

In the case of the Bar object above, access to the sum variable is being guarded by the b dispatch queue. It calls the Foo.hold method and passes the callback function. Notice that two b { ... } blocks are required to protect access to the sum variable.

When you use HawtDispatch's scala continuation support, the previous example is simplified by eliminating the need to pass the callback parameter. Example:


import util.continuations._

object Foo {
  var held:Int = 0;
  val a = createQueue()

  def hold(v:Int) = a ! {
    val rc = held
    held = v
    rc
  }
}

When you use dispatch queue's '!' method, it's function block will get executed asynchronously on the dispatch queue and the result of the function will be passed to the continuation that gets passed in implicitly by the Scala compiler's continuation plugin.

On the calling side of the method, you must use the reset { ... } to delimit what defines the continuation. Every operation after the hold method call will be re-written into a continuation block and passed as the implicit continuation of hold method. Once the hold method completes it calls back to the implicit continuation.


object Bar {
  var sum:Int = 0;
  val b = createQueue()

  def test = b {
    reset {
      val result = Foo.hold(sum+5)
      sum += result
      println("sum is at: "+sum)
    }
  }
}

Note that in this version, we did not need the 2nd b { } block to guard access when we assign the sum variable. This is because the '!' method stores the original dispatch queue it was called from and executes the continuation as a task on the original dispatch queue.

To make resetting even easier and to pass back a future result of the async call, the previous test method simplified using the !! method as the following example shows:


object Bar {
  def test:Future[Int] = b !! {
    val result = Foo.hold(sum+5)
    sum += result
    println("sum is at: "+sum)
    sum
  }  
}

The !! method executes the given block within a reset and captures it's result in a Future which can be used by the caller to examine the future result of the computation.

Rich Executors

Once you import import _root_.org.fusesource.hawtdispatch._ all Executor implementations get most extra syntax sugar that dispatch queues enjoy. For example:


val executor = java.util.concurrent.Executors.newSingleThreadExecutor()

executor {
  println("This block is run async on the exectutor");
}

For more details see the RichExecutor documentation.

Profiling Hooks

Every dispatch queue now has a metrics method that allows you to collect the usage Metrics of the queue since the method was last called if profiling is enabled for the dispatch queue. If profiling is not enabled on the queue then the method returns null. To enable or disable profiling on a queue you can call the queue's profile method. If you want to enable profiling on all queues by default then set the hawtdispatch.profile System property on the JVM.

If you want to get the metrics for all the queues currently being used in the JVM, then you can call the Dispatch.metrics() method to get a list of metrics of all the queues.

The collected metrics can come in handy to trouble shoot misbehaving applications. You can setup a periodic task to check for dispatch queues which who's tasks either wait for a long time before executing or take a long time to execute. Example:


  def monitor_hawtdispatch:Unit = {

    import java.util.TimeUnit._
    import collection.JavaConversions._

    // do the actual check in 1 second..
    getGlobalQueue().after(1, SECONDS) {

      Dispatch.metrics.foreach{ metric=>
        if( metric.totalWaitTimeNS > MILLISECONDS.toNanos(10)) {
          println("Dispatch queue haveing slow wait time: "+metric)
        }
        if( metric.totalRunTimeNS > MILLISECONDS.toNanos(10)) {
          println("Dispatch queue haveing long rune time: "+metric)
        }
      }

      // to check again...
      monitor_hawtdispatch
    }
  }

References

Applications Using HawtDispatch