should make a best-effort attempt to stop all outstanding work for that subscription. Work already in progress might still complete, since aborting it is not always safe, but its results should not be signaled to unsubscribed observers. This contract makes it easy to reason about and prove the correctness of operators and user code.
Asynchronous Data Streams
To create an instance of an Observable<T> in Java, you would use anonymous inner classes and define an abstract base class ObservableBase<T> that takes care of enforcing the Rx contract. It is specialized by providing an implementation of the subscribe method:
time the underlying control fires an event. Conversely, objects with settable properties such as lists and labels can be used as observers for such asynchronous data streams.
Figure 4. Exposing GUI elements as sources and sinks for asynchronous data streams.
Observable<T> observable = new ObservableBase<T>() {
  Closable subscribe(Observer<T> observer) { … }
};
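How a base class might enforce part of the Rx contract can be sketched as follows. This is a minimal illustration under stated assumptions, not the article's actual ObservableBase<T>: the interfaces are trimmed-down stand-ins modeled on the names in the text, and the hook method subscribeCore is a hypothetical name introduced here so that the guarding wrapper is visible (the text has subclasses implement subscribe directly). The sketch only covers "no notifications after termination or after the subscription is closed"; it does not serialize concurrent onNext calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical minimal interfaces, modeled on the names used in the text.
interface Closable { void close(); }

interface Observer<T> {
    void onNext(T value);
    void onError(Exception error);
    void onCompleted();
}

interface Observable<T> {
    Closable subscribe(Observer<T> observer);
}

// Assumed shape of a contract-enforcing base class. Subclasses implement
// subscribeCore (hypothetical name); subscribe wraps the observer so nothing
// is delivered after termination or after the subscription is closed.
abstract class ObservableBase<T> implements Observable<T> {
    protected abstract Closable subscribeCore(Observer<T> observer);

    @Override
    public final Closable subscribe(final Observer<T> observer) {
        final AtomicBoolean stopped = new AtomicBoolean(false);
        Observer<T> guarded = new Observer<T>() {
            public void onNext(T value) {
                if (!stopped.get()) observer.onNext(value);
            }
            public void onError(Exception error) {
                // Only the first terminal notification is forwarded.
                if (stopped.compareAndSet(false, true)) observer.onError(error);
            }
            public void onCompleted() {
                if (stopped.compareAndSet(false, true)) observer.onCompleted();
            }
        };
        final Closable inner = subscribeCore(guarded);
        return new Closable() {
            public void close() {
                stopped.set(true); // silence further notifications first
                inner.close();     // then make a best-effort attempt to stop work
            }
        };
    }
}

class ContractDemo {
    public static void main(String[] args) {
        // The source simply remembers the (guarded) observer so the demo can push to it.
        final List<Observer<String>> sinks = new ArrayList<>();
        Observable<String> source = new ObservableBase<String>() {
            protected Closable subscribeCore(Observer<String> o) {
                sinks.add(o);
                return new Closable() { public void close() { } };
            }
        };
        Closable subscription = source.subscribe(new Observer<String>() {
            public void onNext(String v) { System.out.println("next " + v); }
            public void onError(Exception e) { System.out.println("error " + e); }
            public void onCompleted() { System.out.println("completed"); }
        });
        sinks.get(0).onNext("a"); // delivered
        subscription.close();
        sinks.get(0).onNext("b"); // dropped: the subscription is closed
    }
}
```

Keeping the guard in the base class means individual sources do not have to repeat the bookkeeping; work already in flight in the source may still run to completion, but its results never reach the unsubscribed observer.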
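Observable<String> TextChanges(JTextField tf){
  return new ObservableBase<String>(){
    Closable subscribe(Observer<String> o){
      // Forward the current text to the observer whenever the event fires.
      DocumentListener l = new DocumentListener(){
        void changedUpdate(DocumentEvent e){
          o.onNext(tf.getText());
        }
      };
      // Swing registers document listeners on the field's Document.
      tf.getDocument().addDocumentListener(l);
      // Closing the subscription detaches the listener again.
      return new Closable(){
        void close(){ tf.getDocument().removeDocumentListener(l); }
      };
    }
  };
}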
Every time the changedUpdate event fires, the corresponding asynchronous data stream of type Observable<String> pushes a new string to its subscribers, representing the current content of the text field.
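A compilable variant of the same idea might look like the sketch below. It rests on a few assumptions that are not in the original figure: the Closable, Observer, and Observable interfaces are hypothetical stand-ins trimmed to what the example needs, the class name TextStreams is invented for illustration, and the stream is built on the field's underlying javax.swing.text.Document so it can be exercised without showing a window. Because Swing reports plain-text edits through insertUpdate and removeUpdate, the sketch forwards all three DocumentListener callbacks rather than changedUpdate alone.

```java
import javax.swing.JTextField;
import javax.swing.event.DocumentEvent;
import javax.swing.event.DocumentListener;
import javax.swing.text.BadLocationException;
import javax.swing.text.Document;
import javax.swing.text.PlainDocument;

// Hypothetical minimal interfaces; Observer is trimmed to onNext for brevity.
interface Closable { void close(); }
interface Observer<T> { void onNext(T value); }
interface Observable<T> { Closable subscribe(Observer<T> observer); }

final class TextStreams {
    private TextStreams() { }

    // Convenience overload for the control used in Figure 4.
    static Observable<String> textChanges(JTextField tf) {
        return textChanges(tf.getDocument());
    }

    // Exposes every edit of the document as a stream of its full text.
    static Observable<String> textChanges(final Document doc) {
        return new Observable<String>() {
            public Closable subscribe(final Observer<String> o) {
                final DocumentListener l = new DocumentListener() {
                    public void insertUpdate(DocumentEvent e) { push(); }
                    public void removeUpdate(DocumentEvent e) { push(); }
                    public void changedUpdate(DocumentEvent e) { push(); }
                    private void push() {
                        try {
                            o.onNext(doc.getText(0, doc.getLength()));
                        } catch (BadLocationException ex) {
                            throw new IllegalStateException(ex);
                        }
                    }
                };
                doc.addDocumentListener(l);
                // Closing the subscription detaches the listener.
                return new Closable() {
                    public void close() { doc.removeDocumentListener(l); }
                };
            }
        };
    }
}

class TextChangesDemo {
    public static void main(String[] args) throws BadLocationException {
        Document doc = new PlainDocument();
        Closable subscription = TextStreams.textChanges(doc).subscribe(new Observer<String>() {
            public void onNext(String text) { System.out.println("text is now: " + text); }
        });
        doc.insertString(0, "hello", null); // observer is notified
        subscription.close();
        doc.insertString(5, "!", null);     // no notification after close
    }
}
```

In a real UI you would pass the JTextField itself; the Document overload exists only so the demo and tests can run headless.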
Since .NET lacks anonymous inner classes that can implement an interface, it instead uses a factory method Observable.Create that creates a new observable instance from an anonymous delegate of type Func<IObserver<T>, IDisposable> that implements the Subscribe function:
Likewise, you can expose objects with setters as sinks of asynchronous data streams by wrapping them as observers. For example, you can expose a javax.swing.JList<T> list as an Observer<T[]> by setting the listData property to the given array whenever onNext is called:
Observer<T[]> ObserveChanges(javax.swing.JList<T> list){
IObservable<T> observable =
  Observable.Create<T>(
    (IObserver<T> observer) =>
    { … }
  );
Figure 5. The Where operator.
Just as in the Java solution, the
concrete type returned by the
Create method enforces the required Rx
behavior.
Once you have a single interface to represent asynchronous data streams, you can expose existing event- and callback-based abstractions such as GUI controls as sources of asynchronous data streams. For example, you can wrap the text-changed event of a TextField control in Java as an asynchronous data stream using the delicious token salad illustrated in Figure 4.
You can think of a UI control, the
mouse, a text field, or a button as a
streaming database that generates an
infinite collection of values for each
Figure 6. The Select operator.