Table of Contents

Class ReactiveExtensions

Namespace
ReactiveUI.Extensions
Assembly
ReactiveUI.Extensions.dll

Extension methods for System.Reactive.

public static class ReactiveExtensions
Inheritance
ReactiveExtensions

Methods

AsSignal<T>(IObservable<T>)

Change the source observable type to System.Reactive.Unit. This allows us to be notified when the observable emits a value.

public static IObservable<Unit> AsSignal<T>(this IObservable<T> observable)

Parameters

observable IObservable<T>

The observable to convert.

Returns

IObservable<Unit>

The signal.

Type Parameters

T

The current type of the observable.

BufferUntil(IObservable<char>, char, char)

Buffers until Start char and End char are found.

public static IObservable<string> BufferUntil(this IObservable<char> @this, char startsWith, char endsWith)

Parameters

this IObservable<char>

The source observable of characters.

startsWith char

The starting delimiter.

endsWith char

The ending delimiter.

Returns

IObservable<string>

A sequence of buffered strings including the start and end delimiters.

BufferUntilIdle<T>(IObservable<T>, TimeSpan, IScheduler?)

Emit a batch when the stream goes quiet.

public static IObservable<IList<T>> BufferUntilIdle<T>(this IObservable<T> source, TimeSpan idleTime, IScheduler? scheduler = null)

Parameters

source IObservable<T>

The source.

idleTime TimeSpan

The idle time.

scheduler IScheduler

The scheduler.

Returns

IObservable<IList<T>>

A sequence of buffered lists.

Type Parameters

T

The type of the elements in the source sequence.

BufferUntilInactive<T>(IObservable<T>, TimeSpan, IScheduler?)

Buffers items until inactivity period elapses then emits and resets buffer.

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> source, TimeSpan inactivityPeriod, IScheduler? scheduler = null)

Parameters

source IObservable<T>

Source sequence.

inactivityPeriod TimeSpan

Inactivity period.

scheduler IScheduler

Scheduler.

Returns

IObservable<IList<T>>

Sequence of buffered lists.

Type Parameters

T

Element type.

CatchAndReturn<T>(IObservable<T>, T)

Catches any error and returns a fallback value then completes.

public static IObservable<T> CatchAndReturn<T>(this IObservable<T> source, T fallback)

Parameters

source IObservable<T>

Source sequence.

fallback T

Fallback value.

Returns

IObservable<T>

Sequence producing either original values or fallback on error then completing.

Type Parameters

T

Element type.

CatchAndReturn<T, TException>(IObservable<T>, Func<TException, T>)

Catches a specific exception type mapping it to a fallback value.

public static IObservable<T> CatchAndReturn<T, TException>(this IObservable<T> source, Func<TException, T> fallbackFactory) where TException : Exception

Parameters

source IObservable<T>

Source sequence.

fallbackFactory Func<TException, T>

Factory producing fallback from the exception.

Returns

IObservable<T>

Recovered sequence.

Type Parameters

T

Element type.

TException

Exception type.

CatchIgnore<TSource>(IObservable<TSource?>)

Catch exception and return Observable.Empty.

public static IObservable<TSource?> CatchIgnore<TSource>(this IObservable<TSource?> source)

Parameters

source IObservable<TSource>

The source.

Returns

IObservable<TSource>

A sequence that ignores errors and completes.

Type Parameters

TSource

The type of the source.

CatchIgnore<TSource, TException>(IObservable<TSource?>, Action<TException>)

Catch exception and return Observable.Empty.

public static IObservable<TSource?> CatchIgnore<TSource, TException>(this IObservable<TSource?> source, Action<TException> errorAction) where TException : Exception

Parameters

source IObservable<TSource>

The source.

errorAction Action<TException>

The error action.

Returns

IObservable<TSource>

A sequence that invokes errorAction on error and completes.

Type Parameters

TSource

The type of the source.

TException

The type of the exception.

CombineLatestValuesAreAllFalse(IEnumerable<IObservable<bool>>)

Latest values of each sequence are all false.

public static IObservable<bool> CombineLatestValuesAreAllFalse(this IEnumerable<IObservable<bool>> sources)

Parameters

sources IEnumerable<IObservable<bool>>

The sources.

Returns

IObservable<bool>

A sequence that emits true when all latest booleans are false.

CombineLatestValuesAreAllTrue(IEnumerable<IObservable<bool>>)

Latest values of each sequence are all true.

public static IObservable<bool> CombineLatestValuesAreAllTrue(this IEnumerable<IObservable<bool>> sources)

Parameters

sources IEnumerable<IObservable<bool>>

The sources.

Returns

IObservable<bool>

A sequence that emits true when all latest booleans are true.

Conflate<T>(IObservable<T>, TimeSpan, IScheduler)

Applies a conflation algorithm to an observable stream. Anytime the stream OnNext twice below minimumUpdatePeriod, the second update gets delayed to respect the minimumUpdatePeriod. If more than 2 updates happen, only the last update is pushed.

public static IObservable<T> Conflate<T>(this IObservable<T> source, TimeSpan minimumUpdatePeriod, IScheduler scheduler)

Parameters

source IObservable<T>

The stream.

minimumUpdatePeriod TimeSpan

Minimum delay between two updates.

scheduler IScheduler

Scheduler to publish updates.

Returns

IObservable<T>

The conflated stream.

Type Parameters

T

The type.

DebounceImmediate<T>(IObservable<T>, TimeSpan, IScheduler?)

