This tree can help you find the Observable operator you’re looking for.
- I want to create a new Observable
- that emits a particular item
just( )
- that was returned from a function called at subscribe-time
start( )
- anew for each subscriber
toAsync( )
- that was returned from an
called at subscribe-time fromAction( )
- that was returned from a
called at subscribe-time fromCallable( )
- that was returned from a
called at subscribe-time fromRunnable( )
- after a specified delay
timer( )
- that emits a particular set of 1–10 items
from( )
- that obtains its sequence from an Array or Iterable
from( )
- by retrieving it from a Future
deferFuture( )
- that obtains its sequence from a Future
from( )
- with a timeout
from( )
- that obtains its sequence from an
called periodically runAsync( )
- that emits a sequence of items repeatedly
repeat( )
- as long as a predicate remains true
whileDo( )
- but at least once, no matter what
doWhile( )
- from scratch, with custom logic
create( )
- for each observer that subscribes
defer( )
- that emits a sequence of integers
range( )
- at particular intervals of time
interval( )
- after a specified delay
timer( )
- that completes without emitting items
empty( )
- that does nothing at all
never( )
- I want to create an Observable by combining other Observables
- and emitting all of the items from all of the Observables in whatever order they are received
- where the source Observables are passed to the operator as parameters
- where the source Observables are found in an Array
- where the source Observables are found in an Iterable or Observable
- but I only want to process a certain number of them at once
- but not forwarding any error notifications until all source Observables have terminated
mergeDelayError( )
- and emitting all of the items from all of the Observables, one Observable at a time
concat( )
- by combining the items from two or more Observables sequentially to come up with new items to emit
- whenever each of the Observables has emitted a new item
zip( )
- whenever any of the Observables has emitted a new item
combineLatest( )
- whenever an item is emitted by one Observable in a window defined by an item emitted by another
join( )
- based on an Observable that emits all items that have fallen in such a window
groupJoin( )
- by means of
intermediaries and/then/when
- and emitting the items from only the most-recently emitted of those Observables
switchOnNext( )
- and mirroring only one of those Observables (which one depends on a parameter I am passed)
switchCase( )
- I want emit the items from an Observable after transforming them
- one at a time with a function
map( )
- by casting them to a particular type
cast( )
- by emitting all of the items emitted by corresponding Observables
flatMap( )
- combined with the original items by means of a function
- by emitting all of the items in corresponding Iterables
- combined with the original items by means of a function
- based on all of the items that preceded them
scan( )
- by combining them sequentially with the items in an Iterable by means of a function
- by attaching a timestamp to them
timestamp( )
- into an indicator of the amount of time that lapsed before the emission of the item
timeInterval( )
- I want to shift the items emitted by an Observable forward in time before reemitting them
- with the amount of the shift calculated on a per-item basis
- and the initial subscription to the Observable shifted as well
- I want to transform items and notifications from an Observable into items and reemit them
- by emitting all of the items emitted by corresponding Observables
mergeMap( )
- by wrapping them in
objects materialize( )
- which I can then unwrap again with
dematerialize( )
- I want to ignore all items emitted by an Observable and only pass along its completed/error notification
ignoreElements( )
- I want to mirror an Observable but prefix items to its sequence
- obtained from an Array or Iterable
- obtained from an Observable
- passed as parameters to the operator
- only if its sequence is empty
- I want to collect items from an Observable and reemit them as buffers of items
- with a maximum number of items per buffer
- and starting every n items
- each time a second Observable emits an item
- with buffers given an initial capacity for efficiency reasons
- where that second Observable is returned from a function I supply
- and operates on the emission of a third Observable that opens the buffer
- at periodic intervals
- or when a certain maximum number of items fill the buffer
- for a certain period of time after the interval begins
- containing only the last items emitted
- that is, the last n items
- emitted during a window of time before the Observable completed
- during a window of time before the Observable completed
- I want to split one Observable into multiple Observables
- with a maximum number of items per sub-Observable
- and starting every n items
- each time a second Observable emits an item
- where that second Observable is returned from a function I supply
- and operates on the emission of a third Observable that starts the sub-Observable
- at periodic intervals
- or when a certain maximum number of items have been emitted on the sub-Observable
- for a certain period of time after the interval begins
- so that similar items end up on the same Observable
groupBy( )
- but periodically completing some of those Observables even if the source is not complete
- and transforming the items before emitting them on those Observables
- I want to retrieve from an Observable
- the last item emitted before it completed
last( )
- or a default item if none were emitted
lastOrDefault( )
- that matches a predicate
- or a default item if none did
- the sole item it emitted
- or an exception if it did not emit exactly one
single( )
- or rather a default item if it did not emit any
singleOrDefault( )
- that matches a predicate, or an exception if exactly one did not
- or rather a default item if none did
- the first item it emitted
first( )
- or a default item if none were emitted
firstOrDefault( )
- that matches a predicate
- or a default item if none did
- I want to reemit only certain items from an Observable
- by filtering out those that do not match some predicate
filter( )
- by filtering out those that are not of a particular type
ofType( )
- that is, only the first item
- or notify of an error if the source is empty
first( )
- or a default value if the source is empty
- that matches a predicate
- or notify of an error if none do
- or a default value if none do
- that is, only the first items
- that is, the first n items
- that is, items emitted by the source during an initial period of time
- that is, only the last item
last( )
- that meets some predicate
- or a default item if none do
- or a default item the source emits nothing
lastOrDefault( )
- that is, only item n
elementAt( )
- or a default value if there is no item n
elementAtOrDefault( )
- that is, only those items after the first items
- that is, after the first n items
- or until one of those items matches a predicate
skipWhileWithIndex( )
- that is, until one of those items matches a predicate
skipWhile( )
- that is, after an initial period of time
- that is, after a second Observable emits an item
skipUntil( )
- that is, those items except the last items
- that is, except the last n items
skipLast( )
- or until one of those items matches a predicate
takeWhileWithIndex( )
- that is, until one of those items matches a predicate
takeWhile( )
- that is, except items emitted during a period of time before the source completes
- that is, except items emitted after a second Observable emits an item
takeUntil( )
- by sampling the Observable periodically
- based on a timer
- and emitting the most-recently emitted item in the period
- and emitting the first-emitted item in the period
- based on emissions from another Observable
- and emitting the most-recently emitted item in the period
- and emitting the first-emitted item in the period
- by only emitting items that are not followed by other items within some duration
- based on a timer
- based on emissions from another Observable
- by suppressing items that are duplicates of already-emitted items
distinct( )
- according to a particular function
- if they immediately follow the item they are duplicates of
distinctUntilChanged( )
- according to a particular function
- by delaying my subscription to it for some time after it begins emitting items
- I want to reemit items from an Observable only on condition
- I want to evaluate the entire sequence of items emitted by an Observable
- and emit a single boolean indicating if all of the items pass some test
all( )
- and emit a single boolean indicating if any of the items pass some test
contains( )
- and emit a single boolean indicating if the Observable emitted any items
exists( )
- and emit a single boolean indicating if the Observable emitted no items
isEmpty( )
- and emit a single boolean indicating if the sequence is identical to one emitted by a second Observable
sequenceEqual( )
- and emit the average of all of their values
Type( )
- and emit the sum of all of their values
Type( )
- and emit a number indicating how many items were in the sequence
- [
]count( )
- and emit the item with the maximum value
max( )
- according to some value-calculating function
maxBy( )
- and emit the item with the minimum value
min( )
- according to some value-calculating function
minBy( )
- by applying an aggregation function to each item in turn and emitting the result
reduce( )
- in the form of a single mutable data structure
collect( )
- by applying a function to each item in the sequence, blocking until complete
forEach( )
- I want to convert the entire sequence of items emitted by an Observable
- into a Future
toFuture( )
- into an Iterable
toIterable( )
- that returns the most recently item emitted by the Observable
mostRecent( )
- only if it has not previously returned that item
latest( )
- that returns the next item when it is emitted by the Observable
latest( )
- into an Iterator
getIterator( )
ortoIterator( )
- into a List
toList( )
- sorted by some criterion
toSortedList( )
- into a Map
toMap( )
- that is also an ArrayList
toMultiMap( )
- I want an Observable to emit exactly one item
- so I want it to notify of an error otherwise
single( )
- so I want it to notify of an error if it emits more than one, or a default item if it emits none
singleOrDefault( )
- that matches a predicate
- so I want it to notify of an error otherwise
- so I want it to notify of an error if it emits more than one, or a default item if it emits none
- I want an operator to operate on a particular Scheduler
subscribeOn( )
- doing its processing in parallel on multiple threads without making the resulting Observable poorly-behaved
parallel( )
- when it notifies Observers
observeOn( )
- I want an Observable to invoke a particular action
- whenever it emits an item
- when it issues a completed notification
- when it issues an error notification
- when it issues a completed or error notification
- after it has issued a completed or error notification
- whenever it emits an item or issues a completed/error notification
- I want an Observable that will notify observers of an error
error( )
- if a specified period of time elapses without it emitting an item
- I want an Observable to recover gracefully
- from a timeout by switching to a backup Observable
- from an upstream error notification
- by switching to a particular backup Observable
- but only if the error is an
onExceptionResumeNext( )
- but only if the error is an
- by switching to a backup Observable returned from a function that is passed the error
- by emitting a particular item and completing normally
onErrorReturn( )
- by attempting to resubscribe to the upstream Observable
retry( )
- a certain number of times
- so long as a predicate remains true
- from being potentially unserialized or otherwise poorly-behaved
serialize( )
- I want to create a resource that has the same lifespan as the Observable
using( )
- I want to subscribe to an Observable and receive a
that blocks until the Observable completes forEachFuture( )
- I want an Observable that does not start emitting items to subscribers until asked
publish( )
ormulticast( )
- and then only emits the last item in its sequence
publishLast( )
- and then emits the complete sequence, even to those who subscribe after the sequence has begun
replay( )
- but I want it to go away once all of its subscribers unsubscribe
refCount( )
orshare( )
- and then I want to ask it to start
connect( )
- I want an Observable to retransmit items to observers who subscribe late
cache( )
ⓐ: this operator is part of the optional async-util
ⓑ: this operator is part of the BlockingObservable
ⓒ: this operator is part of the optional computation-expressions
ⓜ: this operator is part of the optional math
Ⓢ: a variant of this operator allows you to choose a particular Scheduler
I have omitted parameter names from some methods where they are not necessary to distinguish variants of the method. This page was inspired by the RxJS tables (static and instance) created by Paul Taylor.