additional container interface that
represents the asynchronous computation itself and on which you register
the callbacks to be notified about the
results of the computation. Now asynchronous methods can return a value
that represents the pending asynchronous computation or stream instead
of just void. In particular, this allows
you to change your mind after making the call and filter, manipulate, or
transform the computation at will.
The Java SDK already provides (
sin-gle-shot) asynchronous computations
as first-class values in the form of the
Future<T> interface, whose principal
method T get() retrieves the result of
the computation and blocks when the
underlying computation has not yet
terminated:
interface Future<T>
{
T get();
T get(long timeout, TimeUnit unit);
boolean isCancelled();
boolean isDone();
}
Note that in principle, Future<T>
could be used to produce multiple
values. In this case each call to get()
would block and return the next value once it is available, as long as
isDone() is not true. This is similar to
the iterable interface. In the rest of
this article, asynchronous computations are assumed to return streams of
multiple results.
While futures do provide a
first-class representation of asynchronous computations, the get
method is blocking. Fortunately,
you can make the JDK interface
Future<T> nonblocking by supplying the T get() method with a
callback of type Observer<T> (the
interface introduced to extend the
AsyncCallback<T> interface of
GWT). Note that the blocking
isCan-celled and isDone methods are no
longer needed because that information is transmitted via the callback
as well. For simplicity ignore the second overload of get since you can easily
reconstruct that later. Applying these
changes, the nonblocking version of
the Future<T> interface looks like this:
interface Future<T>
{
boolean cancel(boolean mayInterrup-
tIfRunning);
void get(Observer<T> callback);
}
You are not yet done refactoring. Instead of canceling the future as a whole
via the cancel method, it makes more
sense to cancel just the particular outstanding call to get per observer. This
can be achieved by letting get return
an interface that represents a cancelable resource. Moreover, since you have
already called get, there is no need to
specify the mayInterruptIfRunning
parameter, because the computation
is already running at that point and
you can encode the Boolean by deciding whether or not to call cancel. Lastly, you can make the cancel method
nonblocking by returning void instead
of boolean. You could try to make
cancel return a Future<boolean>
instead, but then you would fall into
an endless recursive rabbit hole of
asynchrony. As it turns out the java.
io.Closable interface precisely fits
the bill, resulting in the following mutation of Future<T>:
interface Future<T> { Closable
get(Observer<T> callback); }
Note that calling the close() method of the Closable interface returned
by a subscription may or may not actually cancel the underlying computation because a single observable
may have multiple observers (
disposing, say, of the subscription to mouse
moves, which should not stop your
mouse from working). Since that particular observer is not notified of any
further values, however, from its perspective the computation has terminated. If needed, the class that implements IObservable<T> could cancel
the computation in some other way.
Instead of Future<T> and
Observer<T>, .NET has the standard
IObservable<T> and IObserver<T>
interfaces; and instead of Closable,
it has IDisposable. Values of type
IObservable<T> (or Observer<T>
depending on your preferred programming
language) represent asynchronous data
streams, or event streams, with values of
type T.
interface IObservable<T>
{
IDisposable Subscribe(IObserver<T>
observer);
}
interface IObserver<T>
{
void OnNext(T value);
void OnError(Exception error);
void OnCompleted();
}
interface IDisposable
{
void Dispose();
}
A closer inspection of the resulting
interface trinity reveals a generic variation of the classic Subject/Observer
interface2 for the publish/subscribe
pattern, a staple in the tool chest of
object-oriented programmers for decades for dealing with event-based
systems. JDK 1.0 already supports this
pattern via the (nongeneric)
Observable class and the Observer
interface. In .NET, the Rx library supports
the pattern.
The Rx library makes some additional behavioral assumptions about the
IObserver<T> and IObservable<T>
interfaces that are not expressed by
their (syntactic) type signatures:
˲ The sequence of calls to an instance of the IObserver<T>
interface should follow the regular expression OnNext(t)* (OnCompleted() |
OnError(e))?. In other words, after
zero or more OnNext calls, either one
of OnCompleted or OnError will optionally be called.
˲ Implementations of IObserver<T>
can assume to be synchronized; conceptually they run under a lock, similar
to regular .NET event handlers, or the
reactor pattern.
10
˲ All resources associated with an
observer should be cleaned up at the
moment OnError or OnCompleted
is called. In particular, the subscription returned by the Subscribe call
of the observer will be disposed of by
the observable as soon as the stream
completes. In practice this is implemented by closing over the
IDisposable returned by Subscribe in the
implementation of the OnError and
OnCompleted methods.
˲ When a subscription is disposed
externally, the IObservable<T> stream