Mastering ReactiveUI.Extensions: A Comprehensive Guide to Async Reactive Programming in .NET
Author: Chris Pulman
Published: March 16, 2026
ReactiveUI.Extensions represents a significant evolution in reactive programming for .NET, providing a fully async-native observable framework that bridges the gap between traditional System.Reactive (Rx.NET) and modern async/await patterns. This library introduces IObservableAsync<T>, IObserverAsync<T>, and a comprehensive suite of operators that work seamlessly with ValueTask, CancellationToken, and IAsyncDisposable throughout the entire reactive pipeline.
This article provides an extensive, in-depth technical exploration of ReactiveUI.Extensions, covering every public function, advanced multi-threading scenarios, and practical patterns for building robust, scalable reactive applications.
Table of Contents
- Introduction to ReactiveUI.Extensions
- Understanding IObservableAsync
vs IObservable - Understanding IObserverAsync
vs IObserver - Core Async Interfaces and Types
- Factory Methods and Observable Creation
- Transformation Operators
- Filtering Operators
- Combining and Merging Operators
- Error Handling and Resilience
- Timing and Scheduling
- Aggregation and Terminal Operators
- Async Disposables and Resource Management
- Subjects and Multicasting
- Bridging Classic and Async Observables
- Classic Reactive Extensions Operators
- Advanced Multi-Threading Examples
- Performance Considerations
- Best Practices
1. Introduction to ReactiveUI.Extensions
Why ReactiveUI.Extensions?
Traditional System.Reactive (Rx.NET) was designed before the widespread adoption of async/await in C#. While Rx.NET provides excellent support for asynchronous data streams, it has several limitations in modern .NET development:
- Synchronous Observer Callbacks:
IObserver<T>methods (OnNext,OnError,OnCompleted) are synchronous, which can lead to thread pool starvation when performing async operations - No Built-in Cancellation: Traditional observables don't integrate naturally with
CancellationToken - Synchronous Disposal:
IDisposabledoesn't support async cleanup operations - Task Integration Friction: Converting between
Task,IAsyncEnumerable, andIObservablerequires boilerplate
ReactiveUI.Extensions solves these problems by providing:
- End-to-End Async: Every notification (
OnNextAsync,OnErrorResumeAsync,OnCompletedAsync) returnsValueTask - Cancellation-First Design: Every operator accepts
CancellationToken - Async Disposal: Subscriptions return
IAsyncDisposablefor proper async resource cleanup - Seamless Interop: Bidirectional bridging between
IObservableandIObservableAsync - Modern .NET Support: Requires .NET 8 or later, leveraging modern language features
Installation
dotnet add package ReactiveUI.Extensions
Supported Target Frameworks: .NET 4.6.2, .NET 4.7.2, .NET 4.8.1, .NET 8, .NET 9, .NET 10
2. Understanding IObservableAsync vs IObservable
IObservable (Traditional Rx.NET)
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
Characteristics:
- Synchronous subscription
- Returns
IDisposablefor cleanup - Observer callbacks are synchronous
- No built-in cancellation support
- Can block threads during async operations
IObservableAsync (ReactiveUI.Extensions)
public interface IObservableAsync<T>
{
ValueTask<IAsyncDisposable> SubscribeAsync(
IObserverAsync<T> observer,
CancellationToken cancellationToken);
}
Characteristics:
- Asynchronous subscription (
ValueTask) - Returns
IAsyncDisposablefor async cleanup - Observer callbacks are asynchronous (
ValueTask) - Built-in
CancellationTokensupport - Non-blocking throughout the pipeline
Key Differences Table
| Feature | IObservable |
IObservableAsync |
|---|---|---|
| Subscription | Synchronous | Asynchronous (ValueTask) |
| Disposal | IDisposable |
IAsyncDisposable |
| OnNext | void |
ValueTask |
| OnError | void |
ValueTask |
| OnCompleted | void |
ValueTask |
| Cancellation | Manual | Built-in CancellationToken |
| Thread Blocking | Possible | Never |
| .NET Version | Any | .NET 8+ |
When to Use Each
Use IObservableAsync
- Building new async-first applications
- Working with I/O-bound operations (network, database, file system)
- Needing proper cancellation support
- Requiring async resource cleanup
- Targeting .NET 8 or later
Use IObservable
- Maintaining legacy code
- Working with CPU-bound operations
- Interoperating with existing Rx.NET libraries
- Targeting older .NET versions
3. Understanding IObserverAsync vs IObserver
IObserver (Traditional)
public interface IObserver<in T>
{
void OnNext(T value);
void OnError(Exception error);
void OnCompleted();
}
Problems:
- No Async Support: Cannot await async operations in callbacks
- Exception Handling: Exceptions in callbacks can crash the application
- Resource Cleanup: No async disposal mechanism
- Backpressure: Difficult to implement async backpressure
IObserverAsync (ReactiveUI.Extensions)
public interface IObserverAsync<in T> : IAsyncDisposable
{
ValueTask OnNextAsync(T value, CancellationToken cancellationToken);
ValueTask OnErrorResumeAsync(Exception error, CancellationToken cancellationToken);
ValueTask OnCompletedAsync(Result result);
}
Advantages:
- Full Async Support: All callbacks are
ValueTask-based - Error Recovery:
OnErrorResumeAsyncallows error recovery and continuation - Async Disposal: Implements
IAsyncDisposablefor proper cleanup - Cancellation: Every callback receives
CancellationToken - Result Tracking:
OnCompletedAsyncreceivesResultindicating success/failure
Practical Example: Traditional vs Async Observer
// Traditional IObserver - Problem: Blocking async operations
public class TraditionalObserver : IObserver<string>
{
public void OnNext(string value)
{
// BAD: Blocking async operation
var result = SaveToDatabaseAsync(value).Result; // Thread pool starvation!
Console.WriteLine($"Saved: {result}");
}
public void OnError(Exception error) => Console.WriteLine($"Error: {error}");
public void OnCompleted() => Console.WriteLine("Completed");
}
// Async IObserverAsync - Solution: Non-blocking throughout
public class AsyncObserver : ObserverAsync<string>
{
protected override async ValueTask OnNextAsyncCore(
string value,
CancellationToken cancellationToken)
{
// GOOD: Non-blocking async operation
var result = await SaveToDatabaseAsync(value, cancellationToken);
Console.WriteLine($"Saved: {result}");
}
protected override async ValueTask OnErrorResumeAsyncCore(
Exception error,
CancellationToken cancellationToken)
{
// Can attempt recovery
await LogErrorAsync(error, cancellationToken);
// Can choose to continue or stop
}
protected override async ValueTask OnCompletedAsyncCore(Result result)
{
await CleanupAsync();
Console.WriteLine($"Completed: {result.IsSuccess}");
}
protected override ValueTask DisposeAsyncCore() => CleanupResourcesAsync();
}
Error Recovery with OnErrorResumeAsync
One of the most powerful features of IObserverAsync<T> is the ability to recover from errors:
public class ResilientObserver : ObserverAsync<int>
{
private int _retryCount = 0;
private const int MaxRetries = 3;
protected override async ValueTask OnNextAsyncCore(
int value,
CancellationToken cancellationToken)
{
try
{
await ProcessValueAsync(value, cancellationToken);
}
catch (Exception ex) when (ex is TimeoutException)
{
// Error will be sent to OnErrorResumeAsyncCore
throw;
}
}
protected override async ValueTask OnErrorResumeAsyncCore(
Exception error,
CancellationToken cancellationToken)
{
if (_retryCount < MaxRetries)
{
_retryCount++;
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
// Continue processing - don't propagate error
return;
}
// Max retries exceeded - log and continue
await LogErrorAsync(error, cancellationToken);
}
protected override ValueTask OnCompletedAsyncCore(Result result)
{
Console.WriteLine($"Completed with {(result.IsSuccess ? "success" : "failure")}");
return default;
}
}
4. Core Async Interfaces and Types
IObservableAsync
public interface IObservableAsync<out T>
{
ValueTask<IAsyncDisposable> SubscribeAsync(
IObserverAsync<T> observer,
CancellationToken cancellationToken);
}
Purpose: Represents an asynchronous push-based notification provider.
Key Points:
- Subscription is asynchronous (
ValueTask) - Returns
IAsyncDisposablefor async cleanup - Supports cancellation via
CancellationToken - Thread-safe for concurrent subscriptions
IObserverAsync
public interface IObserverAsync<in T> : IAsyncDisposable
{
ValueTask OnNextAsync(T value, CancellationToken cancellationToken);
ValueTask OnErrorResumeAsync(Exception error, CancellationToken cancellationToken);
ValueTask OnCompletedAsync(Result result);
}
Purpose: Defines an asynchronous observer that receives notifications.
Key Points:
- All callbacks are asynchronous
- Implements
IAsyncDisposablefor cleanup OnErrorResumeAsyncallows error recoveryResultparameter indicates completion status
ObservableAsync (Abstract Base Class)
public abstract class ObservableAsync<T> : IObservableAsync<T>
{
public async ValueTask<IAsyncDisposable> SubscribeAsync(
IObserverAsync<T> observer,
CancellationToken cancellationToken)
{
var subscription = await SubscribeAsyncCore(observer, cancellationToken);
return subscription;
}
protected abstract ValueTask<IAsyncDisposable> SubscribeAsyncCore(
IObserverAsync<T> observer,
CancellationToken cancellationToken);
}
Purpose: Base class for creating custom async observables.
Example: Custom Async Observable
public class TickObservable : ObservableAsync<int>
{
private readonly int _count;
private readonly TimeSpan _interval;
public TickObservable(int count, TimeSpan interval)
{
_count = count;
_interval = interval;
}
protected override async ValueTask<IAsyncDisposable> SubscribeAsyncCore(
IObserverAsync<int> observer,
CancellationToken cancellationToken)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_ = Task.Run(async () =>
{
try
{
for (int i = 0; i < _count && !cts.Token.IsCancellationRequested; i++)
{
await observer.OnNextAsync(i, cts.Token);
await Task.Delay(_interval, cts.Token);
}
await observer.OnCompletedAsync(Result.Success);
}
catch (OperationCanceledException)
{
await observer.OnCompletedAsync(Result.Success);
}
catch (Exception ex)
{
await observer.OnErrorResumeAsync(ex, cts.Token);
}
}, cts.Token);
return new CancellationDisposable(cts);
}
}
// Usage
var ticks = new TickObservable(10, TimeSpan.FromMilliseconds(100));
await using var subscription = await ticks.SubscribeAsync(
async (value, ct) => Console.WriteLine($"Tick: {value}"),
CancellationToken.None);
ConnectableObservableAsync
public class ConnectableObservableAsync<T> : ObservableAsync<T>
{
public ValueTask<IAsyncDisposable> ConnectAsync(CancellationToken cancellationToken);
}
Purpose: Extends ObservableAsync with deferred connection for multicasting.
Use Case: When you want to control when the source starts emitting (e.g., after multiple subscribers are ready).
Result
public readonly struct Result
{
public bool IsSuccess { get; }
public Exception? Exception { get; }
public static Result Success { get; } = new(true, null);
public static Result Failure(Exception exception) => new(false, exception);
}
Purpose: Carries completion status (success or failure) to OnCompletedAsync.
5. Factory Methods and Observable Creation
ObservableAsync.Create
Creates an observable from an async subscription delegate.
public static IObservableAsync<T> Create<T>(
Func<IObserverAsync<T>, CancellationToken, ValueTask<IAsyncDisposable>> subscribeAsync)
Example:
var custom = ObservableAsync.Create<string>(async (observer, ct) =>
{
await observer.OnNextAsync("Hello", ct);
await observer.OnNextAsync("World", ct);
await observer.OnCompletedAsync(Result.Success);
return DisposableAsync.Empty;
});
await using var sub = await custom.SubscribeAsync(
async (value, ct) => Console.WriteLine(value),
CancellationToken.None);
ObservableAsync.CreateAsBackgroundJob
Runs the subscription logic as a background task.
public static IObservableAsync<T> CreateAsBackgroundJob<T>(
Func<IObserverAsync<T>, CancellationToken, ValueTask> job,
bool startSynchronously = false)
Example:
var background = ObservableAsync.CreateAsBackgroundJob<int>(async (observer, ct) =>
{
for (int i = 0; i < 5; i++)
{
await observer.OnNextAsync(i, ct);
await Task.Delay(100, ct);
}
await observer.OnCompletedAsync(Result.Success);
});
ObservableAsync.Return
Emits a single value and completes.
public static IObservableAsync<T> Return<T>(T value)
Example:
var single = ObservableAsync.Return(42);
ObservableAsync.Empty
Completes immediately without emitting.
public static IObservableAsync<T> Empty<T>()
ObservableAsync.Never
Never emits and never completes.
public static IObservableAsync<T> Never<T>()
ObservableAsync.Throw
Completes immediately with an error.
public static IObservableAsync<T> Throw<T>(Exception exception)
ObservableAsync.Range
Emits a sequence of integers.
public static IObservableAsync<long> Range(long start, long count)
Example:
var numbers = ObservableAsync.Range(1, 10); // 1, 2, 3, ..., 10
ObservableAsync.Interval
Emits incrementing values at regular intervals.
public static IObservableAsync<long> Interval(
TimeSpan period,
TimeProvider? timeProvider = null)
Example:
var timer = ObservableAsync.Interval(TimeSpan.FromSeconds(1));
ObservableAsync.Timer
Emits a value after a delay.
public static IObservableAsync<long> Timer(
TimeSpan dueTime,
TimeProvider? timeProvider = null)
Example:
var delayed = ObservableAsync.Timer(TimeSpan.FromSeconds(5));
ObservableAsync.Defer
Defers observable creation until subscription.
public static IObservableAsync<T> Defer<T>(
Func<IObservableAsync<T>> factory)
Example:
var deferred = ObservableAsync.Defer(() =>
ObservableAsync.Return(DateTime.Now.Second));
ObservableAsync.FromAsync
Wraps an async function as an observable.
public static IObservableAsync<T> FromAsync<T>(
Func<CancellationToken, Task<T>> factory)
Example:
var fromTask = ObservableAsync.FromAsync(async ct =>
{
await Task.Delay(100, ct);
return 42;
});
Conversion Extensions
// Task to IObservableAsync
public static IObservableAsync<T> ToObservableAsync<T>(this Task<T> task)
// IAsyncEnumerable to IObservableAsync
public static IObservableAsync<T> ToObservableAsync<T>(
this IAsyncEnumerable<T> asyncEnumerable)
// IEnumerable to IObservableAsync
public static IObservableAsync<T> ToObservableAsync<T>(
this IEnumerable<T> enumerable)
// IObservable to IObservableAsync (Bridge)
public static IObservableAsync<T> ToObservableAsync<T>(
this IObservable<T> observable)
// IObservableAsync to IObservable (Bridge)
public static IObservable<T> ToObservable<T>(
this IObservableAsync<T> asyncObservable)
Example:
// From Task
var task = Task.FromResult(42);
var obs = task.ToObservableAsync();
// From IAsyncEnumerable
async IAsyncEnumerable<int> GenerateAsync()
{
for (int i = 0; i < 5; i++)
{
await Task.Delay(50);
yield return i;
}
}
var fromAsyncEnum = GenerateAsync().ToObservableAsync();
// Bridge from classic IObservable
var classic = Observable.Interval(TimeSpan.FromMilliseconds(200)).Take(5);
var asyncVersion = classic.ToObservableAsync();
6. Transformation Operators
Select (Sync)
Projects each element using a synchronous function.
public static IObservableAsync<TResult> Select<TSource, TResult>(
this IObservableAsync<TSource> source,
Func<TSource, TResult> selector)
Example:
var source = ObservableAsync.Range(1, 5);
var doubled = source.Select(x => x * 2); // 2, 4, 6, 8, 10
Select (Async)
Projects each element using an async function.
public static IObservableAsync<TResult> Select<TSource, TResult>(
this IObservableAsync<TSource> source,
Func<TSource, CancellationToken, ValueTask<TResult>> asyncSelector)
Example:
var projected = source.Select(async (x, ct) =>
{
await Task.Delay(10, ct);
return x.ToString();
});
SelectMany
Flattens nested async observables.
public static IObservableAsync<TResult> SelectMany<TSource, TResult>(
this IObservableAsync<TSource> source,
Func<TSource, IObservableAsync<TResult>> selector)
Example:
var flattened = source.SelectMany(x =>
ObservableAsync.Range(x * 10, 3));
// For x=1: 10, 11, 12
// For x=2: 20, 21, 22
// etc.
Scan (Sync)
Applies an accumulator over the sequence.
public static IObservableAsync<TAccumulate> Scan<TSource, TAccumulate>(
this IObservableAsync<TSource> source,
TAccumulate seed,
Func<TAccumulate, TSource, TAccumulate> accumulator)
Example:
var runningTotal = source.Scan(0, (acc, x) => acc + x);
// 1, 3, 6, 10, 15
Scan (Async)
Async accumulator.
public static IObservableAsync<TAccumulate> Scan<TSource, TAccumulate>(
this IObservableAsync<TSource> source,
TAccumulate seed,
Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> asyncAccumulator)
Cast
Casts each element to the specified type.
public static IObservableAsync<TResult> Cast<TSource, TResult>(
this IObservableAsync<TSource> source)
OfType
Filters elements assignable to the specified type.
public static IObservableAsync<TResult> OfType<TSource, TResult>(
this IObservableAsync<TSource> source)
GroupBy
Groups elements by key.
public static IObservableAsync<GroupedAsyncObservable<TKey, TSource>> GroupBy<TSource, TKey>(
this IObservableAsync<TSource> source,
Func<TSource, TKey> keySelector)
Example:
var grouped = source.GroupBy(x => x % 2 == 0 ? "even" : "odd");
7. Filtering Operators
Where (Sync)
Filters elements using a synchronous predicate.
public static IObservableAsync<T> Where<T>(
this IObservableAsync<T> source,
Func<T, bool> predicate)
Example:
var evens = source.Where(x => x % 2 == 0);
Where (Async)
Filters using an async predicate.
public static IObservableAsync<T> Where<T>(
this IObservableAsync<T> source,
Func<T, CancellationToken, ValueTask<bool>> asyncPredicate)
Example:
var asyncFiltered = source.Where(async (x, ct) =>
{
await Task.Delay(1, ct);
return x > 5;
});
Take
Takes the first N elements.
public static IObservableAsync<T> Take<T>(
this IObservableAsync<T> source,
int count)
Skip
Skips the first N elements.
public static IObservableAsync<T> Skip<T>(
this IObservableAsync<T> source,
int count)
TakeWhile
Takes elements while predicate holds.
public static IObservableAsync<T> TakeWhile<T>(
this IObservableAsync<T> source,
Func<T, bool> predicate)
SkipWhile
Skips elements while predicate holds.
public static IObservableAsync<T> SkipWhile<T>(
this IObservableAsync<T> source,
Func<T, bool> predicate)
TakeUntil (Multiple Overloads)
Takes elements until a signal.
// Until another observable
public static IObservableAsync<T> TakeUntil<T>(
this IObservableAsync<T> source,
IObservableAsync<Unit> other)
// Until a task
public static IObservableAsync<T> TakeUntil<T>(
this IObservableAsync<T> source,
Task other)
// Until cancellation token
public static IObservableAsync<T> TakeUntil<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
// Until predicate
public static IObservableAsync<T> TakeUntil<T>(
this IObservableAsync<T> source,
Func<T, bool> predicate)
Example:
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var bounded = ObservableAsync.Interval(TimeSpan.FromMilliseconds(100))
.TakeUntil(cts.Token);
Distinct
Emits only unique elements.
public static IObservableAsync<T> Distinct<T>(
this IObservableAsync<T> source)
DistinctBy
Emits only elements with unique keys.
public static IObservableAsync<T> DistinctBy<T, TKey>(
this IObservableAsync<T> source,
Func<T, TKey> keySelector)
DistinctUntilChanged
Suppresses consecutive duplicates.
public static IObservableAsync<T> DistinctUntilChanged<T>(
this IObservableAsync<T> source)
DistinctUntilChangedBy
Suppresses consecutive elements with same key.
public static IObservableAsync<T> DistinctUntilChangedBy<T, TKey>(
this IObservableAsync<T> source,
Func<T, TKey> keySelector)
Example:
var items = new[] { 1, 2, 2, 3, 1, 3 }.ToObservableAsync();
var unique = items.Distinct(); // 1, 2, 3
var noConsecDups = items.DistinctUntilChanged(); // 1, 2, 3, 1, 3
8. Combining and Merging Operators
Merge
Merges multiple observables, interleaving values.
// Two sources
public static IObservableAsync<T> Merge<T>(
this IObservableAsync<T> first,
IObservableAsync<T> second)
// Multiple sources with concurrency limit
public static IObservableAsync<T> Merge<T>(
this IObservableAsync<IObservableAsync<T>> sources,
int maxConcurrency)
Example:
var a = ObservableAsync.Range(1, 3); // 1, 2, 3
var b = ObservableAsync.Range(10, 3); // 10, 11, 12
var merged = ObservableAsync.Merge(a, b); // Interleaved: 1, 10, 2, 11, 3, 12 (order may vary)
Concat
Concatenates observables sequentially.
public static IObservableAsync<T> Concat<T>(
this IObservableAsync<T> first,
IObservableAsync<T> second)
Example:
var sequential = ObservableAsync.Concat(a, b); // 1, 2, 3, 10, 11, 12
Switch
Switches to the most recent inner observable.
public static IObservableAsync<T> Switch<T>(
this IObservableAsync<IObservableAsync<T>> sources)
Use Case: When you want to cancel previous operations when a new one arrives (e.g., search as you type).
CombineLatest
Combines latest values from multiple sources.
// Two sources
public static IObservableAsync<TResult> CombineLatest<T1, T2, TResult>(
this IObservableAsync<T1> first,
IObservableAsync<T2> second,
Func<T1, T2, TResult> selector)
// Up to 8 sources
public static IObservableAsync<TResult> CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, TResult>(
IObservableAsync<T1> first,
IObservableAsync<T2> second,
// ... up to 8 sources
Func<T1, T2, T3, T4, T5, T6, T7, T8, TResult> selector)
Example:
var combined = a.CombineLatest(b, (x, y) => $"{x}+{y}");
// Emits whenever either source emits, with latest values from both
Zip
Pairs elements by index.
public static IObservableAsync<TResult> Zip<T1, T2, TResult>(
this IObservableAsync<T1> first,
IObservableAsync<T2> second,
Func<T1, T2, TResult> selector)
Example:
var zipped = a.Zip(b, (x, y) => x + y); // 11, 13, 15
Prepend
Prepends a single value.
public static IObservableAsync<T> Prepend<T>(
this IObservableAsync<T> source,
T value)
StartWith
Prepends multiple values.
public static IObservableAsync<T> StartWith<T>(
this IObservableAsync<T> source,
params T[] values)
Example:
var withPrefix = a.Prepend(0); // 0, 1, 2, 3
var withMany = a.StartWith(-2, -1, 0); // -2, -1, 0, 1, 2, 3
9. Error Handling and Resilience
Catch
Catches errors and switches to fallback.
public static IObservableAsync<T> Catch<T>(
this IObservableAsync<T> source,
Func<Exception, IObservableAsync<T>> handler)
Example:
var flaky = ObservableAsync.Throw<int>(new InvalidOperationException("Oops"));
var safe = flaky.Catch(ex => ObservableAsync.Return(-1)); // emits -1
CatchAndIgnoreErrorResume
Suppresses error-resume notifications.
public static IObservableAsync<T> CatchAndIgnoreErrorResume<T>(
this IObservableAsync<T> source)
Retry (Infinite)
Re-subscribes indefinitely on error.
public static IObservableAsync<T> Retry<T>(
this IObservableAsync<T> source)
Retry (Count-Limited)
Re-subscribes up to N times.
public static IObservableAsync<T> Retry<T>(
this IObservableAsync<T> source,
int count)
Example:
var retried = flaky.Retry(3); // Retry up to 3 times
OnErrorResumeAsFailure
Converts error-resume to failure completion.
public static IObservableAsync<T> OnErrorResumeAsFailure<T>(
this IObservableAsync<T> source)
10. Timing and Scheduling
Throttle
Suppresses rapid emissions (debounce).
public static IObservableAsync<T> Throttle<T>(
this IObservableAsync<T> source,
TimeSpan timeSpan,
TimeProvider? timeProvider = null)
Example:
var source = ObservableAsync.Interval(TimeSpan.FromMilliseconds(50)).Take(10);
var throttled = source.Throttle(TimeSpan.FromMilliseconds(200));
Delay
Delays each emission.
public static IObservableAsync<T> Delay<T>(
this IObservableAsync<T> source,
TimeSpan timeSpan,
TimeProvider? timeProvider = null)
Timeout
Raises TimeoutException if no value arrives.
// With exception
public static IObservableAsync<T> Timeout<T>(
this IObservableAsync<T> source,
TimeSpan timeSpan)
// With fallback
public static IObservableAsync<T> Timeout<T>(
this IObservableAsync<T> source,
TimeSpan timeSpan,
IObservableAsync<T> fallback)
Example:
var guarded = source.Timeout(TimeSpan.FromSeconds(2));
var withFallback = source.Timeout(
TimeSpan.FromSeconds(2),
ObservableAsync.Return(999L));
ObserveOn
Shifts notifications to specified context.
public static IObservableAsync<T> ObserveOn<T>(
this IObservableAsync<T> source,
AsyncContext context)
public static IObservableAsync<T> ObserveOn<T>(
this IObservableAsync<T> source,
SynchronizationContext context)
public static IObservableAsync<T> ObserveOn<T>(
this IObservableAsync<T> source,
TaskScheduler scheduler)
public static IObservableAsync<T> ObserveOn<T>(
this IObservableAsync<T> source,
IScheduler scheduler)
Example:
var onContext = source.ObserveOn(
new AsyncContext(SynchronizationContext.Current!));
Yield
Yields control between notifications.
public static IObservableAsync<T> Yield<T>(
this IObservableAsync<T> source)
11. Aggregation and Terminal Operators
AggregateAsync
Applies accumulator and returns final result.
public static ValueTask<TAccumulate> AggregateAsync<TSource, TAccumulate>(
this IObservableAsync<TSource> source,
TAccumulate seed,
Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator,
CancellationToken cancellationToken)
Example:
var source = ObservableAsync.Range(1, 5);
int sum = await source.AggregateAsync(0, (a, x) => a + x, CancellationToken.None); // 15
CountAsync
Returns element count.
public static ValueTask<int> CountAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
LongCountAsync
Returns element count as long.
public static ValueTask<long> LongCountAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
AnyAsync
Returns true if any elements exist.
public static ValueTask<bool> AnyAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
public static ValueTask<bool> AnyAsync<T>(
this IObservableAsync<T> source,
Func<T, bool> predicate,
CancellationToken cancellationToken)
AllAsync
Returns true if all elements match predicate.
public static ValueTask<bool> AllAsync<T>(
this IObservableAsync<T> source,
Func<T, bool> predicate,
CancellationToken cancellationToken)
ContainsAsync
Returns true if sequence contains value.
public static ValueTask<bool> ContainsAsync<T>(
this IObservableAsync<T> source,
T value,
CancellationToken cancellationToken)
FirstAsync
Returns first element or throws.
public static ValueTask<T> FirstAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
FirstOrDefaultAsync
Returns first element or default.
public static ValueTask<T> FirstOrDefaultAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
LastAsync / LastOrDefaultAsync
Returns last element.
public static ValueTask<T> LastAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
public static ValueTask<T> LastOrDefaultAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
SingleAsync / SingleOrDefaultAsync
Returns single element or throws.
public static ValueTask<T> SingleAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
public static ValueTask<T> SingleOrDefaultAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
ToListAsync
Collects all elements into List.
public static ValueTask<List<T>> ToListAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
Example:
List<int> all = await source.ToListAsync(CancellationToken.None); // [1, 2, 3, 4, 5]
ToDictionaryAsync
Collects into Dictionary.
public static ValueTask<Dictionary<TKey, T>> ToDictionaryAsync<TSource, TKey>(
this IObservableAsync<TSource> source,
Func<TSource, TKey> keySelector,
CancellationToken cancellationToken)
ForEachAsync
Invokes async action for each element.
public static ValueTask ForEachAsync<T>(
this IObservableAsync<T> source,
Func<T, CancellationToken, ValueTask> action,
CancellationToken cancellationToken)
Example:
await source.ForEachAsync(async (x, ct) =>
Console.WriteLine($"Item: {x}"), CancellationToken.None);
WaitCompletionAsync
Awaits completion without capturing values.
public static ValueTask WaitCompletionAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
12. Async Disposables and Resource Management
DisposableAsync
Provides async disposable utilities.
public static class DisposableAsync
{
public static IAsyncDisposable Empty { get; }
public static IAsyncDisposable Create(Func<ValueTask> disposeAsync);
}
CompositeDisposableAsync
Manages multiple async disposables.
public sealed class CompositeDisposableAsync : IAsyncDisposable
{
public CompositeDisposableAsync();
public CompositeDisposableAsync(int capacity);
public CompositeDisposableAsync(params IAsyncDisposable[] disposables);
public CompositeDisposableAsync(IEnumerable<IAsyncDisposable> disposables);
public bool IsDisposed { get; }
public int Count { get; }
public ValueTask AddAsync(IAsyncDisposable item);
public ValueTask<bool> RemoveAsync(IAsyncDisposable item);
public ValueTask ClearAsync();
public bool Contains(IAsyncDisposable item);
public ValueTask DisposeAsync();
}
Example:
var composite = new CompositeDisposableAsync();
await composite.AddAsync(someResource1);
await composite.AddAsync(someResource2);
// All resources disposed together
await composite.DisposeAsync();
SerialDisposableAsync
Manages a single async disposable that can be replaced.
public class SerialDisposableAsync : IAsyncDisposable
{
public ValueTask SetDisposableAsync(IAsyncDisposable? value);
public ValueTask DisposeAsync();
}
Use Case: When you need to replace a resource and automatically dispose the old one.
Example:
var serial = new SerialDisposableAsync();
// Set initial resource
await serial.SetDisposableAsync(resource1);
// Replace - resource1 is automatically disposed
await serial.SetDisposableAsync(resource2);
// Dispose - resource2 is disposed
await serial.DisposeAsync();
SingleAssignmentDisposableAsync
Allows single assignment of async disposable.
public sealed class SingleAssignmentDisposableAsync : IAsyncDisposable
{
public bool IsDisposed { get; }
public IAsyncDisposable? GetDisposable();
public ValueTask SetDisposableAsync(IAsyncDisposable? value);
public ValueTask DisposeAsync();
}
Use Case: When a resource must be assigned exactly once.
13. Subjects and Multicasting
SubjectAsync
Async subject for multicasting.
public static class SubjectAsync
{
public static ISubjectAsync<T> Create<T>();
public static ISubjectAsync<T> CreateBehavior<T>(T initialValue);
public static ISubjectAsync<T> CreateReplayLatest<T>();
}
ISubjectAsync
public interface ISubjectAsync<T> : IObservableAsync<T>, IObserverAsync<T>
{
IObservableAsync<T> Values { get; }
}
ConcurrentSubjectAsync
Forwards notifications to observers concurrently.
public sealed class ConcurrentSubjectAsync<T> : BaseSubjectAsync<T>
{
// Observers notified in parallel for high throughput
}
ConcurrentReplayLatestSubjectAsync
Replays latest value to new subscribers with concurrent notification.
ConcurrentStatelessSubjectAsync
Stateless subject with concurrent notification.
ConcurrentStatelessReplayLatestSubjectAsync
Stateless replay-latest subject with concurrent notification.
Multicasting Operators
// Publish with serial subject
public static ConnectableObservableAsync<T> Publish<T>(
this IObservableAsync<T> source)
// Publish with stateless subject
public static ConnectableObservableAsync<T> StatelessPublish<T>(
this IObservableAsync<T> source)
// Replay latest value
public static ConnectableObservableAsync<T> ReplayLatest<T>(
this IObservableAsync<T> source)
// Auto-connect on first subscriber
public static IObservableAsync<T> RefCount<T>(
this ConnectableObservableAsync<T> source)
// Custom subject factory
public static ConnectableObservableAsync<T> Multicast<T, TSubject>(
this IObservableAsync<T> source,
Func<TSubject> subjectFactory)
where TSubject : ISubjectAsync<T>
Example:
var source = ObservableAsync.Interval(TimeSpan.FromMilliseconds(100)).Take(5);
// Publish + explicit connect
var published = source.Publish();
await using var sub1 = await published.SubscribeAsync(
async (v, ct) => Console.WriteLine($"Sub1: {v}"), CancellationToken.None);
await using var sub2 = await published.SubscribeAsync(
async (v, ct) => Console.WriteLine($"Sub2: {v}"), CancellationToken.None);
await using var connection = await published.ConnectAsync(CancellationToken.None);
// RefCount: auto-connect/disconnect
var shared = source.Publish().RefCount();
// ReplayLatest: new subscribers get most recent value
var replayed = source.ReplayLatest().RefCount();
14. Bridging Classic and Async Observables
ObservableBridgeExtensions
Provides bidirectional conversion.
// IObservable to IObservableAsync
public static IObservableAsync<T> ToObservableAsync<T>(
this IObservable<T> observable)
// IObservableAsync to IObservable
public static IObservable<T> ToObservable<T>(
this IObservableAsync<T> asyncObservable)
// IAsyncEnumerable to IObservableAsync
public static IObservableAsync<T> ToObservableAsync<T>(
this IAsyncEnumerable<T> asyncEnumerable)
// IObservableAsync to IAsyncEnumerable
public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(
this IObservableAsync<T> asyncObservable)
Example:
// Bridge from classic IObservable
var classic = Observable.Interval(TimeSpan.FromMilliseconds(200)).Take(5);
var asyncVersion = classic.ToObservableAsync();
// Bridge back to classic
var backToClassic = asyncVersion.ToObservable();
// From IAsyncEnumerable
async IAsyncEnumerable<int> GenerateAsync()
{
for (int i = 0; i < 5; i++)
{
await Task.Delay(50);
yield return i;
}
}
var fromAsyncEnum = GenerateAsync().ToObservableAsync();
15. Classic Reactive Extensions Operators
ReactiveUI.Extensions also provides valuable operators for traditional IObservable<T> that don't ship with System.Reactive.
Null & Signal Helpers
// Filter nulls
public static IObservable<T> WhereIsNotNull<T>(
this IObservable<T?> source)
where T : class
// Convert to Unit signal
public static IObservable<Unit> AsSignal<T>(
this IObservable<T> source)
Timing & Scheduling
// Shared timer (one underlying timer per TimeSpan)
public static IObservable<long> SyncTimer(TimeSpan period)
// Schedule single value
public static IObservable<T> Schedule<T>(
this T value,
TimeSpan dueTime,
IScheduler scheduler)
// Safe scheduling (handles null scheduler)
public static IDisposable ScheduleSafe(
this IScheduler? scheduler,
Action action)
// ThrottleFirst: allow first item per window
public static IObservable<T> ThrottleFirst<T>(
this IObservable<T> source,
TimeSpan window,
IScheduler? scheduler = null)
// ThrottleDistinct: throttle but only emit on change
public static IObservable<T> ThrottleDistinct<T>(
this IObservable<T> source,
TimeSpan throttle,
IScheduler? scheduler = null)
// DebounceImmediate: emit first immediately then debounce
public static IObservable<T> DebounceImmediate<T>(
this IObservable<T> source,
TimeSpan dueTime,
IScheduler? scheduler = null)
Inactivity / Liveness
// Heartbeat during quiet periods
public static IObservable<IHeartbeat<T>> Heartbeat<T>(
this IObservable<T> source,
TimeSpan interval,
IScheduler? scheduler = null)
// Detect stale data
public static IObservable<IStale<T>> DetectStale<T>(
this IObservable<T> source,
TimeSpan staleThreshold,
IScheduler? scheduler = null)
// Buffer until inactive
public static IObservable<IList<T>> BufferUntilInactive<T>(
this IObservable<T> source,
TimeSpan inactivityPeriod,
IScheduler? scheduler = null)
Error Handling
// Ignore all errors
public static IObservable<T> CatchIgnore<T>(
this IObservable<T> source)
// Return fallback on error
public static IObservable<T> CatchAndReturn<T>(
this IObservable<T> source,
T fallback)
// Retry with error handler
public static IObservable<T> OnErrorRetry<T, TException>(
this IObservable<T> source,
Action<TException> onError,
int retryCount = int.MaxValue,
TimeSpan delay = default,
IScheduler? delayScheduler = null)
where TException : Exception
// Retry with exponential backoff
public static IObservable<T> RetryWithBackoff<T>(
this IObservable<T> source,
int maxRetries,
TimeSpan initialDelay,
double backoffFactor = 2.0,
TimeSpan? maxDelay = null,
IScheduler? scheduler = null)
Combining & Aggregation
// All values are true
public static IObservable<bool> CombineLatestValuesAreAllTrue(
this IEnumerable<IObservable<bool>> sources)
// All values are false
public static IObservable<bool> CombineLatestValuesAreAllFalse(
this IEnumerable<IObservable<bool>> sources)
// Get max value
public static IObservable<T> GetMax<T>(
this IObservable<T> source)
// Get min value
public static IObservable<T> GetMin<T>(
this IObservable<T> source)
// Partition into two streams
public static (IObservable<T> True, IObservable<T> False) Partition<T>(
this IObservable<T> source,
Func<T, bool> predicate)
Logical / Boolean
// Boolean negation
public static IObservable<bool> Not(
this IObservable<bool> source)
// Filter true values
public static IObservable<bool> WhereTrue(
this IObservable<bool> source)
// Filter false values
public static IObservable<bool> WhereFalse(
this IObservable<bool> source)
Async / Task Integration
// Sequential async projection
public static IObservable<TResult> SelectAsyncSequential<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, Task<TResult>> selector)
// Latest only (cancels previous)
public static IObservable<TResult> SelectLatestAsync<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, Task<TResult>> selector)
// Limited parallelism
public static IObservable<TResult> SelectAsyncConcurrent<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, Task<TResult>> selector,
int maxConcurrency)
// Async subscription
public static IDisposable SubscribeAsync<T>(
this IObservable<T> source,
Func<T, Task> onNext)
// Synchronous gate
public static IDisposable SubscribeSynchronous<T>(
this IObservable<T> source,
Func<T, Task> onNext)
// Convert to hot Task
public static Task<T> ToHotTask<T>(
this IObservable<T> source)
Backpressure
// Conflate bursty updates
public static IObservable<T> Conflate<T>(
this IObservable<T> source,
TimeSpan minimumPeriod,
IScheduler? scheduler = null)
Filtering / Conditional
// Filter strings by regex
public static IObservable<string> Filter(
this IObservable<string> source,
string regexPattern)
// Take until predicate (inclusive)
public static IObservable<T> TakeUntil<T>(
this IObservable<T> source,
Func<T, bool> predicate)
// Wait for first match
public static IObservable<T> WaitUntil<T>(
this IObservable<T> source,
Func<T, bool> predicate)
// Sample latest on trigger
public static IObservable<T> SampleLatest<T>(
this IObservable<T> source,
IObservable<Unit> trigger)
// Fallback if empty
public static IObservable<T> SwitchIfEmpty<T>(
this IObservable<T> source,
IObservable<T> fallback)
// Drop if busy
public static IObservable<T> DropIfBusy<T>(
this IObservable<T> source,
Func<T, Task> asyncAction)
Buffering & Transformation
// Buffer until delimiter
public static IObservable<IList<T>> BufferUntil<T>(
this IObservable<T> source,
T startDelimiter,
T endDelimiter)
// Buffer until idle
public static IObservable<IList<T>> BufferUntilIdle<T>(
this IObservable<T> source,
TimeSpan idlePeriod)
// Emit consecutive pairs
public static IObservable<(T Previous, T Current)> Pairwise<T>(
this IObservable<T> source)
// Scan with initial value
public static IObservable<TAccumulate> ScanWithInitial<TSource, TAccumulate>(
this IObservable<TSource> source,
TAccumulate initial,
Func<TAccumulate, TSource, TAccumulate> accumulator)
// Shuffle arrays in-place
public static IObservable<T[]> Shuffle<T>(
this IObservable<T[]> source)
Subscription / Side Effects
// Action on subscribe
public static IObservable<T> DoOnSubscribe<T>(
this IObservable<T> source,
Action action)
// Action on dispose
public static IObservable<T> DoOnDispose<T>(
this IObservable<T> source,
Action disposeAction)
Utility & Miscellaneous
// ForEach with low allocations
public static IObservable<T> ForEach<T>(
this IObservable<IEnumerable<T>> source)
// Using helper
public static IObservable<TResult> Using<TDisposable, TResult>(
this TDisposable obj,
Func<TDisposable, TResult> function,
IScheduler? scheduler = null)
where TDisposable : IDisposable
// While loop
public static IObservable<Unit> While(
Func<bool> condition,
Action action,
IScheduler? scheduler = null)
// OnNext with params
public static void OnNext<T>(
this IObserver<T> observer,
params T[] values)
// Read-only BehaviorSubject
public static (IObservable<T> Observable, IObserver<T> Observer) ToReadOnlyBehavior<T>(
T initialValue)
// Property change observable
public static IObservable<TProperty> ToPropertyObservable<TSource, TProperty>(
this TSource source,
Expression<Func<TSource, TProperty>> propertyExpression)
where TSource : INotifyPropertyChanged
16. Advanced Multi-Threading Examples
Example 1: Parallel Data Processing Pipeline
using ReactiveUI.Extensions.Async;
using ReactiveUI.Extensions.Async.Subjects;
public class ParallelDataProcessor
{
private readonly int _maxConcurrency;
public ParallelDataProcessor(int maxConcurrency = 4)
{
_maxConcurrency = maxConcurrency;
}
public async Task ProcessDataStreamAsync(
IAsyncEnumerable<string> input,
Func<string, CancellationToken, Task<string>> processor,
CancellationToken cancellationToken)
{
// Convert to async observable
var observable = input.ToObservableAsync();
// Process with limited concurrency
var processed = observable
.SelectMany(async (item, ct) =>
{
try
{
return await processor(item, ct);
}
catch (Exception ex)
{
Console.WriteLine($"Error processing {item}: {ex.Message}");
return null;
}
})
.Where(x => x != null);
// Subscribe and process results
await processed.ForEachAsync(async (result, ct) =>
{
await SaveResultAsync(result!, ct);
}, cancellationToken);
}
private async Task SaveResultAsync(string result, CancellationToken ct)
{
// Simulate async I/O
await Task.Delay(10, ct);
Console.WriteLine($"Saved: {result}");
}
}
// Usage
var processor = new ParallelDataProcessor(maxConcurrency: 4);
var data = GenerateDataAsync(); // IAsyncEnumerable<string>
await processor.ProcessDataStreamAsync(
data,
async (item, ct) =>
{
await Task.Delay(100, ct); // Simulate processing
return item.ToUpperInvariant();
},
CancellationToken.None);
Example 2: Real-Time Data Aggregation with Multiple Sources
public class RealTimeAggregator
{
public async Task AggregateMultipleSourcesAsync(
CancellationToken cancellationToken)
{
// Create multiple data sources
var source1 = ObservableAsync.Interval(TimeSpan.FromMilliseconds(100))
.Take(50)
.Select(x => $"Source1-{x}");
var source2 = ObservableAsync.Interval(TimeSpan.FromMilliseconds(150))
.Take(50)
.Select(x => $"Source2-{x}");
var source3 = ObservableAsync.Interval(TimeSpan.FromMilliseconds(200))
.Take(50)
.Select(x => $"Source3-{x}");
// Merge all sources with concurrency limit
var merged = ObservableAsync.Merge(
new[] { source1, source2, source3 },
maxConcurrency: 3);
// Buffer and process in batches
var batches = merged
.Buffer(TimeSpan.FromMilliseconds(500))
.Where(batch => batch.Count > 0);
// Process batches in parallel
await batches.ForEachAsync(async (batch, ct) =>
{
Console.WriteLine($"Processing batch of {batch.Count} items");
// Process batch items in parallel
var tasks = batch.Select(async item =>
{
await Task.Delay(10, ct);
return item.ToUpperInvariant();
});
var results = await Task.WhenAll(tasks);
foreach (var result in results)
{
Console.WriteLine($" {result}");
}
}, cancellationToken);
}
}
Example 3: Cancellation-Cooperative Long-Running Operation
public class CancellableDataFetcher
{
public async Task FetchDataWithCancellationAsync(
string url,
CancellationToken cancellationToken)
{
var fetchObservable = ObservableAsync.Create<string>(async (observer, ct) =>
{
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ct, cancellationToken);
try
{
for (int i = 0; i < 100 && !linkedCts.Token.IsCancellationRequested; i++)
{
// Simulate async HTTP request
var data = await FetchPageAsync(url, i, linkedCts.Token);
await observer.OnNextAsync(data, linkedCts.Token);
await Task.Delay(100, linkedCts.Token);
}
await observer.OnCompletedAsync(Result.Success);
}
catch (OperationCanceledException)
{
await observer.OnCompletedAsync(Result.Success);
}
catch (Exception ex)
{
await observer.OnErrorResumeAsync(ex, linkedCts.Token);
}
finally
{
linkedCts.Dispose();
}
return DisposableAsync.Empty;
});
// Subscribe with cancellation
await using var subscription = await fetchObservable.SubscribeAsync(
async (data, ct) =>
{
await ProcessDataAsync(data, ct);
},
cancellationToken);
}
private async Task<string> FetchPageAsync(string url, int page, CancellationToken ct)
{
await Task.Delay(50, ct); // Simulate network
return $"Page {page} data";
}
private async Task ProcessDataAsync(string data, CancellationToken ct)
{
await Task.Delay(10, ct); // Simulate processing
Console.WriteLine($"Processed: {data}");
}
}
Example 4: Thread-Safe State Management with Async Observables
public class ThreadSafeStateManager<T>
{
private readonly ConcurrentSubjectAsync<T> _subject = new();
private T? _currentState;
private readonly SemaphoreSlim _lock = new(1, 1);
public IObservableAsync<T> State => _subject;
public async ValueTask UpdateStateAsync(T newState, CancellationToken cancellationToken)
{
await _lock.WaitAsync(cancellationToken);
try
{
_currentState = newState;
await _subject.OnNextAsync(newState, cancellationToken);
}
finally
{
_lock.Release();
}
}
public async ValueTask<T> GetStateAsync(CancellationToken cancellationToken)
{
await _lock.WaitAsync(cancellationToken);
try
{
return _currentState!;
}
finally
{
_lock.Release();
}
}
public async ValueTask DisposeAsync()
{
await _lock.WaitAsync();
try
{
await _subject.OnCompletedAsync(Result.Success);
await _subject.DisposeAsync();
}
finally
{
_lock.Release();
_lock.Dispose();
}
}
}
// Usage
var stateManager = new ThreadSafeStateManager<int>();
// Subscribe from multiple threads
var tasks = Enumerable.Range(0, 5).Select(async i =>
{
await using var sub = await stateManager.State.SubscribeAsync(
async (state, ct) =>
{
Console.WriteLine($"Thread {i}: State = {state}");
},
CancellationToken.None);
await Task.Delay(1000);
});
await Task.WhenAll(tasks);
// Update state
await stateManager.UpdateStateAsync(42, CancellationToken.None);
Example 5: Backpressure Handling with Conflation
public class BackpressureHandler
{
public async Task HandleHighFrequencyDataAsync(
CancellationToken cancellationToken)
{
// High-frequency source (1000 events/second)
var highFrequency = ObservableAsync.Interval(TimeSpan.FromMilliseconds(1))
.Take(10000);
// Conflate to 100 events/second (keep latest)
var conflated = highFrequency
.Publish(shared => shared
.Throttle(TimeSpan.FromMilliseconds(10))
.Merge(shared.TakeLast(1)));
// Process at manageable rate
await conflated.ForEachAsync(async (value, ct) =>
{
await Task.Delay(5, ct); // Simulate processing
Console.WriteLine($"Processed: {value}");
}, cancellationToken);
}
}
17. Performance Considerations
ValueTask vs Task
ReactiveUI.Extensions uses ValueTask throughout for better performance:
// ValueTask avoids heap allocation for synchronous completions
public ValueTask OnNextAsync(T value, CancellationToken cancellationToken)
Benefits:
- No heap allocation when operation completes synchronously
- Reduced GC pressure
- Better performance for hot paths
Allocation-Aware Operators
Many operators are designed to minimize allocations:
// ForEach with low allocations
public static IObservable<T> ForEach<T>(
this IObservable<IEnumerable<T>> source)
Threading and Concurrency
Best Practices:
- Use ConcurrentSubjectAsync for high throughput:
var subject = new ConcurrentSubjectAsync<string>();
// Observers notified in parallel
- Limit concurrency with Merge:
var merged = sources.Merge(maxConcurrency: 4);
- Use ObserveOn for context switching:
var onUiThread = source.ObserveOn(SynchronizationContext.Current!);
Memory Management
Dispose Properly:
// Always use await using for async disposables
await using var subscription = await observable.SubscribeAsync(...);
// Or use CompositeDisposableAsync for multiple resources
var composite = new CompositeDisposableAsync();
await composite.AddAsync(subscription1);
await composite.AddAsync(subscription2);
await composite.DisposeAsync();
18. Best Practices
1. Always Pass CancellationToken
// GOOD
await observable.ForEachAsync(async (item, ct) =>
{
await ProcessAsync(item, ct);
}, cancellationToken);
// BAD - No cancellation support
await observable.ForEachAsync(async item =>
{
await ProcessAsync(item, CancellationToken.None);
});
2. Use Async Disposables Properly
// GOOD
await using var subscription = await observable.SubscribeAsync(...);
// BAD - Synchronous disposal of async resources
using (await observable.SubscribeAsync(...)) { }
3. Handle Errors in OnErrorResumeAsync
public class ResilientObserver : ObserverAsync<T>
{
protected override async ValueTask OnErrorResumeAsyncCore(
Exception error,
CancellationToken cancellationToken)
{
// Log error
await LogErrorAsync(error, cancellationToken);
// Decide whether to continue or stop
// Don't rethrow unless you want to terminate
}
}
4. Use Appropriate Subjects
// For simple multicasting
var subject = SubjectAsync.Create<T>();
// For high-throughput scenarios
var subject = new ConcurrentSubjectAsync<T>();
// For replaying latest value
var subject = SubjectAsync.CreateReplayLatest<T>();
5. Limit Concurrency
// GOOD - Limited concurrency
var processed = source.SelectMany(
async item => await ProcessAsync(item),
maxConcurrency: 4);
// BAD - Unlimited concurrency
var processed = source.SelectMany(
async item => await ProcessAsync(item));
6. Use ObserveOn for UI Updates
// Ensure UI updates happen on UI thread
var uiUpdates = source.ObserveOn(SynchronizationContext.Current!)
.SubscribeAsync(async (value, ct) =>
{
UpdateUI(value);
});
7. Bridge Carefully Between Classic and Async
// Convert classic to async when needed
var asyncObservable = classicObservable.ToObservableAsync();
// Convert back when interoperating with Rx.NET libraries
var classic = asyncObservable.ToObservable();
8. Test with TestScheduler
// Use TestScheduler for deterministic testing
var testScheduler = new TestScheduler();
var source = ObservableAsync.Interval(TimeSpan.FromSeconds(1), testScheduler);
// Advance time deterministically
testScheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks);
Conclusion
ReactiveUI.Extensions represents a significant advancement in reactive programming for .NET, providing a fully async-native framework that addresses the limitations of traditional Rx.NET in modern async/await scenarios. By embracing ValueTask, CancellationToken, and IAsyncDisposable throughout the entire pipeline, it enables:
- True end-to-end async without thread blocking
- Proper cancellation support at every level
- Async resource cleanup for I/O-bound operations
- Seamless interop with both classic Rx.NET and modern async patterns
- High performance through allocation-aware design
The library's comprehensive operator set, combined with advanced features like concurrent subjects, async disposables, and bidirectional bridging, makes it an essential tool for building robust, scalable reactive applications in .NET 8 and beyond.
Whether you're building real-time data processing pipelines, handling high-frequency event streams, or simply need better async integration in your reactive code, ReactiveUI.Extensions provides the tools and patterns to succeed.
Additional Resources
- GitHub Repository: https://github.com/reactiveui/Extensions
- System.Reactive Documentation: https://github.com/dotnet/reactive
- ReactiveUI Documentation: https://www.reactiveui.net/docs/
- Introduction to Rx.NET eBook: Free 2nd Edition available
This article covers all public functions within the ReactiveUI.Extensions library as of version 2.2.x. For the most up-to-date information, please refer to the official GitHub repository.