Debounces with an immediate first emission then standard debounce behavior.

public static IObservable<T> DebounceImmediate<T>(this IObservable<T> source, TimeSpan dueTime, IScheduler? scheduler = null)

Parameters

source IObservable<T>

Source sequence.

dueTime TimeSpan

Debounce time.

scheduler IScheduler

Scheduler (optional).

Returns

IObservable<T>

Debounced sequence.

Type Parameters

T

Element type.

DebounceUntil<T>(IObservable<T>, TimeSpan, Func<T, bool>)

Debounce until a condition becomes true.

public static IObservable<T> DebounceUntil<T>(this IObservable<T> source, TimeSpan debounce, Func<T, bool> condition)

Parameters

source IObservable<T>

The source.

debounce TimeSpan

The debounce.

condition Func<T, bool>

The condition.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

DetectStale<T>(IObservable<T>, TimeSpan, IScheduler)

Detects when a stream becomes inactive for some period of time.

public static IObservable<IStale<T>> DetectStale<T>(this IObservable<T> source, TimeSpan stalenessPeriod, IScheduler scheduler)

Parameters

source IObservable<T>

source stream.

stalenessPeriod TimeSpan

If source stream does not OnNext any update during this period, it is declared stale.

scheduler IScheduler

The scheduler.

Returns

IObservable<IStale<T>>

Observable stale markers or updates.

Type Parameters

T

update type.

DoOnDispose<T>(IObservable<T>, Action)

Executes an action when subscription is disposed.

public static IObservable<T> DoOnDispose<T>(this IObservable<T> source, Action disposeAction)

Parameters

source IObservable<T>

Source sequence.

disposeAction Action

Action to run on dispose.

Returns

IObservable<T>

Original sequence with dispose side-effect.

Type Parameters

T

Element type.

DoOnSubscribe<T>(IObservable<T>, Action)

Executes an action at subscription time.

public static IObservable<T> DoOnSubscribe<T>(this IObservable<T> source, Action action)

Parameters

source IObservable<T>

Source sequence.

action Action

Action to run on subscribe.

Returns

IObservable<T>

Original sequence with subscribe side-effect.

Type Parameters

T

Element type.

DropIfBusy<T>(IObservable<T>, Func<T, Task>)

Drop values when the previous async operation is still running.

public static IObservable<T> DropIfBusy<T>(this IObservable<T> source, Func<T, Task> asyncAction)

Parameters

source IObservable<T>

The source.

asyncAction Func<T, Task>

The asynchronous action.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

Filter(IObservable<string>, string)

Filters strings by regex.

public static IObservable<string> Filter(this IObservable<string> source, string regexPattern)

Parameters

source IObservable<string>

Source sequence.

regexPattern string

Regex pattern.

Returns

IObservable<string>

Filtered sequence.

ForEach<T>(IObservable<IEnumerable<T>>, IScheduler?)

Flattens a sequence of enumerables into individual values.

public static IObservable<T> ForEach<T>(this IObservable<IEnumerable<T>> source, IScheduler? scheduler = null)

Parameters

source IObservable<IEnumerable<T>>

Source of enumerables.

scheduler IScheduler

Scheduler (optional).

Returns

IObservable<T>

A flattened observable.

Type Parameters

T

Element type.

FromArray<T>(IEnumerable<T>, IScheduler?)

Emits each element of an IEnumerable.

public static IObservable<T> FromArray<T>(this IEnumerable<T> source, IScheduler? scheduler = null)

Parameters

source IEnumerable<T>

Source enumerable.

scheduler IScheduler

Scheduler (optional).

Returns

IObservable<T>

Observable of elements.

Type Parameters

T

Element type.

GetMax<T>(IObservable<T?>, params IObservable<T?>[])

Gets the maximum from all sources.

public static IObservable<T?> GetMax<T>(this IObservable<T?> @this, params IObservable<T?>[] sources) where T : struct

Parameters

this IObservable<T?>

The first observable.

sources IObservable<T?>[]

Other sources.

Returns

IObservable<T?>

A sequence emitting the maximum of the latest values.

Type Parameters

T

The Value Type.

GetMin<T>(IObservable<T?>, params IObservable<T?>[])

Gets the minimum from all sources.

public static IObservable<T?> GetMin<T>(this IObservable<T?> @this, params IObservable<T?>[] sources) where T : struct

Parameters

this IObservable<T?>

The first observable.

sources IObservable<T?>[]

Other sources.

Returns

IObservable<T?>

A sequence emitting the minimum of the latest values.

Type Parameters

T

The Value Type.

Heartbeat<T>(IObservable<T>, TimeSpan, IScheduler)

Injects heartbeats in a stream when the source stream becomes quiet.

public static IObservable<IHeartbeat<T>> Heartbeat<T>(this IObservable<T> source, TimeSpan heartbeatPeriod, IScheduler scheduler)

Parameters

source IObservable<T>

Source stream.

heartbeatPeriod TimeSpan

Period between heartbeats.

scheduler IScheduler

Scheduler.

Returns

IObservable<IHeartbeat<T>>

Observable heartbeat values.

Type Parameters

T

Update type.

LatestOrDefault<T>(IObservable<T>, T)

Emit the latest value or a default if none exists.

public static IObservable<T> LatestOrDefault<T>(this IObservable<T> source, T defaultValue)

Parameters

source IObservable<T>

The source.

defaultValue T

The default value.

Returns

IObservable<T>

A sequence that emits the latest value or the default.

Type Parameters

