Table of Contents

Reactive Operators

Overview

Reactive operators are methods that transform, filter, combine, and manipulate observable streams. They are the building blocks of reactive programming and allow you to compose complex asynchronous operations declaratively.

Transformation Operators

Select (Map)

Transforms each element in the stream:

// Transform numbers to strings
Observable.Range(1, 5)
    .Select(x => $"Number: {x}")
    .Subscribe(s => Console.WriteLine(s));

// Transform events
textBox.Events().TextChanged
    .Select(e => e.EventArgs.Text)
    .Select(text => text.ToUpper())
    .Subscribe(upper => label.Text = upper);

SelectMany (FlatMap)

Projects each element to an observable and flattens the result:

// Async operations
searchBox.Events().TextChanged
    .Select(e => e.EventArgs.Text)
    .SelectMany(text => SearchAsync(text))
    .Subscribe(results => DisplayResults(results));

// Nested sequences
Observable.Range(1, 3)
    .SelectMany(x => Observable.Range(x, 3))
    .Subscribe(n => Console.WriteLine(n));

Scan

Accumulates values over time:

// Running total
Observable.Range(1, 5)
    .Scan((acc, x) => acc + x)
    .Subscribe(total => Console.WriteLine($"Total: {total}"));
// Output: 1, 3, 6, 10, 15

// State machine
clicks.Scan(0, (count, _) => count + 1)
    .Subscribe(count => label.Text = $"Clicks: {count}");

Filtering Operators

Where (Filter)

Filters elements based on a predicate:

// Only even numbers
Observable.Range(1, 10)
    .Where(x => x % 2 == 0)
    .Subscribe(x => Console.WriteLine(x));

// Valid input
searchBox.Events().TextChanged
    .Select(e => e.EventArgs.Text)
    .Where(text => !string.IsNullOrWhiteSpace(text))
    .Where(text => text.Length >= 3)
    .Subscribe(text => PerformSearch(text));

DistinctUntilChanged

Only emits when the value changes:

// Avoid duplicate searches
searchBox.Events().TextChanged
    .Select(e => e.EventArgs.Text)
    .DistinctUntilChanged()
    .Subscribe(text => PerformSearch(text));

Take / TakeUntil

Limits the number of emissions:

// First 5 items
observable.Take(5)
    .Subscribe(x => Console.WriteLine(x));

// Until a condition
clicks.TakeUntil(stopButton.Events().Click)
    .Subscribe(_ => Console.WriteLine("Click"));

Skip

Skips specified number of items:

// Skip first (ignore initial value)
this.WhenAnyValue(x => x.Property)
    .Skip(1)
    .Subscribe(value => HandleChange(value));

Combining Operators

CombineLatest

Combines latest values from multiple streams:

// Form validation
var usernameValid = this.WhenAnyValue(x => x.Username)
    .Select(u => !string.IsNullOrWhiteSpace(u));

var passwordValid = this.WhenAnyValue(x => x.Password)
    .Select(p => p?.Length >= 6);

usernameValid.CombineLatest(passwordValid, (u, p) => u && p)
    .Subscribe(valid => submitButton.IsEnabled = valid);

Merge

Merges multiple streams into one:

// Multiple button clicks
var button1Clicks = button1.Events().Click;
var button2Clicks = button2.Events().Click;

button1Clicks.Merge(button2Clicks)
    .Subscribe(_ => HandleClick());

Zip

Pairs elements from multiple streams:

// Wait for both
var loaded = window.Events().Loaded;
var dataReady = dataService.Events().DataLoaded;

loaded.Zip(dataReady, (l, d) => Unit.Default)
    .Subscribe(_ => Initialize());

Concat

Concatenates streams sequentially:

// Sequential operations
firstOperation
    .Concat(secondOperation)
    .Concat(thirdOperation)
    .Subscribe(result => HandleResult(result));

Time-Based Operators

Throttle

Emits latest value after a period of inactivity:

// Throttle search input
searchBox.Events().TextChanged
    .Throttle(TimeSpan.FromMilliseconds(300))
    .Select(e => e.EventArgs.Text)
    .Subscribe(text => PerformSearch(text));

Debounce

Same as Throttle (alias):

textBox.Events().TextChanged
    .Debounce(TimeSpan.FromMilliseconds(500))
    .Subscribe(e => HandleTextChange(e));

Sample

Samples the stream at intervals:

// Sample every second
mouseMove.Sample(TimeSpan.FromSeconds(1))
    .Subscribe(pos => UpdatePosition(pos));

Delay

Delays emission by specified time:

// Delayed notification
errorOccurred
    .Delay(TimeSpan.FromSeconds(3))
    .Subscribe(_ => HideErrorMessage());

Timeout

Throws if no value within timespan:

// Timeout after 5 seconds
longRunningOperation
    .Timeout(TimeSpan.FromSeconds(5))
    .Catch(Observable.Return(defaultValue))
    .Subscribe(result => HandleResult(result));

Error Handling Operators

Catch

Handles errors gracefully:

// Provide fallback
observable
    .Catch<Data, Exception>(ex => 
    {
        LogError(ex);
        return Observable.Return(defaultData);
    })
    .Subscribe(data => DisplayData(data));

Retry

Retries on error:

// Retry 3 times
networkRequest
    .Retry(3)
    .Subscribe(
        data => ProcessData(data),
        error => ShowError(error));

