Overview
HawtDispatch is a small (less than 100k) thread pooling and NIO event
notification framework API modeled after the libdispatch API that Apple
created to power the Grand Central Dispatch (GCD) technology in OS X. It
allows you to easily develop multi-threaded applications without having to
deal with the problems that traditionally plague multi-threaded application
development.
Features
- Java 1.5 API
- Scala 2.8 API
- Thread Pooling
- Delayed Task Execution
- Priority Task Execution
- NIO Event Notifications
The DispatchQueue
The most important objects in the HawtDispatch API are the DispatchQueue objects. They are Executor objects which execute submitted Runnable objects at a later time. They come in 2 flavors:
Global Dispatch Queue: Tasks submitted to a global dispatch queue execute concurrently and therefore must be thread safe. The order of execution of the tasks is non-deterministic. There are only 3 global queues, shared system wide, one for each priority level. They can be accessed using the Dispatch.getGlobalQueue method. Example:
In Java
DispatchQueue queue = getGlobalQueue(HIGH);
In Scala
val queue = getGlobalQueue(HIGH)
Serial Dispatch Queue: Executes the submitted runnable tasks in FIFO order. A serial dispatch queue will only invoke one runnable at a time, but independent queues may each execute their runnable objects concurrently with respect to each other. Serial dispatch queues are created by the application using the Dispatch.createQueue method. Example:
In Java
DispatchQueue queue = createQueue("My queue");
In Scala
val queue = createQueue("My Queue")
Handy imports
The examples in this document assume that you have added the following imports:
In Java
import org.fusesource.hawtdispatch.*;
import static org.fusesource.hawtdispatch.Dispatch.*;
In Scala
import _root_.org.fusesource.hawtdispatch._
Submitting Runnable Objects
Once you have a reference to a queue object you can use it to perform some asynchronous processing. The Scala queue object is enriched with several helpers to make enqueuing async tasks easier. Example:
In Java
queue.execute(new Runnable(){
  public void run() {
    System.out.println("Hi!");
  }
});
In Scala
queue {
  System.out.println("Hi!");
}
// or
queue.execute(^{
  System.out.println("Hi!");
})
The ^{ .... } block syntax in the Scala example is borrowed from GCD. It produces
a regular Java Runnable object.
Dispatch Sources
A Dispatch Source is used to trigger the execution of a task on a queue based on an external event. They are usually used to integrate with external IO events from NIO, but you can also use a custom Dispatch Source to coalesce multiple application generated events into a single event which triggers an async task.
Dispatch sources are initially created in a suspended state. Once a source is created and you have configured its event handler, you should
call the DispatchSource.resume method so that its event handler is executed on the specified queue. If you later
want to stop processing events for a period of time, call the DispatchSource.suspend method.
NIO Dispatch Source
NIO integration is accomplished via a DispatchSource object which is
created using the Dispatch.createSource method. You supply it
the SelectableChannel and the operations you're interested in receiving events for,
like OP_READ or OP_WRITE. When that NIO event is raised,
it will execute a runnable callback you configure on a dispatch queue
you specify. HawtDispatch takes care of setting up and managing the
NIO selectors and selector keys for you.
Example:
In Java
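// assumes: import static java.nio.channels.SelectionKey.*;
// SocketChannel is used because read() is not declared on SelectableChannel
final SocketChannel channel = ...
DispatchQueue queue = createQueue();
DispatchSource source = createSource(channel, OP_READ, queue);
source.setEventHandler(new Runnable(){
  public void run() {
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    try {
      while( channel.read(buffer) > 0 ) {
        // just dump it to the console
        System.out.write(buffer.array(), buffer.arrayOffset(), buffer.position());
        // make room for the next read
        buffer.clear();
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
});
source.resume();
In Scala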
// SocketChannel is used because read() is not declared on SelectableChannel
val channel:SocketChannel = ...
val queue = createQueue()
val source = createSource(channel, OP_READ, queue)
source.onEvent {
  val buffer = ByteBuffer.allocate(1024)
  while( channel.read(buffer) > 0 ) {
    // just dump it to the console
    System.out.write(buffer.array(), buffer.arrayOffset(), buffer.position())
    // make room for the next read
    buffer.clear()
  }
}
source.resume
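For contrast, here is a rough sketch of the selector bookkeeping that an NIO dispatch source saves you from writing. It uses only the JDK's java.nio API and a Pipe instead of a network socket so that it is self-contained; the class name ManualSelector and the roundTrip helper are made up for this illustration and are not part of HawtDispatch.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of hand-managed NIO selection; not HawtDispatch code.
public class ManualSelector {

    // Writes `message` into a pipe and reads it back through a Selector.
    public static String roundTrip(String message) {
        try {
            Pipe pipe = Pipe.open();
            Selector selector = Selector.open();
            try {
                // A channel must be non-blocking before it can be registered.
                pipe.source().configureBlocking(false);
                pipe.source().register(selector, SelectionKey.OP_READ);
                pipe.sink().write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));

                StringBuilder out = new StringBuilder();
                // Wait (up to 5 seconds) for the OP_READ event.
                if (selector.select(5000) > 0) {
                    for (SelectionKey key : selector.selectedKeys()) {
                        ReadableByteChannel channel = (ReadableByteChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        while (channel.read(buffer) > 0) {
                            buffer.flip();
                            out.append(StandardCharsets.UTF_8.decode(buffer));
                            buffer.clear();
                        }
                    }
                    // Selected keys are never removed automatically.
                    selector.selectedKeys().clear();
                }
                return out.toString();
            } finally {
                selector.close();
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

A real server would also need a dedicated selector thread and a way to hand each event to worker threads, which is the part the dispatch source and its target queue take over.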
Custom Dispatch Source
A Custom Dispatch Source is used to coalesce multiple application generated events into a single event which triggers an async task. By using a Custom Dispatch Source you reduce the amount of cross thread contention since multiple events generated by one thread are passed to the processing thread as a single batch.
When you create a custom dispatch source, you provide it an aggregator which controls how events are coalesced. The supplied aggregators are:
- EventAggregators.INTEGER_ADD: Merges integer events by adding them
- EventAggregators.LONG_ADD: Merges long events by adding them
- EventAggregators.INTEGER_OR: Merges integer events by bitwise or'ing them
- EventAggregators.linkedList(): Merges Object events by adding them to a LinkedList
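To make the idea of coalescing concrete, the following minimal sketch shows what an add-style aggregator does conceptually, using only java.util.concurrent. It is not HawtDispatch's implementation; the class CoalescingCounter and its drain method are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of add-style event coalescing; not part of HawtDispatch.
public class CoalescingCounter {
    private final AtomicInteger pending = new AtomicInteger();

    // Producer side: fold a new event into whatever is already pending.
    public void merge(int event) {
        pending.addAndGet(event);
    }

    // Consumer side: take the merged value and reset the pending total to zero.
    public int drain() {
        return pending.getAndSet(0);
    }

    public static void main(String[] args) {
        CoalescingCounter counter = new CoalescingCounter();
        counter.merge(1);
        counter.merge(1);
        counter.merge(3);
        System.out.println(counter.drain()); // prints 5: three events arrive as one batch
        System.out.println(counter.drain()); // prints 0: nothing is pending any more
    }
}
```

The consumer sees one merged value however many times merge was called since the last drain, which is why fewer cross-thread hand-offs are needed.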
Event producers can call the merge(event) method on the custom dispatch source to
supply it data. Calling the merge method will cause the event handler Runnable configured
on the dispatch source to be executed. When it is executed, you should use the
custom dispatch source's getData() method to access the merged event. The getData()
method should only be called from the configured event handler.
In Java
final Semaphore done = new Semaphore(1-(1000*1000));
DispatchQueue queue = createQueue();
final CustomDispatchSource<Integer, Integer> source = createSource(EventAggregators.INTEGER_ADD, queue);
source.setEventHandler(new Runnable() {
  public void run() {
    int count = source.getData();
    System.out.println("got: " + count);
    done.release(count);
  }
});
source.resume();
// Produce 1,000,000 concurrent merge events
for (int i = 0; i < 1000; i++) {
  getGlobalQueue().execute(new Runnable() {
    public void run() {
      for (int j = 0; j < 1000; j++) {
        source.merge(1);
      }
    }
  });
}
// Wait for all the events to arrive.
done.acquire();
In Scala
val done = new Semaphore(1 - (1000 * 1000))
val queue = createQueue()
val source = createSource(EventAggregators.INTEGER_ADD, queue)
source.onEvent {
  val count = source.getData()
  println("got: " + count)
  done.release(count.intValue)
}
source.resume();
// Produce 1,000,000 concurrent merge events
for (i <- 0 until 1000) {
  globalQueue {
    for (j <- 0 until 1000) {
      source.merge(1)
    }
  }
}
// Wait for all the events to arrive.
done.acquire()
On an 8 core machine you would see output similar to:
got: 167000
got: 103000
got: 103000
got: 163000
got: 119000
got: 109000
got: 111000
got: 125000
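The example above relies on creating the Semaphore with a negative permit count, so that a single acquire() only succeeds once all one million merged events have been released. That trick can be sketched in isolation as follows, using only java.util.concurrent; the class name NegativePermits, the latchFor helper and the smaller event count are made up for illustration.

```java
import java.util.concurrent.Semaphore;

// Hypothetical illustration of a Semaphore used as a "wait for N events" latch.
public class NegativePermits {

    // Returns a semaphore that needs `expected` released permits
    // before a single acquire can succeed.
    public static Semaphore latchFor(int expected) {
        return new Semaphore(1 - expected);
    }

    public static void main(String[] args) {
        Semaphore done = latchFor(10);         // starts at -9 permits
        done.release(4);                       // -9 + 4 = -5
        System.out.println(done.tryAcquire()); // prints false: still negative
        done.release(6);                       // -5 + 6 = 1
        System.out.println(done.tryAcquire()); // prints true: all 10 events accounted for
    }
}
```

Because release accepts a count, the event handler can release a whole coalesced batch at once, which is what done.release(count) does in the example.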
Restrictions on Executed Runnables
All runnable actions executed asynchronously by HawtDispatch should be non-blocking and avoid waiting on any synchronized objects. If a blocking call has to be performed, it should be done asynchronously in a new thread not managed by HawtDispatch.
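One way to follow this rule is to hand the blocking call to a separate thread pool and post its result back as a new task. The sketch below uses plain java.util.concurrent executors as stand-ins so that it is self-contained: a single-thread executor plays the role of a serial dispatch queue. The names BlockingOffload, slowLookup, lookupAsync and demo are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; the executors stand in for HawtDispatch queues.
public class BlockingOffload {

    // Stand-in for a blocking call such as a database query or a file read.
    static String slowLookup(String key) throws InterruptedException {
        Thread.sleep(50);
        return "value-for-" + key;
    }

    // Runs the blocking call on `blockingPool`, then delivers the result as a task on `queue`.
    public static void lookupAsync(final String key, Executor blockingPool,
                                   final Executor queue, final BlockingQueue<String> results) {
        blockingPool.execute(new Runnable() {
            public void run() {
                try {
                    final String value = slowLookup(key);
                    // Back on the serial queue: this task itself never blocks.
                    queue.execute(new Runnable() {
                        public void run() {
                            results.add(value);
                        }
                    });
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    // Demo helper: wires up both executors, waits for the result, and shuts them down.
    public static String demo(String key) {
        ExecutorService blockingPool = Executors.newCachedThreadPool();
        ExecutorService queue = Executors.newSingleThreadExecutor();
        BlockingQueue<String> results = new LinkedBlockingQueue<String>();
        try {
            lookupAsync(key, blockingPool, queue, results);
            // Only the caller's own thread waits here, never a queue thread.
            return results.poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            blockingPool.shutdown();
            queue.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("abc"));
    }
}
```

Since a DispatchQueue is an Executor, a real dispatch queue could be passed as the queue argument in place of the single-thread executor.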
Common Patterns
Protecting Mutable State
A common pattern is to use a serial queue to synchronize access to the mutable state of an object. Example:
class MyCounter {
  val queue = createQueue()
  var counter = 0;
  def add(value:Int) = queue {
    counter += value
  }
  def sub(value:Int) = queue {
    counter -= value
  }
}
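The same pattern can be sketched in Java. To stay self-contained, the sketch takes any Executor that runs one task at a time in FIFO order and uses a single-thread java.util.concurrent executor for the demo; with HawtDispatch the queue returned by createQueue() would be passed in instead. The class name SerialCounter, the get method and the demo helper are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of guarding mutable state with a serial queue.
public class SerialCounter {
    private final Executor queue; // must run tasks one at a time, in FIFO order
    private int counter = 0;      // only ever touched by tasks running on `queue`

    public SerialCounter(Executor serialQueue) {
        this.queue = serialQueue;
    }

    public void add(final int value) {
        queue.execute(new Runnable() {
            public void run() { counter += value; }
        });
    }

    public void sub(final int value) {
        queue.execute(new Runnable() {
            public void run() { counter -= value; }
        });
    }

    // Reads are queued too, so they observe every update submitted before them.
    public Future<Integer> get() {
        FutureTask<Integer> read = new FutureTask<Integer>(new Callable<Integer>() {
            public Integer call() { return counter; }
        });
        queue.execute(read);
        return read;
    }

    // Demo helper: several threads update the counter, then the total is read back.
    public static int demo() {
        ExecutorService serial = Executors.newSingleThreadExecutor();
        ExecutorService callers = Executors.newFixedThreadPool(4);
        final SerialCounter c = new SerialCounter(serial);
        try {
            for (int i = 0; i < 100; i++) {
                callers.execute(new Runnable() {
                    public void run() { c.add(2); c.sub(1); }
                });
            }
            callers.shutdown();
            callers.awaitTermination(5, TimeUnit.SECONDS); // all updates are enqueued now
            return c.get().get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            serial.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 100
    }
}
```

No locks or synchronized blocks are needed because the counter field is only ever read or written by one task at a time.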
Scala Only Features
When you use the HawtDispatch Scala interface you gain several additional features, such as the ability to capture asynchronous results in Futures or the ability to use Scala continuations.
Using Futures
With futures you can run an asynchronous computation and return a Future instance which you can use to obtain the result of the future computation.
Creating the future:
val future:Future[Int] = queue.future {
  var rc=0;
  for( i <- 0 until 100 ) {
    // sum the values; multiplying from an initial 0 would always yield 0
    rc = rc + i
  }
  rc
}
Checking to see if the future has completed:
if( future.completed ) {
  println("computation completed")
}
Setting a callback to be invoked once the future completes:
future.onComplete {
  println("computation completed")
}
Getting the result of the future computation:
val rc:Int = future()
Warning: getting the result of the future blocks until the future completes.
Sometimes you need to work with multiple futures concurrently. Perhaps you want to wait for them all to complete or wait for the first one to complete.
Assuming you have created a few futures such as:
val future1:Future[Int] = ...
val future2:Future[Int] = ...
val future3:Future[Int] = ...
Then you can use the Future companion object to create a future that gathers all of their results, and wait on it. Example:
val future = Future.all(List(future1,future2,future3))
future().foreach { result:Int =>
  println(result)
}
To just wait for the first result:
val future:Future[Int] = Future.first(List(future1,future2,future3))
println("first result was: "+future())
To collect the results via a folding function:
val future:Future[Int] = Future.fold(List(future1,future2,future3), 0) { (sum, value)=>
  sum+value
}
println("future sum was: "+future())
Scala Continuations
Since blocking operations are not allowed in HawtDispatch executed tasks, when one dispatch queue task needs the result of a computation from another task that is executed asynchronously, you typically have to use a callback or continuation passing style API. Example:
object Foo {
  val a = createQueue()
  var held:Int = 0;
  def hold(v:Int)(result: (Int)=>Unit) = a {
    val rc = held
    held = v
    result( rc )
  }
}
Notice that the example above executes the computation on dispatch queue a to guard
access to the held variable. Next you will find an example of what a caller of that
hold method looks like.
object Bar {
  val b = createQueue()
  var sum:Int = 0;
  def test = b {
    Foo.hold(sum+5) { result=>
      b {
        sum += result
        println("sum is at: "+sum)
      }
    }
  }
}
In the case of the Bar object above, access to the sum variable is being guarded by
the b dispatch queue. It calls the Foo.hold method and passes the callback function.
Notice that two b { ... } blocks are required to protect access to the sum variable.
When you use HawtDispatch's Scala continuation support, the previous example is simplified by eliminating the need to pass the callback parameter. Example:
import util.continuations._
object Foo {
  var held:Int = 0;
  val a = createQueue()
  def hold(v:Int) = a ! {
    val rc = held
    held = v
    rc
  }
}
When you use a dispatch queue's '!' method, its function block is executed asynchronously on the dispatch queue and the result of the function is passed to the continuation that gets passed in implicitly by the Scala compiler's continuation plugin.
On the calling side of the method, you must use a reset { ... } block to delimit what defines
the continuation. Every operation after the hold method call will be re-written into a
continuation block and passed as the implicit continuation of the hold method. Once the hold
method completes it calls back to the implicit continuation.
object Bar {
  var sum:Int = 0;
  val b = createQueue()
  def test = b {
    reset {
      val result = Foo.hold(sum+5)
      sum += result
      println("sum is at: "+sum)
    }
  }
}
Note that in this version, we did not need the 2nd b { } block to guard access when we
assign the sum variable. This is because the '!' method stores the original dispatch queue
it was called from and executes the continuation as a task on the original dispatch queue.
To make resetting even easier and to pass back a future result of the async call, the previous
test method can be simplified using the !! method, as the following example shows:
object Bar {
  def test:Future[Int] = b !! {
    val result = Foo.hold(sum+5)
    sum += result
    println("sum is at: "+sum)
    sum
  }
}
The !! method executes the given block within a reset and captures its result in a Future which
can be used by the caller to examine the future result of the computation.
Rich Executors
Once you import _root_.org.fusesource.hawtdispatch._, all Executor implementations
get most of the extra syntax sugar that dispatch queues enjoy. For example:
val executor = java.util.concurrent.Executors.newSingleThreadExecutor()
executor {
  println("This block is run async on the executor");
}
For more details see the RichExecutor documentation.
Profiling Hooks
Every dispatch queue now has a metrics method that allows you to collect the usage Metrics of the
queue since the method was last called, if profiling is enabled for the dispatch queue. If
profiling is not enabled on the queue then the method returns null. To enable or disable
profiling on a queue you can call the queue's profile method. If you want to
enable profiling on all queues by default then set the hawtdispatch.profile System
property on the JVM.
If you want to get the metrics for all the queues currently being used in the JVM, then you
can call the Dispatch.metrics() method to get a
list of metrics of all the queues.
The collected metrics can come in handy to troubleshoot misbehaving applications. You can set up a periodic task to check for dispatch queues whose tasks either wait for a long time before executing or take a long time to execute. Example:
def monitor_hawtdispatch:Unit = {
  // TimeUnit lives in java.util.concurrent
  import java.util.concurrent.TimeUnit._
  import collection.JavaConversions._
  // do the actual check in 1 second..
  getGlobalQueue().after(1, SECONDS) {
    Dispatch.metrics.foreach{ metric=>
      if( metric.totalWaitTimeNS > MILLISECONDS.toNanos(10)) {
        println("Dispatch queue having slow wait time: "+metric)
      }
      if( metric.totalRunTimeNS > MILLISECONDS.toNanos(10)) {
        println("Dispatch queue having long run time: "+metric)
      }
    }
    // to check again...
    monitor_hawtdispatch
  }
}
References
- Java API
- Scala API
- Echo Server Example: Source code for a simple TCP based echo server.
Applications Using HawtDispatch
- ActiveMQ Apollo: A reliable messaging server built using the HawtDispatch threading architecture.
- Akka: An Erlang style Actor framework for Scala. Its fastest dispatcher is implemented with HawtDispatch.
- fyrie-redis: A Redis client written in Scala, using Akka actors, HawtDispatch and non-blocking IO. Supports Redis 2.0+