T

The type of the source.

LogErrors<T>(IObservable<T>, Action<Exception>)

Logs the errors. Inline error logging without terminating the stream.

public static IObservable<T> LogErrors<T>(this IObservable<T> source, Action<Exception> logger)

Parameters

source IObservable<T>

The source.

logger Action<Exception>

The logger.

Returns

IObservable<T>

A sequence that logs errors.

Type Parameters

T

The type of the source.

Not(IObservable<bool>)

Emits the boolean negation of the source sequence.

public static IObservable<bool> Not(this IObservable<bool> source)

Parameters

source IObservable<bool>

Boolean source.

Returns

IObservable<bool>

Negated boolean sequence.

ObserveOnIf<T>(IObservable<T>, bool, IScheduler)

Conditionally switch schedulers.

public static IObservable<T> ObserveOnIf<T>(this IObservable<T> source, bool condition, IScheduler scheduler)

Parameters

source IObservable<T>

The source.

condition bool

if set to true [condition].

scheduler IScheduler

The scheduler.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

ObserveOnIf<T>(IObservable<T>, bool, IScheduler, IScheduler)

Conditionally switch schedulers.

public static IObservable<T> ObserveOnIf<T>(this IObservable<T> source, bool condition, IScheduler trueScheduler, IScheduler falseScheduler)

Parameters

source IObservable<T>

The source.

condition bool

if set to true [condition].

trueScheduler IScheduler

The true scheduler.

falseScheduler IScheduler

The false scheduler.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

ObserveOnIf<T>(IObservable<T>, IObservable<bool>, IScheduler)

Conditionally switch schedulers based on a reactive condition.

public static IObservable<T> ObserveOnIf<T>(this IObservable<T> source, IObservable<bool> condition, IScheduler scheduler)

Parameters

source IObservable<T>

The source.

condition IObservable<bool>

The reactive condition.

scheduler IScheduler

The scheduler to use when condition is true.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

ObserveOnIf<T>(IObservable<T>, IObservable<bool>, IScheduler, IScheduler)

Conditionally switch schedulers based on a reactive condition.

public static IObservable<T> ObserveOnIf<T>(this IObservable<T> source, IObservable<bool> condition, IScheduler trueScheduler, IScheduler falseScheduler)

Parameters

source IObservable<T>

The source.

condition IObservable<bool>

The reactive condition.

trueScheduler IScheduler

The scheduler to use when condition is true.

falseScheduler IScheduler

The scheduler to use when condition is false.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

ObserveOnSafe<TSource>(IObservable<TSource>, IScheduler?)

If the scheduler is not null observes on that scheduler.

public static IObservable<TSource> ObserveOnSafe<TSource>(this IObservable<TSource> source, IScheduler? scheduler)

Parameters

source IObservable<TSource>

Source sequence.

scheduler IScheduler

Scheduler to notify observers on (optional).

Returns

IObservable<TSource>

The source sequence whose callbacks happen on the specified scheduler.

Type Parameters

TSource

Element type.

OnErrorRetry<TSource>(IObservable<TSource?>)

Repeats the source until it terminates successfully (alias of Retry).

public static IObservable<TSource?> OnErrorRetry<TSource>(this IObservable<TSource?> source)

Parameters

source IObservable<TSource>

Source sequence.

Returns

IObservable<TSource>

Retried sequence.

Type Parameters

TSource

Element type.

OnErrorRetry<TSource, TException>(IObservable<TSource?>, Action<TException>)

When caught exception, do onError action and repeat observable sequence.

public static IObservable<TSource?> OnErrorRetry<TSource, TException>(this IObservable<TSource?> source, Action<TException> onError) where TException : Exception

Parameters

source IObservable<TSource>

The source.

onError Action<TException>

The on error.

Returns

IObservable<TSource>

A sequence that retries on error with optional delay.

Type Parameters

TSource

The type of the source.

TException

The type of the exception.

OnErrorRetry<TSource, TException>(IObservable<TSource?>, Action<TException>, int)

When caught exception, do onError action and repeat observable sequence during within retryCount.

public static IObservable<TSource?> OnErrorRetry<TSource, TException>(this IObservable<TSource?> source, Action<TException> onError, int retryCount) where TException : Exception

Parameters

source IObservable<TSource>

The source.

onError Action<TException>

The on error.

retryCount int

The retry count.

Returns

IObservable<TSource>

A sequence that retries on error with optional delay.

Type Parameters

TSource

The type of the source.

TException

The type of the exception.

OnErrorRetry<TSource, TException>(IObservable<TSource?>, Action<TException>, int, TimeSpan)

When caught exception, do onError action and repeat observable sequence after delay time during within retryCount.

public static IObservable<TSource?> OnErrorRetry<TSource, TException>(this IObservable<TSource?> source, Action<TException> onError, int retryCount, TimeSpan delay) where TException : Exception

Parameters

source IObservable<TSource>

The source.

onError Action<TException>

The on error.

retryCount int

The retry count.

delay TimeSpan

The delay.

Returns

IObservable<TSource>

A sequence that retries on error with optional delay.

Type Parameters

TSource

The type of the source.

TException

The type of the exception.

OnErrorRetry<TSource, TException>(IObservable<TSource?>, Action<TException>, int, TimeSpan, IScheduler)

When caught exception, do onError action and repeat observable sequence after delay time(work on delayScheduler) during within retryCount.