// Retry with delay
networkRequest
    .RetryWhen(errors => errors
        .SelectMany((ex, attempt) => 
            Observable.Timer(TimeSpan.FromSeconds(attempt + 1))))
    .Subscribe(data => ProcessData(data));

OnErrorResumeNext

Continues with another stream on error:

primarySource
    .OnErrorResumeNext(fallbackSource)
    .Subscribe(data => ProcessData(data));

Utility Operators

Do

Side effects without altering the stream:

observable
    .Do(x => Console.WriteLine($"Value: {x}"))
    .Do(x => LogValue(x))
    .Subscribe(x => ProcessValue(x));

Materialize / Dematerialize

Wraps/unwraps notifications:

observable
    .Materialize()
    .Subscribe(notification => 
    {
        if (notification.Kind == NotificationKind.OnNext)
            Console.WriteLine($"Value: {notification.Value}");
        else if (notification.Kind == NotificationKind.OnError)
            Console.WriteLine($"Error: {notification.Exception}");
    });

ObserveOn

Specifies where to observe values:

// Background work, UI updates
Observable.Start(() => ExpensiveOperation())
    .ObserveOn(RxSchedulers.MainThreadScheduler)
    .Subscribe(result => UpdateUI(result));

SubscribeOn

Specifies where subscription occurs:

observable
    .SubscribeOn(RxSchedulers.TaskpoolScheduler)
    .ObserveOn(RxSchedulers.MainThreadScheduler)
    .Subscribe(value => UpdateUI(value));

Aggregation Operators

Count

Counts elements:

observable.Count()
    .Subscribe(count => Console.WriteLine($"Count: {count}"));

Sum / Average / Min / Max

Aggregate operations:

Observable.Range(1, 10)
    .Sum()
    .Subscribe(sum => Console.WriteLine($"Sum: {sum}"));

Observable.Range(1, 10)
    .Average()
    .Subscribe(avg => Console.WriteLine($"Average: {avg}"));

Reduce / Aggregate

Custom aggregation:

Observable.Range(1, 5)
    .Aggregate((acc, x) => acc * x)
    .Subscribe(product => Console.WriteLine($"Product: {product}"));

Conditional Operators

All / Any

Check conditions:

// All elements positive
Observable.Range(1, 10)
    .All(x => x > 0)
    .Subscribe(allPositive => Console.WriteLine(allPositive));

// Any element even
Observable.Range(1, 10)
    .Any(x => x % 2 == 0)
    .Subscribe(anyEven => Console.WriteLine(anyEven));

Contains

Checks for specific value:

observable.Contains(42)
    .Subscribe(contains => Console.WriteLine($"Contains 42: {contains}"));

Buffer and Window Operators

Buffer

Collects elements into lists:

// Buffer every 5 items
observable.Buffer(5)
    .Subscribe(batch => ProcessBatch(batch));

// Buffer by time
observable.Buffer(TimeSpan.FromSeconds(1))
    .Subscribe(batch => ProcessBatch(batch));

Window

Projects into nested observables:

observable.Window(TimeSpan.FromSeconds(1))
    .Subscribe(window => 
        window.Subscribe(x => Console.WriteLine(x)));

ReactiveUI-Specific Operators

ToProperty

Creates an ObservableAsPropertyHelper:

this.WhenAnyValue(x => x.FirstName, x => x.LastName,
        (f, l) => $"{f} {l}")
    .ToProperty(this, x => x.FullName, out _fullName);

InvokeCommand

Invokes a ReactiveCommand:

searchBox.Events().TextChanged
    .Throttle(TimeSpan.FromMilliseconds(300))
    .InvokeCommand(SearchCommand);

WhenAnyValue

Observes property changes:

this.WhenAnyValue(x => x.SearchText)
    .Where(text => !string.IsNullOrEmpty(text))
    .Subscribe(text => PerformSearch(text));

Composition Example

Combining multiple operators:

searchBox.Events().TextChanged
    .Select(e => e.EventArgs.Text)           // Extract text
    .DistinctUntilChanged()                   // Ignore duplicates
    .Throttle(TimeSpan.FromMilliseconds(300)) // Wait for pause
    .Where(text => text.Length >= 3)          // Minimum length
    .SelectMany(text => SearchAsync(text))    // Async search
    .Catch(Observable.Return(emptyResults))   // Error handling
    .ObserveOn(RxSchedulers.MainThreadScheduler)     // UI thread
    .Subscribe(results => DisplayResults(results));

Best Practices

  1. Chain Operators: Build complex logic by chaining simple operators
  2. Error Handling: Always include error handling operators
  3. Thread Management: Use ObserveOn/SubscribeOn appropriately
  4. Performance: Use appropriate operators (Throttle vs Sample)
  5. Readability: Break complex chains into named intermediates

Operator Categories Quick Reference

Category Operators
Transform Select, SelectMany, Scan
Filter Where, DistinctUntilChanged, Take, Skip
Combine CombineLatest, Merge, Zip, Concat
Time Throttle, Debounce, Sample, Delay, Timeout
Error Catch, Retry, OnErrorResumeNext
Utility Do, ObserveOn, SubscribeOn
Aggregate Count, Sum, Average, Min, Max, Reduce
Conditional All, Any, Contains
Buffer Buffer, Window

Resources