Friday, March 3, 2017

Testing your Rx application

If you have an observable sequence that publishes values over an extended period of time, testing it in real time can be a stretch. The Reactive Extensions (Rx) library provides the TestScheduler type to help test this kind of time-dependent code without actually waiting for time to pass. TestScheduler inherits from VirtualTimeScheduler and allows you to create, publish and subscribe to sequences in emulated time. For example, you can compact a publication which takes 5 days to complete into a 2 minute run, while maintaining the correct scale. You can also take a sequence which has already happened in the past (e.g., a sequence of stock ticks for a previous year) and compute or subscribe to it as if it were pushing out new values in real time.
The Start method executes all scheduled tasks until the queue is empty; you can also specify a time so that queued-up tasks are executed only up to that time.
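Before the fuller example, here is a minimal sketch of what running in emulated time looks like. It assumes the Microsoft.Reactive.Testing package is referenced; the class name VirtualTimeSketch and the one-value-per-day sequence are made up for illustration. AdvanceTo is used here as one way to run queued work only up to a given virtual time, and Start then drains whatever is left.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

// Hypothetical demo class, not part of the original post.
class VirtualTimeSketch
{
    static void Main()
    {
        var scheduler = new TestScheduler();
        var received = new List<long>();

        // One value per virtual day, five values in total.
        // Take(5) matters: without it, Start() would never run out of work.
        var source = Observable.Interval(TimeSpan.FromDays(1), scheduler).Take(5);

        using (source.Subscribe(x => received.Add(x)))
        {
            // Execute queued work only up to the two-day mark of virtual time.
            scheduler.AdvanceTo(TimeSpan.FromDays(2).Ticks);
            Console.WriteLine("Values after two virtual days: {0}", received.Count);

            // Execute everything that is still queued.
            scheduler.Start();
            Console.WriteLine("Values after Start: {0}", received.Count);
        }
    }
}
```

The whole run finishes almost instantly in wall-clock time, because the scheduler only moves its own virtual clock forward.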
The following example creates a hot observable sequence with specified OnNext notifications. It then starts the test scheduler and specifies when to subscribe to and dispose of the hot observable sequence. The Start method returns an instance of the ITestableObserver, which contains a Messages property that records all notifications in a list.
After the sequence has completed, we use the ReactiveAssert.AreElementsEqual method to compare the Messages property with a list of expected values to see if both are identical (the same number of items, with equal items in the same order). By doing so, we can confirm that we have indeed received the notifications that we expect. In our example, since we only start subscribing at 150, we will miss the value abc. However, when we compare the values we have received so far at 400, we notice that we have in fact received all the values published after we subscribed to the sequence. We also verify that the OnCompleted notification was fired at the right time, at 500. In addition, subscription information is captured by the ITestableObservable type returned by the CreateHotObservable method.
In the same way, you can use ReactiveAssert.AreElementsEqual to confirm that subscriptions indeed happened at expected times.
using System;
using System.Reactive;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

class Program : ReactiveTest
{
    static void Main(string[] args)
    {
        var scheduler = new TestScheduler();

        // Hot sequence: values are published at these virtual times
        // whether or not anyone is subscribed.
        var input = scheduler.CreateHotObservable(
            OnNext(100, "abc"),
            OnNext(200, "def"),
            OnNext(250, "ghi"),
            OnNext(300, "pqr"),
            OnNext(450, "xyz"),
            OnCompleted<string>(500));

        // Create the query at 50, subscribe at 150, dispose at 600.
        var results = scheduler.Start(
            () => input.Buffer(() => input.Throttle(TimeSpan.FromTicks(100), scheduler))
                       .Select(b => string.Join(",", b)),
            created: 50,
            subscribed: 150,
            disposed: 600);

        // Notifications recorded by the test observer.
        ReactiveAssert.AreElementsEqual(results.Messages, new Recorded<Notification<string>>[] {
                OnNext(400, "def,ghi,pqr"),
                OnNext(500, "xyz"),
                OnCompleted<string>(500)
            });

        // Subscriptions recorded by the hot observable.
        ReactiveAssert.AreElementsEqual(input.Subscriptions, new Subscription[] {
                Subscribe(150, 500),
                Subscribe(150, 400),
                Subscribe(400, 500)
            });
    }
}

Rx Threading

Basically -
SubscribeOn with ThreadPoolScheduler - to process on a background thread.
ObserveOn with DispatcherScheduler - to get back on the UI thread.

This will queue up on the observer quickly. We can improve this code by using the ObserveOn operator, which allows you to specify the context that you want to use to send pushed notifications (OnNext) to observers. By default, the ObserveOn operator ensures that OnNext will be called as many times as possible on the current thread. You can use its overloads and redirect the OnNext outputs to a different context. In addition, you can use the SubscribeOn operator to return a proxy observable that delegates actions to a specific scheduler. For example, for a UI-intensive application, you can delegate all background operations to be performed on a scheduler running in the background by using SubscribeOn and passing to it a ThreadPoolScheduler. To receive notifications being pushed out and access any UI element, you can pass an instance of the DispatcherScheduler to the ObserveOn operator.
The following example will schedule any OnNext notifications on the current Dispatcher, so that any value pushed out is sent on the UI thread. This is especially beneficial to Silverlight developers who use Rx.
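As a rough console-friendly sketch of the same pattern: DispatcherScheduler needs a running UI dispatcher, so this version assumes an EventLoopScheduler (a single dedicated thread) as a stand-in for the UI thread. The class name ThreadingSketch and the three-value source are made up for illustration. In a WPF or Silverlight application you would pass the dispatcher scheduler to ObserveOn instead.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical demo class, not part of the original post.
class ThreadingSketch
{
    static void Main()
    {
        var done = new ManualResetEventSlim();

        // Stand-in for the UI thread (assumption): one dedicated thread.
        using (var uiLike = new EventLoopScheduler())
        {
            var source = Observable.Create<int>(observer =>
            {
                // This body runs on whichever scheduler SubscribeOn selects.
                Console.WriteLine("Producing on thread {0}",
                    Thread.CurrentThread.ManagedThreadId);
                for (var i = 0; i < 3; i++) observer.OnNext(i);
                observer.OnCompleted();
                return Disposable.Empty;
            });

            source
                .SubscribeOn(ThreadPoolScheduler.Instance) // do the work in the background
                .ObserveOn(uiLike)                         // deliver notifications on the "UI" thread
                .Subscribe(
                    x => Console.WriteLine("OnNext {0} on thread {1}",
                        x, Thread.CurrentThread.ManagedThreadId),
                    () => done.Set());

            // Keep the console process alive until OnCompleted has been delivered.
            done.Wait();
        }
    }
}
```

The thread ids printed will vary from run to run; the point is that the producing thread and the observing thread differ, and that every OnNext arrives on the same observing thread.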

Cold vs. Hot Observables

Cold observables start running upon subscription, i.e., the observable sequence only starts pushing values to the observers when Subscribe is called. Values are also not shared among subscribers. This is different from hot observables such as mouse move events or stock tickers which are already producing values even before a subscription is active. When an observer subscribes to a hot observable sequence, it will get the current value in the stream. The hot observable sequence is shared among all subscribers, and each subscriber is pushed the next value in the sequence. For example, even if no one has subscribed to a particular stock ticker, the ticker will continue to update its value based on market movement. When a subscriber registers interest in this ticker, it will automatically get the latest tick.
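To make the sharing behaviour of a hot sequence concrete before turning to the cold case, here is a small sketch that uses a Subject<int> as a stand-in for a ticker. The class name HotSketch and the values are made up for illustration. A plain Subject pushes only values published after subscription; if a new subscriber should also receive the latest value immediately, a BehaviorSubject would be one option.

```csharp
using System;
using System.Reactive.Subjects;

// Hypothetical demo class, not part of the original post.
class HotSketch
{
    static void Main()
    {
        using (var ticker = new Subject<int>())
        {
            ticker.OnNext(1); // nobody is subscribed yet, so this value is missed

            using (ticker.Subscribe(x => Console.WriteLine("Observer A: {0}", x)))
            {
                ticker.OnNext(2); // only A is subscribed

                using (ticker.Subscribe(x => Console.WriteLine("Observer B: {0}", x)))
                {
                    ticker.OnNext(3); // A and B share the same value
                }
            }
        }
        // Prints:
        // Observer A: 2
        // Observer A: 3
        // Observer B: 3
    }
}
```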
The following example demonstrates a cold observable sequence. In this example, we use the Interval operator to create a simple observable sequence of numbers pumped out at specific intervals, in this case, every 1 second.
Two observers then subscribe to this sequence and print out its values. You will notice that the sequence is reset for each subscriber: the second subscription restarts the sequence from the first value.
IObservable<long> source = Observable.Interval(TimeSpan.FromSeconds(1));

IDisposable subscription1 = source.Subscribe(
                x => Console.WriteLine("Observer 1: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 1: OnCompleted"));

IDisposable subscription2 = source.Subscribe(
                x => Console.WriteLine("Observer 2: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 2: OnCompleted"));

Console.WriteLine("Press any key to unsubscribe");