public static IObservable<TSource?> OnErrorRetry<TSource, TException>(this IObservable<TSource?> source, Action<TException> onError, int retryCount, TimeSpan delay, IScheduler delayScheduler) where TException : Exception

Parameters

source IObservable<TSource>

The source.

onError Action<TException>

The on error.

retryCount int

The retry count.

delay TimeSpan

The delay.

delayScheduler IScheduler

The delay scheduler.

Returns

IObservable<TSource>

A sequence that retries on error with optional delay.

Type Parameters

TSource

The type of the source.

TException

The type of the exception.

OnErrorRetry<TSource, TException>(IObservable<TSource?>, Action<TException>, TimeSpan)

When caught exception, do onError action and repeat observable sequence after delay time.

public static IObservable<TSource?> OnErrorRetry<TSource, TException>(this IObservable<TSource?> source, Action<TException> onError, TimeSpan delay) where TException : Exception

Parameters

source IObservable<TSource>

The source.

onError Action<TException>

The on error.

delay TimeSpan

The delay.

Returns

IObservable<TSource>

A sequence that retries on error with optional delay.

Type Parameters

TSource

The type of the source.

TException

The type of the exception.

OnNext<T>(IObserver<T?>, params T?[])

Pushes multiple values to an observer.

public static void OnNext<T>(this IObserver<T?> observer, params T?[] events)

Parameters

observer IObserver<T>

Observer to push to.

events T[]

Values to push.

Type Parameters

T

Type of value.

Pairwise<T>(IObservable<T>)

Emit (previous, current) pairs.

public static IObservable<(T Previous, T Current)> Pairwise<T>(this IObservable<T> source)

Parameters

source IObservable<T>

The source.

Returns

IObservable<(T Previous, T Current)>

An IObservable of (T Previous, T Current).

Type Parameters

T

The type.

Partition<T>(IObservable<T>, Func<T, bool>)

Partitions a sequence into two based on predicate.

public static (IObservable<T> True, IObservable<T> False) Partition<T>(this IObservable<T> source, Func<T, bool> predicate)

Parameters

source IObservable<T>

Source sequence.

predicate Func<T, bool>

Predicate.

Returns

(IObservable<T> True, IObservable<T> False)

Tuple of (trueSequence, falseSequence).

Type Parameters

T

Element type.

ReplayLastOnSubscribe<T>(IObservable<T>, T)

Always replay the last value, even if the source hasn’t produced one yet.

public static IObservable<T> ReplayLastOnSubscribe<T>(this IObservable<T> source, T initialValue)

Parameters

source IObservable<T>

The source.

initialValue T

The initial value.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

RetryForeverWithDelay<T>(IObservable<T>, TimeSpan)

Retries the forever with delay.

public static IObservable<T> RetryForeverWithDelay<T>(this IObservable<T> source, TimeSpan delay)

Parameters

source IObservable<T>

The source.

delay TimeSpan

The delay.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

RetryWithBackoff<T>(IObservable<T>, int, TimeSpan, double, TimeSpan?, IScheduler?)

Retries with exponential backoff.

public static IObservable<T> RetryWithBackoff<T>(this IObservable<T> source, int maxRetries, TimeSpan initialDelay, double backoffFactor = 2, TimeSpan? maxDelay = null, IScheduler? scheduler = null)

Parameters

source IObservable<T>

Source sequence.

maxRetries int

Maximum number of retries.

initialDelay TimeSpan

Initial backoff delay.

backoffFactor double

Multiplier for each retry (default 2).

maxDelay TimeSpan?

Optional maximum delay.

scheduler IScheduler

Scheduler (optional).

Returns

IObservable<T>

Retried sequence with backoff.

Type Parameters

T

Element type.

RetryWithDelay<T>(IObservable<T>, int, Func<int, TimeSpan>)

Retry with exponential.

public static IObservable<T> RetryWithDelay<T>(this IObservable<T> source, int retryCount, Func<int, TimeSpan> delaySelector)

Parameters

source IObservable<T>

The source.

retryCount int

The retry count.

delaySelector Func<int, TimeSpan>

The delay selector.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

RetryWithFixedDelay<T>(IObservable<T>, int, TimeSpan)

Retry with fixed backoff.

public static IObservable<T> RetryWithFixedDelay<T>(this IObservable<T> source, int retryCount, TimeSpan delay)

Parameters

source IObservable<T>

The source.

retryCount int

The retry count.

delay TimeSpan

The delay.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

SampleLatest<T>(IObservable<T>, IObservable<object>)

Sample the latest value whenever a trigger fires.

public static IObservable<T> SampleLatest<T>(this IObservable<T> source, IObservable<object> trigger)

Parameters

source IObservable<T>

The source.

trigger IObservable<object>

The trigger.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

ScanWithInitial<TSource, TAccumulate>(IObservable<TSource>, TAccumulate, Func<TAccumulate, TSource, TAccumulate>)

Scan that always emits the initial value first.

public static IObservable<TAccumulate> ScanWithInitial<TSource, TAccumulate>(this IObservable<TSource> source, TAccumulate initial, Func<TAccumulate, TSource, TAccumulate> accumulator)

Parameters

source IObservable<TSource>

The source.

initial TAccumulate

The initial.

accumulator Func<TAccumulate, TSource, TAccumulate>

The accumulator.

Returns

IObservable<TAccumulate>

An IObservable of TAccumulate.

Type Parameters

TSource

The type of the source.

TAccumulate

The type of the accumulate.

ScheduleSafe(IScheduler?, Action)

