Thursday, June 27, 2013

Reactive Extensions (Rx) in a nutshell

I am writing this as a jump start into Rx; for details, read the Rx sites, articles and books. But if you want to start coding some Rx now, dive in.

The fundamental building blocks of Reactive Extensions (Rx) are:
  • IObservable<T> - which has only one method, called "Subscribe".
  • IObserver<T> - which has three methods, called "OnNext", "OnCompleted" and "OnError".
  • ISubject<TSource, TResult> - which inherits both of the above interfaces (IObserver<TSource> and IObservable<TResult>) and therefore has all four of the above methods.

The truly fundamental interfaces are the first two, IObservable and IObserver. ISubject is a convenience interface that lets you implement both behaviors in the same class.
  • The IObservable<T> interface represents the class that sends notifications (the provider);
  • The IObserver<T> interface represents the class that receives them (the observer).

You can of course play with simple examples by implementing these two interfaces yourself. You register an observer with the observable by calling the observable's "Subscribe" method, and the observable then sends push-based notifications by calling the observer's "OnNext" method. If several observers are hooked up through Subscribe, the observable calls each observer's "OnNext" method to push the notification to every one of them.
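To make that concrete, here is a minimal sketch of implementing the two interfaces by hand. NumberSource, ConsoleObserver, HandRolledDemo and the Publish method are names made up for illustration; only IObservable<T>, IObserver<T> and their methods come from .NET itself. It is a toy, not how Rx implements things internally.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical provider: keeps a list of observers and pushes values to each.
public class NumberSource : IObservable<int>
{
    private readonly List<IObserver<int>> observers = new List<IObserver<int>>();

    // Register an observer; the returned IDisposable removes it again.
    public IDisposable Subscribe(IObserver<int> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(observers, observer);
    }

    // Push one value to every registered observer (illustrative helper).
    public void Publish(int value)
    {
        // Iterate over a copy so an observer may unsubscribe while being notified.
        foreach (var observer in observers.ToArray())
            observer.OnNext(value);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<int>> list;
        private readonly IObserver<int> target;

        public Unsubscriber(List<IObserver<int>> list, IObserver<int> target)
        {
            this.list = list;
            this.target = target;
        }

        public void Dispose() { list.Remove(target); }
    }
}

// Hypothetical observer: writes every notification to the console.
public class ConsoleObserver : IObserver<int>
{
    private readonly string name;
    public ConsoleObserver(string name) { this.name = name; }

    public void OnNext(int value) { Console.WriteLine(name + " got " + value); }
    public void OnError(Exception error) { Console.WriteLine(name + " error: " + error.Message); }
    public void OnCompleted() { Console.WriteLine(name + " done"); }
}

public static class HandRolledDemo
{
    public static void Main()
    {
        var source = new NumberSource();
        source.Subscribe(new ConsoleObserver("first"));
        IDisposable second = source.Subscribe(new ConsoleObserver("second"));

        source.Publish(1);   // both observers are notified
        second.Dispose();    // "second" unsubscribes
        source.Publish(2);   // only "first" is notified
    }
}
```

Running it prints "first got 1", "second got 1" and then "first got 2", since the second observer has unsubscribed by the time the value 2 is published.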


You can think of this as old-fashioned event handling (via event and delegate), where you register a number of event handler delegates with an event and, when the event fires, each handler gets called. However, Rx provides much more on top of this.

Well, hardly anyone using Rx bothers to implement IObservable and IObserver by hand all the time; instead they make extensive use of subjects (ISubject).

  • Subject<T> - a convenient, ready-to-use class (in the System.Reactive.Subjects namespace) that already implements both IObservable and IObserver. How nice!

Remember, a Subject is both an observable (publisher) and, at the same time, an observer (consumer).
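A small sketch of that dual role, assuming the System.Reactive package is referenced. The names upstream, relay and SubjectRelayDemo are invented for illustration. The relay subject is handed to another observable as an observer, and it forwards whatever it receives to its own subscribers.

```csharp
using System;
using System.Reactive.Subjects;

public static class SubjectRelayDemo
{
    public static void Main()
    {
        var upstream = new Subject<int>();
        var relay = new Subject<int>();

        // Observable side: two independent subscribers listen to the relay.
        relay.Subscribe(i => Console.WriteLine("A: " + i));
        relay.Subscribe(i => Console.WriteLine("B: " + i));

        // Observer side: the relay itself is passed to Subscribe as an IObserver<int>,
        // so everything pushed into upstream is forwarded to A and B.
        upstream.Subscribe(relay);

        upstream.OnNext(1);
        upstream.OnNext(2);
    }
}
```

This prints "A: 1", "B: 1", "A: 2", "B: 2": each value pushed into upstream reaches both subscribers of relay, in the order they subscribed.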

So here is the simplest possible example; after that you are on your own.

var subject = new Subject<string>();
subject.Subscribe(i => Console.WriteLine(i));
 
subject.OnNext("a");
subject.OnNext("b");
subject.OnNext("c");


Console output will be:
a
b
c


See, inside the Subscribe method I passed a lambda expression, which basically creates an anonymous observer whose OnNext implementation is Console.WriteLine.

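Now you can put that Subscribe call after a filter to receive only specific notifications. Through Rx you can filter, aggregate, specify how long or until what condition you want to listen, unsubscribe, handle errors, use the LINQ operators and a whole lot more. For example, replace the subscription in the example above (it must still come before the OnNext calls) with:

subject
.Where(j => j != "a")      // filter: skip "a", let everything else through
.TakeUntil(Disposed)       // Disposed: an observable, assumed to be defined elsewhere, that signals when to stop listening
.Subscribe(i => Console.WriteLine(i));

Where is the filter here. TakeWhile(j => j != "a") would instead end the sequence at the first value that fails the test, so with "a" arriving first nothing would be printed.

And this time the console output will be: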
b
c
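As a rough sketch of a few of the other things mentioned above (projecting values, unsubscribing and handling errors), assuming the System.Reactive package is referenced. The names numbers, failing and OperatorsDemo are made up for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class OperatorsDemo
{
    public static void Main()
    {
        var numbers = new Subject<int>();

        // Subscribe returns an IDisposable; disposing it unsubscribes.
        IDisposable subscription = numbers
            .Where(n => n % 2 == 0)     // filter: keep even numbers only
            .Select(n => n * 10)        // project each value
            .Subscribe(n => Console.WriteLine("next: " + n));

        numbers.OnNext(1);              // filtered out
        numbers.OnNext(2);              // passes the filter, printed as 20
        subscription.Dispose();         // unsubscribe
        numbers.OnNext(4);              // nobody is listening any more

        // Error handling: pass OnError and OnCompleted handlers next to OnNext.
        var failing = new Subject<int>();
        failing.Subscribe(
            n => Console.WriteLine("next: " + n),
            ex => Console.WriteLine("error: " + ex.Message),
            () => Console.WriteLine("completed"));

        failing.OnError(new InvalidOperationException("boom"));
    }
}
```

The output is "next: 20" followed by "error: boom". The value 4 never shows up because the subscription was disposed before it was pushed.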

So enjoy Reactive Extensions (Rx).