an input-observable collection of type
IObservable<S>, precisely the same
as its cousin that works on pull-based
IEnumerable<T> collections. Figure 5
illustrates this.
Using this operator, you can
cleanse a text field input exposed
as an IObservable<string> stream
and remove all empty and null strings
using the query expression
input.Where(s => !string.IsNullOrEmpty(s)).
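The filter just described can be sketched in a few lines. The block below is a minimal illustration in Java with lambda expressions, not the Rx implementation: the `Observable` interface, the `where` function, and the `textInput` source are hypothetical stand-ins for IObservable<T>, Where, and a real text field.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

class WhereSketch {
    // Hypothetical minimal push-based stream: subscribing registers a
    // callback that the source invokes for every value it produces.
    interface Observable<T> {
        void subscribe(Consumer<T> observer);
    }

    // where: forward only the values that satisfy the predicate.
    static <T> Observable<T> where(Observable<T> source, Predicate<T> predicate) {
        return observer -> source.subscribe(value -> {
            if (predicate.test(value)) {
                observer.accept(value);
            }
        });
    }

    // Stand-in for a text field: pushes a fixed sequence of inputs.
    static Observable<String> textInput() {
        return observer -> {
            for (String s : new String[] {"rx", "", null, "linq"}) {
                observer.accept(s);
            }
        };
    }

    // Collects what survives the null-or-empty filter.
    static List<String> cleansed() {
        List<String> seen = new ArrayList<>();
        Observable<String> filtered = where(textInput(), s -> !(s == null || s.isEmpty()));
        filtered.subscribe(seen::add);
        return seen;
    }
}
```

Calling `WhereSketch.cleansed()` collects only the non-empty inputs, in the order the source pushed them.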
In Java 8 with lambda expressions
and defender methods, the code
would look very similar to the C# code
shown here, just using -> instead of
=> for lambdas and different casing
of variable names. Even without those
upcoming Java language features,
however, you can approximate a fluent
interface in Java (as in FlumeJava¹
or Reactive4Java⁹) for manipulating
event streams using standard
query operators. For example, by
having the operators as methods on
ObservableBase<T>, you can write
the filter example as:
and the synchronous nature of
IEnumerable<T>. As Figure 7 shows,
values on the source stream appear
asynchronously, even as you are still
producing values from a previous inflator
function. In the IEnumerable<T>
case, the next value is pulled from
the source stream only after all values
from the inflator function have been
produced (that is, the output stream
is the concatenation of all subsequent
inflator-produced streams, not the
nondeterministic interleaving), as shown
in Figure 8.
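The contrast between interleaving and concatenation can be made concrete with a small sketch. It assumes a hypothetical hand-driven `Subject` class so the order of pushed values is under our control, and it uses Java's `Stream.flatMap` as the pull-based analogue of SelectMany over IEnumerable<T>. None of these names come from the article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

class InterleaveSketch {
    interface Observable<T> {
        void subscribe(Consumer<T> observer);
    }

    // Hypothetical hand-driven source: the caller decides when values are pushed.
    static class Subject<T> implements Observable<T> {
        private final List<Consumer<T>> observers = new ArrayList<>();
        public void subscribe(Consumer<T> observer) { observers.add(observer); }
        void push(T value) { for (Consumer<T> o : observers) o.accept(value); }
    }

    // Push-based selectMany: subscribe to every inner stream as soon as it appears.
    static <S, T> Observable<T> selectMany(Observable<S> source,
                                           Function<S, Observable<T>> inflator) {
        return observer -> source.subscribe(value -> inflator.apply(value).subscribe(observer));
    }

    // Push side: inner streams A and B stay live at the same time, so their values interleave.
    static List<String> pushOrder() {
        Subject<String> source = new Subject<>();
        Map<String, Subject<String>> inner = new HashMap<>();
        inner.put("A", new Subject<>());
        inner.put("B", new Subject<>());
        List<String> seen = new ArrayList<>();
        Observable<String> merged = selectMany(source, key -> inner.get(key));
        merged.subscribe(seen::add);
        source.push("A");
        source.push("B");            // arrives while A's inner stream is still producing
        inner.get("A").push("a1");
        inner.get("B").push("b1");
        inner.get("A").push("a2");
        return seen;                 // a1, b1, a2
    }

    // Pull side: B is not even requested until A's inner collection is exhausted.
    static List<String> pullOrder() {
        Map<String, List<String>> inner = new HashMap<>();
        inner.put("A", Arrays.asList("a1", "a2"));
        inner.put("B", Arrays.asList("b1"));
        return Arrays.asList("A", "B").stream()
                .flatMap(key -> inner.get(key).stream())
                .collect(Collectors.toList());   // a1, a2, b1
    }
}
```

In the push case the output order depends on when each inner stream happens to fire; in the pull case it is fixed by the order of the source.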
Figure 7. The SelectMany operator (source IObservable<S>, inflator Func<S, IObservable<T>>, output IObservable<T>).
IObservable<IObservable<T>>
and produces the elements of the
most recent inner asynchronous data
stream that has been received up to
that point. This produces a new non-nested
asynchronous data stream of
type IObservable<T>. It allows later
streams to override earlier streams,
always yielding the “latest possible results,”
rather like a scrolling news feed.
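The "latest inner stream wins" behavior can be sketched as follows. The operator is called `switchLatest` here purely for illustration, and `Observable` and `Subject` are hypothetical helper types. This simplified version merely ignores values from superseded inner streams; a production implementation would presumably also unsubscribe from them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SwitchSketch {
    interface Observable<T> {
        void subscribe(Consumer<T> observer);
    }

    // Hypothetical hand-driven source used to control the order of events.
    static class Subject<T> implements Observable<T> {
        private final List<Consumer<T>> observers = new ArrayList<>();
        public void subscribe(Consumer<T> observer) { observers.add(observer); }
        void push(T value) { for (Consumer<T> o : observers) o.accept(value); }
    }

    // Flatten a stream of streams, forwarding only values from the newest inner stream.
    static <T> Observable<T> switchLatest(Observable<Observable<T>> nested) {
        return observer -> {
            int[] latest = {0};  // generation number of the most recent inner stream
            nested.subscribe(inner -> {
                int mine = ++latest[0];
                inner.subscribe(value -> {
                    if (mine == latest[0]) {   // drop values from overridden streams
                        observer.accept(value);
                    }
                });
            });
        };
    }

    static List<String> demo() {
        Subject<Observable<String>> nested = new Subject<>();
        Subject<String> first = new Subject<>();
        Subject<String> second = new Subject<>();
        List<String> seen = new ArrayList<>();
        Observable<String> flat = switchLatest(nested);
        flat.subscribe(seen::add);
        nested.push(first);
        first.push("old1");
        nested.push(second);   // second now overrides first
        first.push("old2");    // ignored: first is no longer the latest
        second.push("new1");
        return seen;
    }
}
```

Here `SwitchSketch.demo()` records "old1" and "new1"; "old2" is dropped because it arrives after a newer inner stream has taken over.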
input.Where(new Func<String, Boolean>() {
    public Boolean Invoke(String s) {
        return !(s == null || s.length() == 0); }});
To save us all from too much typing,
however, the next couple of examples
are provided only in C#, even though
nothing is C# or .NET specific.
The Select operator takes a transformation function Func<S,T> to
transform each value in the input data
stream of type IObservable<S>. This
produces a new asynchronous result
stream of type IObservable<T>, again
exactly like the IEnumerable<T>-based
version, as seen in Figure 6.
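As a rough illustration of this transformation step, the sketch below maps a stream of strings to their lengths. It assumes a hypothetical minimal `Observable` interface in place of IObservable<T>, and `select` is written as a static function rather than a method on the stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class SelectSketch {
    interface Observable<T> {
        void subscribe(Consumer<T> observer);
    }

    // select: apply the transformation to each pushed value and forward the result.
    static <S, T> Observable<T> select(Observable<S> source, Function<S, T> transform) {
        return observer -> source.subscribe(value -> observer.accept(transform.apply(value)));
    }

    static List<Integer> lengths() {
        // Stand-in source that pushes three words.
        Observable<String> words = observer -> {
            observer.accept("mouse");
            observer.accept("is");
            observer.accept("database");
        };
        List<Integer> seen = new ArrayList<>();
        Observable<Integer> mapped = select(words, String::length);
        mapped.subscribe(seen::add);
        return seen;
    }
}
```

The output stream has one element per input element, in the same order; only the element type changes, from String to Integer in this case.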
The SelectMany operator is often used to wire together two data
streams, pull-based or push-based.
SelectMany takes a source stream
of type IObservable<S> and an
inflator function of type Func<S,
IObservable<T>>, and from each element in the original source stream
generates a new nested stream of zero,
one, or more elements. It then merges
all intermediate asynchronous data
streams into a single output stream
of type IObservable<T>, as shown in
Figure 7.
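A minimal sketch of this operator, assuming a hypothetical `Observable` interface and a made-up `letters` inflator that turns each word into a stream of its characters, might look like the following. The sources here push synchronously, so the merged order is deterministic; with genuinely asynchronous inner streams the values could interleave.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class SelectManySketch {
    interface Observable<T> {
        void subscribe(Consumer<T> observer);
    }

    // selectMany: inflate each source value into an inner stream and
    // forward everything the inner streams produce to a single observer.
    static <S, T> Observable<T> selectMany(Observable<S> source,
                                           Function<S, Observable<T>> inflator) {
        return observer -> source.subscribe(value -> inflator.apply(value).subscribe(observer));
    }

    // Hypothetical inflator: zero, one, or more characters per word.
    static Observable<Character> letters(String word) {
        return observer -> {
            for (char c : word.toCharArray()) {
                observer.accept(c);
            }
        };
    }

    static List<Character> flattened() {
        Observable<String> words = observer -> {
            observer.accept("ab");  // inflates to two elements
            observer.accept("");    // inflates to zero elements
            observer.accept("c");   // inflates to one element
        };
        List<Character> seen = new ArrayList<>();
        Observable<Character> merged = selectMany(words, SelectManySketch::letters);
        merged.subscribe(seen::add);
        return seen;
    }
}
```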
The SelectMany operator clearly
shows the difference between the asynchronous
nature of IObservable<T>
Figure 8. Concatenation of all subsequent inflator-produced streams (source IEnumerable<S>, inflator Func<S,IEnumerable<T>>, output IEnumerable<T>).