Schedules an action immediately if scheduler null, else on scheduler.

public static IDisposable ScheduleSafe(this IScheduler? scheduler, Action action)

Parameters

scheduler IScheduler

Scheduler.

action Action

Action.

Returns

IDisposable

Disposable for the scheduled action.

ScheduleSafe(IScheduler?, TimeSpan, Action)

Schedules an action after a due time.

public static IDisposable ScheduleSafe(this IScheduler? scheduler, TimeSpan dueTime, Action action)

Parameters

scheduler IScheduler

Scheduler.

dueTime TimeSpan

Delay.

action Action

Action.

Returns

IDisposable

Disposable for the scheduled action.

Schedule<T>(IObservable<T>, DateTimeOffset, IScheduler)

Schedules the specified due time.

public static IObservable<T> Schedule<T>(this IObservable<T> source, DateTimeOffset dueTime, IScheduler scheduler)

Parameters

source IObservable<T>

The value.

dueTime DateTimeOffset

The due time.

scheduler IScheduler

The scheduler.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

Schedule<T>(IObservable<T>, DateTimeOffset, IScheduler, Action<T>)

Schedules the specified due time.

public static IObservable<T> Schedule<T>(this IObservable<T> source, DateTimeOffset dueTime, IScheduler scheduler, Action<T> action)

Parameters

source IObservable<T>

The value.

dueTime DateTimeOffset

The due time.

scheduler IScheduler

The scheduler.

action Action<T>

The action.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

Schedule<T>(IObservable<T>, IScheduler, Func<T, T>)

Schedules the specified due time.

public static IObservable<T> Schedule<T>(this IObservable<T> source, IScheduler scheduler, Func<T, T> function)

Parameters

source IObservable<T>

The value.

scheduler IScheduler

The scheduler.

function Func<T, T>

The function.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

Schedule<T>(IObservable<T>, TimeSpan, IScheduler)

Schedules the specified due time.

public static IObservable<T> Schedule<T>(this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler)

Parameters

source IObservable<T>

The value.

dueTime TimeSpan

The due time.

scheduler IScheduler

The scheduler.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

Schedule<T>(IObservable<T>, TimeSpan, IScheduler, Action<T>)

Schedules the specified due time.

public static IObservable<T> Schedule<T>(this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler, Action<T> action)

Parameters

source IObservable<T>

The value.

dueTime TimeSpan

The due time.

scheduler IScheduler

The scheduler.

action Action<T>

The action.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

Schedule<T>(IObservable<T>, TimeSpan, IScheduler, Func<T, T>)

Schedules the specified due time.

public static IObservable<T> Schedule<T>(this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler, Func<T, T> function)

Parameters

source IObservable<T>

The value.

dueTime TimeSpan

The due time.

scheduler IScheduler

The scheduler.

function Func<T, T>

The function.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

Schedule<T>(T, DateTimeOffset, IScheduler)

Schedules the specified due time.

public static IObservable<T> Schedule<T>(this T value, DateTimeOffset dueTime, IScheduler scheduler)

Parameters

value T

The value.

dueTime DateTimeOffset

The due time.

scheduler IScheduler

The scheduler.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

Schedule<T>(T, DateTimeOffset, IScheduler, Action<T>)

Schedules the specified due time.

public static IObservable<T> Schedule<T>(this T value, DateTimeOffset dueTime, IScheduler scheduler, Action<T> action)

Parameters

value T

The value.

dueTime DateTimeOffset

The due time.

scheduler IScheduler

The scheduler.

action Action<T>

The action.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

Schedule<T>(T, IScheduler, Func<T, T>)

Schedules the specified due time.

public static IObservable<T> Schedule<T>(this T value, IScheduler scheduler, Func<T, T> function)

Parameters

value T

The value.

scheduler IScheduler

The scheduler.

function Func<T, T>

The function.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

Schedule<T>(T, TimeSpan, IScheduler)

Schedules a single value after a delay.

public static IObservable<T> Schedule<T>(this T value, TimeSpan dueTime, IScheduler scheduler)

Parameters

value T

Value.

dueTime TimeSpan

Delay.

scheduler IScheduler

Scheduler.

Returns

IObservable<T>

Observable that emits the value.

Type Parameters

T

Value type.

Schedule<T>(T, TimeSpan, IScheduler, Action<T>)

Schedules the specified due time.

public static IObservable<T> Schedule<T>(this T value, TimeSpan dueTime, IScheduler scheduler, Action<T> action)

Parameters

value T

The value.

dueTime TimeSpan

The due time.

scheduler IScheduler

The scheduler.

action Action<T>

The action.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

Schedule<T>(T, TimeSpan, IScheduler, Func<T, T>)

Schedules the specified due time.

public static IObservable<T> Schedule<T>(this T value, TimeSpan dueTime, IScheduler scheduler, Func<T, T> function)

Parameters

value T

The value.

dueTime TimeSpan

The due time.

scheduler IScheduler

The scheduler.

function Func<T, T>

The function.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

SelectAsyncConcurrent<TSource, TResult>(IObservable<TSource>, Func<TSource, Task<TResult>>, int)

Projects each element to a task with limited concurrency.

public static IObservable<TResult> SelectAsyncConcurrent<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector, int maxConcurrency)

Parameters

source IObservable<TSource>

Source sequence.

selector Func<TSource, Task<TResult>>

Task selector.

maxConcurrency int

Max concurrency.

Returns

IObservable<TResult>

Merged sequence of task results.

Type Parameters

TSource

Source type.

TResult

Result type.

SelectAsyncSequential<TSource, TResult>(IObservable<TSource>, Func<TSource, Task<TResult>>)

Projects each element to a task executed sequentially.

public static IObservable<TResult> SelectAsyncSequential<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector)

Parameters

source IObservable<TSource>

Source sequence.

selector Func<TSource, Task<TResult>>

Task selector.

Returns

IObservable<TResult>

Sequence of results preserving order.

Type Parameters

TSource

Source element type.

TResult

Result type.

SelectAsync<TSource, TResult>(IObservable<TSource>, Func<TSource, CancellationToken, Task<TResult>>)

Maps values to async operations without losing ordering or cancellation semantics.

public static IObservable<TResult> SelectAsync<TSource, TResult>(this IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> asyncSelector)

Parameters

source IObservable<TSource>

The source.

asyncSelector Func<TSource, CancellationToken, Task<TResult>>

The asynchronous selector.

Returns

IObservable<TResult>

An IObservable of TResult.

Type Parameters

TSource

The type of the source.

TResult

The type of the result.

SelectAsync<TSource, TResult>(IObservable<TSource>, Func<TSource, Task<TResult>>)

Maps values to async operations without losing ordering or cancellation semantics.

public static IObservable<TResult> SelectAsync<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> asyncSelector)

Parameters

source IObservable<TSource>

The source.

asyncSelector Func<TSource, Task<TResult>>

The asynchronous selector.

Returns

IObservable<TResult>

An IObservable of TResult.

Type Parameters

TSource

The type of the source.

TResult

The type of the result.

SelectLatestAsync<TSource, TResult>(IObservable<TSource>, Func<TSource, Task<TResult>>)

Projects each element to a task but only latest result is emitted.

public static IObservable<TResult> SelectLatestAsync<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector)

Parameters

source IObservable<TSource>

Source sequence.

selector Func<TSource, Task<TResult>>

Task selector.

Returns

IObservable<TResult>

Sequence of latest task results.

Type Parameters

TSource

Source type.

TResult

Result type.

Shuffle<T>(IObservable<T[]>)

Randomly shuffles arrays emitted by the source.

public static IObservable<T[]> Shuffle<T>(this IObservable<T[]> source)

Parameters

source IObservable<T[]>

Source array sequence.

Returns

IObservable<T[]>

Sequence of shuffled arrays (in-place).

Type Parameters

T

Array element type.

SkipWhileNull<T>(IObservable<T?>)

Skip null values until the first non-null appears.

public static IObservable<T> SkipWhileNull<T>(this IObservable<T?> source) where T : class

Parameters

source IObservable<T>

The source.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

Start(Action, IScheduler?)

Invokes the action asynchronously surfacing the result through a Unit observable.

public static IObservable<Unit> Start(Action action, IScheduler? scheduler)

Parameters

action Action

Action to run.

scheduler IScheduler

Scheduler (optional).

Returns

IObservable<Unit>

A sequence producing Unit upon completion.

Start<TResult>(Func<TResult>, IScheduler?)

Invokes the specified function asynchronously surfacing the result.

public static IObservable<TResult> Start<TResult>(Func<TResult> function, IScheduler? scheduler)

Parameters

function Func<TResult>

Function to run.

scheduler IScheduler

Scheduler.

Returns

IObservable<TResult>

A sequence producing the function result.

Type Parameters

TResult

Result type.

SubscribeAsync<T>(IObservable<T>, Func<T, Task>)

Subscribes allowing asynchronous operations to be executed without blocking the source.

public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext)

Parameters

source IObservable<T>

Observable sequence to subscribe to.

onNext Func<T, Task>

Action to invoke for each element in the observable sequence.

Returns

IDisposable

IDisposable object used to unsubscribe from the observable sequence.

Type Parameters

T

The type of the elements in the source sequence.

SubscribeAsync<T>(IObservable<T>, Func<T, Task>, Action)

Subscribes allowing asynchronous operations to be executed without blocking the source.

public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext, Action onCompleted)

Parameters

source IObservable<T>

Observable sequence to subscribe to.

onNext Func<T, Task>

Action to invoke for each element in the observable sequence.

onCompleted Action

The on completed.

Returns

IDisposable

IDisposable object used to unsubscribe from the observable sequence.

Type Parameters

T

The type of the elements in the source sequence.

SubscribeAsync<T>(IObservable<T>, Func<T, Task>, Action<Exception>)

Subscribes allowing asynchronous operations to be executed without blocking the source.

public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError)

Parameters

source IObservable<T>

Observable sequence to subscribe to.

onNext Func<T, Task>

Action to invoke for each element in the observable sequence.

onError Action<Exception>

The on error.

Returns

IDisposable

IDisposable object used to unsubscribe from the observable sequence.

Type Parameters

T

The type of the elements in the source sequence.

SubscribeAsync<T>(IObservable<T>, Func<T, Task>, Action<Exception>, Action)

Subscribes allowing asynchronous operations to be executed without blocking the source.

public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError, Action onCompleted)

Parameters

source IObservable<T>

Observable sequence to subscribe to.

onNext Func<T, Task>

Action to invoke for each element in the observable sequence.

onError Action<Exception>

The on error.

onCompleted Action

The on completed.

Returns

IDisposable

IDisposable object used to unsubscribe from the observable sequence.

Type Parameters

T

The type of the elements in the source sequence.

SubscribeSynchronous<T>(IObservable<T>, Func<T, Task>)

Subscribes an element handler to an observable sequence synchronously.

public static IDisposable SubscribeSynchronous<T>(this IObservable<T> source, Func<T, Task> onNext)

Parameters

source IObservable<T>

Observable sequence to subscribe to.

onNext Func<T, Task>

Action to invoke for each element in the observable sequence.

Returns

IDisposable

IDisposable object used to unsubscribe from the observable sequence.

Type Parameters

T

The type of the elements in the source sequence.

SubscribeSynchronous<T>(IObservable<T>, Func<T, Task>, Action)

Subscribes an element handler and a completion handler to an observable sequence synchronously.

public static IDisposable SubscribeSynchronous<T>(this IObservable<T> source, Func<T, Task> onNext, Action onCompleted)

Parameters

source IObservable<T>

Observable sequence to subscribe to.

onNext Func<T, Task>

Action to invoke for each element in the observable sequence.

onCompleted Action

Action to invoke upon graceful termination of the observable sequence.

Returns

IDisposable

IDisposable object used to unsubscribe from the observable sequence.

Type Parameters

T

The type of the elements in the source sequence.

Exceptions

ArgumentNullException

source or onNext or onCompleted is null.

SubscribeSynchronous<T>(IObservable<T>, Func<T, Task>, Action<Exception>)

Subscribes an element handler and an exception handler to an observable sequence synchronously.

public static IDisposable SubscribeSynchronous<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError)

Parameters

source IObservable<T>

Observable sequence to subscribe to.

onNext Func<T, Task>

Action to invoke for each element in the observable sequence.

onError Action<Exception>

Action to invoke upon exceptional termination of the observable sequence.

Returns

IDisposable

IDisposable object used to unsubscribe from the observable sequence.

Type Parameters

T

The type of the elements in the source sequence.

SubscribeSynchronous<T>(IObservable<T>, Func<T, Task>, Action<Exception>, Action)

Subscribes to the specified source synchronously.

public static IDisposable SubscribeSynchronous<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError, Action onCompleted)

Parameters

source IObservable<T>

The source.

onNext Func<T, Task>

The on next.

onError Action<Exception>

The on error.

onCompleted Action

The on completed.

Returns

IDisposable

IDisposable object used to unsubscribe from the observable sequence.

Type Parameters

T

The type of the elements in the source sequence.

SwitchIfEmpty<T>(IObservable<T>, IObservable<T>)

Provide a fallback observable if the source completes without emitting.

public static IObservable<T> SwitchIfEmpty<T>(this IObservable<T> source, IObservable<T> fallback)

Parameters

source IObservable<T>

The source.

fallback IObservable<T>

The fallback.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

SyncTimer(TimeSpan)

Synchronized timer all instances of this with the same TimeSpan use the same timer.

public static IObservable<DateTime> SyncTimer(TimeSpan timeSpan)

Parameters

timeSpan TimeSpan

The time span.

Returns

IObservable<DateTime>

An observable sequence producing the shared DateTime ticks.

SynchronizeAsync<T>(IObservable<T>)

Synchronizes the asynchronous operations in downstream operations. Use SubscribeSynchronus instead for a simpler version. Call Sync.Dispose() to release the lock in the downstream methods.

public static IObservable<(T Value, IDisposable Sync)> SynchronizeAsync<T>(this IObservable<T> source)

Parameters

source IObservable<T>

The source.

Returns

IObservable<(T Value, IDisposable Sync)>

An Observable of T and a release mechanism.

Type Parameters

T

The type of the elements in the source sequence.

SynchronizeSynchronous<T>(IObservable<T>)

Wraps values with a synchronization disposable that completes when disposed.

public static IObservable<(T Value, IDisposable Sync)> SynchronizeSynchronous<T>(this IObservable<T> source)

Parameters

source IObservable<T>

Source sequence.

Returns

IObservable<(T Value, IDisposable Sync)>

Sequence of (value, sync handle).

Type Parameters

T

Element type.

TakeUntil<TSource>(IObservable<TSource>, Func<TSource, bool>)

Takes elements until predicate returns true for an element (inclusive) then completes.

public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)

Parameters

source IObservable<TSource>

Source sequence.

predicate Func<TSource, bool>

Predicate for completion.

Returns

IObservable<TSource>

Sequence that completes when predicate satisfied.

Type Parameters

TSource

Element type.

ThrottleDistinct<T>(IObservable<T>, TimeSpan)

Throttle but only emit when the value actually changes.

public static IObservable<T> ThrottleDistinct<T>(this IObservable<T> source, TimeSpan throttle)

Parameters

source IObservable<T>

The source.

throttle TimeSpan

The throttle.

Returns

IObservable<T>

A throttled distinct sequence.

Type Parameters

T

Element type.

ThrottleDistinct<T>(IObservable<T>, TimeSpan, IScheduler)

Throttle but only emit when the value actually changes.

public static IObservable<T> ThrottleDistinct<T>(this IObservable<T> source, TimeSpan throttle, IScheduler scheduler)

Parameters

source IObservable<T>

The source.

throttle TimeSpan

The throttle.

scheduler IScheduler

The scheduler.

Returns

IObservable<T>

A throttled distinct sequence.

Type Parameters

T

Element type.

ThrottleFirst<T>(IObservable<T>, TimeSpan, IScheduler?)

Emits only the first value in each time window.

public static IObservable<T> ThrottleFirst<T>(this IObservable<T> source, TimeSpan window, IScheduler? scheduler = null)

Parameters

source IObservable<T>

Source sequence.

window TimeSpan

Time window.

scheduler IScheduler

Scheduler (optional).

Returns

IObservable<T>

Throttle-first sequence.

Type Parameters

T

Element type.

ThrottleOnScheduler<T>(IObservable<T>, TimeSpan, IScheduler)

Throttles the on scheduler.

public static IObservable<T> ThrottleOnScheduler<T>(this IObservable<T> source, TimeSpan timeSpan, IScheduler scheduler)

Parameters

source IObservable<T>

The source.

timeSpan TimeSpan

The time span.

scheduler IScheduler

The scheduler.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

ThrottleUntilTrue<T>(IObservable<T>, TimeSpan, Func<T, bool>)

Throttle until a predicate becomes true.

public static IObservable<T> ThrottleUntilTrue<T>(this IObservable<T> source, TimeSpan throttle, Func<T, bool> predicate)

Parameters

source IObservable<T>

The source.

throttle TimeSpan

The throttle.

predicate Func<T, bool>

The predicate.

Returns

IObservable<T>

An IObservable of T.

Type Parameters

T

The type.

ToHotTask<T>(IObservable<T>)

Convert an observable to a Task that starts immediately.

public static Task<T> ToHotTask<T>(this IObservable<T> source)

Parameters

source IObservable<T>

The source.

Returns

Task<T>

A Task of T.

Type Parameters

T

The type.

ToPropertyObservable<T, TProperty>(T, Expression<Func<T, TProperty>>)

Convert a property getter into an observable that emits on change.

public static IObservable<TProperty> ToPropertyObservable<T, TProperty>(this T source, Expression<Func<T, TProperty>> propertyExpression) where T : INotifyPropertyChanged

Parameters

source T

The source.

propertyExpression Expression<Func<T, TProperty>>

The property expression.

Returns

IObservable<TProperty>

An IObservable of TProperty.

Type Parameters

T

The type of the source.

TProperty

The type of the property.

Exceptions

ArgumentException

Expression must be a property.

ToReadOnlyBehavior<T>(T)

A safe wrapper around BehaviorSubject that exposes only the observable.

public static (IObservable<T> Observable, IObserver<T> Observer) ToReadOnlyBehavior<T>(T initialValue)

Parameters

initialValue T

The initial value.

Returns

(IObservable<T> Observable, IObserver<T> Observer)

A tuple of IObservable and IObserver.

Type Parameters

T

The type.

Using<T>(T, Action<T>, IScheduler?)

Using helper with Action.

public static IObservable<Unit> Using<T>(this T obj, Action<T> action, IScheduler? scheduler = null) where T : IDisposable

Parameters

obj T

Object to use.

action Action<T>

Action to run.

scheduler IScheduler

Scheduler.

Returns

IObservable<Unit>

Completion signal.

Type Parameters

T

Disposable type.

Using<T, TResult>(T, Func<T, TResult>, IScheduler?)

Using helper with Func.

public static IObservable<TResult> Using<T, TResult>(this T obj, Func<T, TResult> function, IScheduler? scheduler = null) where T : IDisposable

Parameters

obj T

Object to use.

function Func<T, TResult>

Function to invoke.

scheduler IScheduler

Scheduler.

Returns

IObservable<TResult>

Observable of result.

Type Parameters

T

Disposable type.

TResult

Result type.

WaitUntil<T>(IObservable<T>, Func<T, bool>)

Emits the first element matching predicate then completes.

public static IObservable<T> WaitUntil<T>(this IObservable<T> source, Func<T, bool> predicate)

Parameters

source IObservable<T>

Source sequence.

predicate Func<T, bool>

Predicate.

Returns

IObservable<T>

Sequence with first matching element.

Type Parameters

T

Element type.

WhereFalse(IObservable<bool>)

Filters to false values only.

public static IObservable<bool> WhereFalse(this IObservable<bool> source)

Parameters

source IObservable<bool>

Boolean source.

Returns

IObservable<bool>

Sequence of false values.

WhereIsNotNull<T>(IObservable<T>)

Returns only values that are not null. Converts the nullability.

public static IObservable<T> WhereIsNotNull<T>(this IObservable<T> observable)

Parameters

observable IObservable<T>

The observable that can contain nulls.

Returns

IObservable<T>

A non nullable version of the observable that only emits valid values.

Type Parameters

T

The type of value emitted by the observable.

WhereTrue(IObservable<bool>)

Filters to true values only.

public static IObservable<bool> WhereTrue(this IObservable<bool> source)

Parameters

source IObservable<bool>

Boolean source.

Returns

IObservable<bool>

Sequence of true values.

While(Func<bool>, Action, IScheduler?)

While construct.

public static IObservable<Unit> While(Func<bool> condition, Action action, IScheduler? scheduler = null)

Parameters

condition Func<bool>

Condition to evaluate.

action Action

Action to execute.

scheduler IScheduler

Scheduler.

Returns

IObservable<Unit>

Observable representing the loop.

WithLimitedConcurrency<T>(IEnumerable<Task<T>>, int)

Executes with limited concurrency.

public static IObservable<T> WithLimitedConcurrency<T>(this IEnumerable<Task<T>> taskFunctions, int maxConcurrency)

Parameters

taskFunctions IEnumerable<Task<T>>

Tasks to execute.

maxConcurrency int

Maximum concurrency.

Returns

IObservable<T>

A sequence of task results.

Type Parameters

T

The result type.