RxJava Releases

Version 1.0.8 – March 7th 2015 (Maven Central)

  • [Pull 2809] (ReactiveX#2809) Fixed takeUntil not unsubscribing from either of the observables in case of a terminal condition.
  • [Pull 2804] (ReactiveX#2804) ObserveOn throughput enhancements
  • [Pull 2767] (ReactiveX#2767) Optimized scalar observeOn/subscribeOn
  • [Pull 2776] (ReactiveX#2776) Experimental: add new operator onBackpressureDrop(Action1 onDrop)
  • [Pull 2788] (ReactiveX#2788) Fix the bug that 'publish' will cache items when no subscriber
  • [Pull 2779] (ReactiveX#2779) OperatorMulticast.connect(connection) should not return null
  • [Pull 2771] (ReactiveX#2771) OnSubscribeRange request overflow check
  • [Pull 2770] (ReactiveX#2770) OperatorOnBackpressureDrop request overflow check
  • [Pull 2769] (ReactiveX#2769) OperatorCombineLatest request overflow check

Version 1.0.7 – February 21st 2015 (Maven Central)

This release includes some bug fixes along with a new operator and performance enhancements.

Experimental Operator

Note that these APIs may still change or be removed altogether since they are marked as @Experimental.

withLatestFrom(Observable, Selector)

This allows combining all values from one Observable with the latest value from a second Observable at each onNext.

For example:

Observable<Long> a = Observable.interval(1, TimeUnit.SECONDS);
Observable<Long> b = Observable.interval(250, TimeUnit.MILLISECONDS);


a.withLatestFrom(b, (x, y) -> new long[] { x, y })
        .toBlocking()
        .forEach(pair -> System.out.println("a: " + pair[0] + " b: " + pair[1]));

This outputs:

a: 0 b: 3
a: 1 b: 6
a: 2 b: 11
a: 3 b: 15
a: 4 b: 19
a: 5 b: 23
a: 6 b: 27
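
The pairing rule can be modeled without any threads. The following is a minimal sketch in plain Java, not RxJava code: the class and method names are hypothetical, and it assumes two ideal fixed-rate sources where a tick of `b` that lands at the same instant as a tick of `a` is seen first. Under that model a=0 pairs with b=3 and a=1 pairs with b=7; a real run, like the output above, drifts from these numbers because of scheduling jitter.

```java
import java.util.ArrayList;
import java.util.List;

// Deterministic model of withLatestFrom for two fixed-rate sources.
// Not RxJava code: class and method names are illustrative only.
final class WithLatestFromSketch {

    /**
     * Source a emits 0, 1, 2, ... every periodA ms; b does the same every periodB ms.
     * Each value of a is paired with the most recent value of b.
     */
    static List<long[]> combine(long periodA, long periodB, int countA) {
        List<long[]> pairs = new ArrayList<>();
        for (long a = 0; a < countA; a++) {
            long time = (a + 1) * periodA;   // when a emits this value
            long bEmitted = time / periodB;  // b values emitted at or before that time
            if (bEmitted == 0) {
                continue;                    // b has no value yet, so a's value is dropped
            }
            pairs.add(new long[] { a, bEmitted - 1 });
        }
        return pairs;
    }

    public static void main(String[] args) {
        for (long[] pair : combine(1000, 250, 3)) {
            System.out.println("a: " + pair[0] + " b: " + pair[1]);
        }
    }
}
```

Only `a` drives emissions: a new value from `b` replaces the remembered one but never produces output by itself.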

Changes

  • [Pull 2760] (ReactiveX#2760) Operator: WithLatestFrom
  • [Pull 2762] (ReactiveX#2762) Optimized isUnsubscribed check
  • [Pull 2759] (ReactiveX#2759) Observable.using should use unsafeSubscribe and enable eager disposal
  • [Pull 2655] (ReactiveX#2655) SwitchOnNext: fix upstream producer replacing the ops own producer

Version 1.0.6 – February 11th 2015 (Maven Central)

This release adds an experimental operator and fixes several bugs.

flatMap(maxConcurrent)

Note that this API may still change or be removed altogether since it is marked as @Beta.

A flatMap overload was added that allows limiting concurrency, that is, the number of Observables being merged at the same time.

This now means that these are the same, one using merge directly, the other using flatMap and passing in 10 as the maxConcurrent:

Observable<Observable<Integer>> asyncWork = range(1, 1000000)
        .doOnNext(i -> System.out.println("Emitted Value: " + i))
        .map(item -> {
            return just(item)
                    .doOnNext(MergeMaxConcurrent::sleep)
                    .subscribeOn(Schedulers.io());
        });
merge(asyncWork, 10).toBlocking().forEach(v -> System.out.println("Received: " + v));

range(1, 1000000)
        .doOnNext(i -> System.out.println("Emitted Value: " + i))
        .flatMap(item -> {
            return just(item)
                    .doOnNext(MergeMaxConcurrent::sleep)
                    .subscribeOn(Schedulers.io());
        }, 10)
        .toBlocking().forEach(v -> System.out.println("Received: " + v));
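
To make the meaning of the maxConcurrent argument concrete, here is a rough plain-Java sketch that bounds the number of in-flight tasks with a `Semaphore`. It is an analogy under stated assumptions, not how RxJava implements the operator: RxJava limits concurrency through non-blocking request accounting, whereas this sketch simply blocks the producer. All names in it are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class MaxConcurrentSketch {

    /** Runs `tasks` short jobs with at most `maxConcurrent` in flight; returns the observed peak. */
    static int runAndMeasurePeak(int tasks, int maxConcurrent) {
        ExecutorService pool = Executors.newCachedThreadPool();
        Semaphore permits = new Semaphore(maxConcurrent);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                permits.acquire();               // producer waits until a slot is free
                pool.execute(() -> {
                    try {
                        int now = active.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(5);         // simulate slow IO or computation
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        active.decrementAndGet();
                        permits.release();       // free the slot for the next task
                        done.countDown();
                    }
                });
            }
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + runAndMeasurePeak(100, 10));
    }
}
```

Because each task holds a permit for its whole lifetime, the measured peak can never exceed the limit passed in.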

Changes

  • [Pull 2627] (ReactiveX#2627) FlatMap overloads with maximum concurrency parameter
  • [Pull 2648] (ReactiveX#2648) TakeWhile: don't unsubscribe downstream.
  • [Pull 2580] (ReactiveX#2580) Allow configuring the maximum number of computation scheduler threads
  • [Pull 2601] (ReactiveX#2601) Added common Exceptions.throwIfAny to throw a collection of exceptions
  • [Pull 2644] (ReactiveX#2644) Missing Unsafe class yields NoClassDefFoundError
  • [Pull 2642] (ReactiveX#2642) Fix a potential memory leak in schedulePeriodically
  • [Pull 2630] (ReactiveX#2630) Cast back Observer to Subscriber if passed to subscribe(Observer)
  • [Pull 2622] (ReactiveX#2622) Changed Observable.empty() into a stateless constant observable.
  • [Pull 2607] (ReactiveX#2607) OnSubscribeRefCount - improve comments

Version 1.0.5 – February 3rd 2015 (Maven Central)

This release includes many bug fixes along with a few new operators and enhancements.

Experimental Operators

This release adds a few experimental operators.

Note that these APIs may still change or be removed altogether since they are marked as @Experimental.

takeUntil(predicate)

This operator allows conditionally unsubscribing an Observable but inclusively emitting the final onNext. This differs from takeWhile which excludes the final onNext.

// takeUntil(predicate) example
Observable.just(1, 2, 3, 4, 5, 6, 7)
        .doOnEach(System.out::println)
        .takeUntil(i -> i == 3)
        .forEach(System.out::println);

// takeWhile(predicate) example
Observable.just(1, 2, 3, 4, 5, 6, 7)
        .doOnEach(System.out::println)
        .takeWhile(i -> i <= 3)
        .forEach(System.out::println);

This outputs:

// takeUntil(predicate)
[rx.Notification@30e84925 OnNext 1]
1
[rx.Notification@30e84926 OnNext 2]
2
[rx.Notification@30e84927 OnNext 3]
3

// takeWhile(predicate)
[rx.Notification@30e84925 OnNext 1]
1
[rx.Notification@30e84926 OnNext 2]
2
[rx.Notification@30e84927 OnNext 3]
3
[rx.Notification@30e84928 OnNext 4]

Note how the source emits 4 values with takeWhile (the fourth fails the predicate and is not passed downstream), while with takeUntil it emits only 3.
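
The difference comes down to whether the predicate is checked before or after the item is emitted. A minimal sketch over plain lists, assuming nothing about RxJava internals (the class, the methods and the `inspected` counter are all hypothetical and exist only for illustration):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class TakeSketch {
    /** Number of source items inspected by the most recent call (illustration only). */
    static int inspected;

    /** Emits items up to and including the first one that matches `stop`. */
    static <T> List<T> takeUntil(List<T> source, Predicate<T> stop) {
        List<T> out = new ArrayList<>();
        inspected = 0;
        for (T item : source) {
            inspected++;
            out.add(item);           // emit first ...
            if (stop.test(item)) {   // ... then check the stop condition
                break;
            }
        }
        return out;
    }

    /** Emits items until the first one that fails `keep`; that item is not emitted. */
    static <T> List<T> takeWhile(List<T> source, Predicate<T> keep) {
        List<T> out = new ArrayList<>();
        inspected = 0;
        for (T item : source) {
            inspected++;
            if (!keep.test(item)) {  // check first; the failing item is dropped
                break;
            }
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        System.out.println(takeUntil(source, i -> i == 3) + " inspected " + inspected);
        System.out.println(takeWhile(source, i -> i <= 3) + " inspected " + inspected);
    }
}
```

Both calls return the same three items, but takeWhile has to look at a fourth item to learn that it should stop.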

switchIfEmpty

The new switchIfEmpty operator is a companion to defaultIfEmpty that switches to a different Observable if the primary Observable is empty.

Observable.empty()
        .switchIfEmpty(Observable.just(1, 2, 3))
        .forEach(System.out::println);

Enhancements

merge(maxConcurrent) with backpressure

This release adds backpressure to merge(maxConcurrent) so that horizontal buffer bloat can also be controlled with the maxConcurrent parameter.

This allows parallel execution such as the following to work with backpressure:

public class MergeMaxConcurrent {

    public static void main(String[] args) {
        // define 1,000,000 async tasks
        Observable<Observable<Integer>> asyncWork = range(1, 1000000)
                .doOnNext(i -> System.out.println("Value: " + i))
                .doOnRequest(r -> System.out.println("request1 " + r))
                .map(item -> {
                    return just(item)
                            // simulate slow IO or computation
                            .doOnNext(MergeMaxConcurrent::sleep)
                            .subscribeOn(Schedulers.io());
                })
                .doOnRequest(r -> System.out.println("request2 " + r));
                
        // allow 10 outstanding tasks at a time
        merge(asyncWork, 10).toBlocking().forEach(System.out::println);
    }

    public static void sleep(int value) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

In prior versions all 1,000,000 tasks are immediately emitted and queued for execution. As of this release it correctly allows 10 at a time to be emitted.

Changes

  • [Pull 2493] (ReactiveX#2493) Experimental Operator TakeUntil with predicate
  • [Pull 2585] (ReactiveX#2585) Experimental Operator: switchIfEmpty
  • [Pull 2470] (ReactiveX#2470) Experimental Subject state information methods & bounded ReplaySubject termination
  • [Pull 2540] (ReactiveX#2540) Merge with max concurrency now supports backpressure.
  • [Pull 2332] (ReactiveX#2332) Operator retry test fix attempt
  • [Pull 2244] (ReactiveX#2244) OperatorTakeLast add check for isUnsubscribed to fast path
  • [Pull 2469] (ReactiveX#2469) Remove the execute permission from source files
  • [Pull 2455] (ReactiveX#2455) Fix for #2191 - OperatorMulticast fails to unsubscribe from source
  • [Pull 2474] (ReactiveX#2474) MergeTest.testConcurrency timeout to let other tests run
  • [Pull 2335] (ReactiveX#2335) A set of stateless operators that don't need to be instantiated
  • [Pull 2447] (ReactiveX#2447) Fail early if a null subscription is added to a CompositeSubscription.
  • [Pull 2475] (ReactiveX#2475) SynchronousQueue.clone fix
  • [Pull 2477] (ReactiveX#2477) Backpressure tests fix0121
  • [Pull 2476] (ReactiveX#2476) Fixed off-by-one error and value-drop in the window operator.
  • [Pull 2478] (ReactiveX#2478) RefCountAsync: adjusted time values as 1 ms is unreliable
  • [Pull 2238] (ReactiveX#2238) Fix the bug that cache doesn't unsubscribe the source Observable when the source is terminated
  • [Pull 1840] (ReactiveX#1840) Unsubscribe when thread is interrupted
  • [Pull 2471] (ReactiveX#2471) Fixes NPEs reported in ReactiveX#1702 by synchronizing queue.
  • [Pull 2482] (ReactiveX#2482) Merge: fixed hangs & missed scalar emissions
  • [Pull 2547] (ReactiveX#2547) Warnings cleanup
  • [Pull 2465] (ReactiveX#2465) ScheduledExecutorService: call purge periodically on JDK 6 to avoid cancelled task-retention
  • [Pull 2591] (ReactiveX#2591) Changed the naming of the NewThreadWorker's system parameters
  • [Pull 2543] (ReactiveX#2543) OperatorMerge handle request overflow
  • [Pull 2548] (ReactiveX#2548) Subscriber.request should throw exception if negative request made
  • [Pull 2550] (ReactiveX#2550) Subscriber.onStart requests should be additive (and check for overflow)
  • [Pull 2553] (ReactiveX#2553) RxRingBuffer with synchronization
  • [Pull 2565] (ReactiveX#2565) Obstruction detection in tests.
  • [Pull 2563] (ReactiveX#2563) Retry backpressure test: split error conditions into separate test lines.
  • [Pull 2572] (ReactiveX#2572) Give more time to certain concurrency tests.
  • [Pull 2559] (ReactiveX#2559) OnSubscribeFromIterable - add request overflow check
  • [Pull 2574] (ReactiveX#2574) SizeEviction test needs to return false
  • [Pull 2561] (ReactiveX#2561) Updating queue code from JCTools
  • [Pull 2566] (ReactiveX#2566) CombineLatest: fixed concurrent requestUpTo yielding -1 requests
  • [Pull 2552] (ReactiveX#2552) Publish: fixed incorrect subscriber requested accounting
  • [Pull 2583] (ReactiveX#2583) Added perf tests for various container-like subscriptions
  • [Pull 1955] (ReactiveX#1955) OnBackpressureXXX: support for common drain manager & fix for former concurrency bugs
  • [Pull 2590] (ReactiveX#2590) Zip: fixed unbounded downstream requesting above Long.MAX_VALUE
  • [Pull 2589] (ReactiveX#2589) Repeat/retry: fixed unbounded downstream requesting above Long.MAX_VALUE
  • [Pull 2567] (ReactiveX#2567) RefCount: disconnect all if upstream terminates
  • [Pull 2593] (ReactiveX#2593) Zip: emit onCompleted without waiting for request + avoid re-reading fields
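
Several of the entries above concern request accounting: rejecting negative requests, making requests additive, and guarding against overflow past Long.MAX_VALUE. The following is a minimal sketch of that general idea in plain Java. It is an illustration under assumptions, not the code from any of these pull requests, and the class name is hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

final class RequestAccountingSketch {
    private final AtomicLong requested = new AtomicLong();

    /** Adds n to the outstanding request count, capping at Long.MAX_VALUE. Returns the new total. */
    long request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("number requested cannot be negative: " + n);
        }
        while (true) {
            long current = requested.get();
            long next = current + n;
            if (next < 0) {             // signed overflow wrapped around
                next = Long.MAX_VALUE;  // treat as an unbounded request
            }
            if (requested.compareAndSet(current, next)) {
                return next;            // no other thread changed the count in between
            }
        }
    }

    public static void main(String[] args) {
        RequestAccountingSketch r = new RequestAccountingSketch();
        System.out.println(r.request(5));
        System.out.println(r.request(Long.MAX_VALUE) == Long.MAX_VALUE);
    }
}
```

Without the cap, adding to a count near Long.MAX_VALUE would wrap to a negative number and stall the stream.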

Version 1.0.4 – December 29th 2014 (Maven Central)

  • [Pull 2156] (ReactiveX#2156) Fix the issue that map may swallow fatal exceptions
  • [Pull 1967] (ReactiveX#1967) Fix the issue that GroupBy may not call 'unsubscribe'
  • [Pull 2052] (ReactiveX#2052) OperatorDoOnRequest.ParentSubscriber should be static class
  • [Pull 2237] (ReactiveX#2237) Make Publish Operator Release RingBuffer
  • [Pull 2053] (ReactiveX#2053) Fixed wrong bounded ReplaySubject use in test

Version 1.0.3 – December 15th 2014 (Maven Central)

  • [Pull 1928] (ReactiveX#1928) Experimental: Add onBackpressureBuffer with capacity

  • [Pull 1946] (ReactiveX#1946) Experimental: AbstractOnSubscribe to help build Observables one onNext at a time.

  • [Pull 1960] (ReactiveX#1960) Beta: doOnRequest

  • [Pull 1965] (ReactiveX#1965) Fix the issue that Sample doesn't call 'unsubscribe'

  • [Pull 1966] (ReactiveX#1966) Fix NPE when the key is null in GroupBy

  • [Pull 1964] (ReactiveX#1964) Handle 0 or negative request in Buffer

  • [Pull 1957] (ReactiveX#1957) Fix 'request(0)' issue in Scan

  • [Pull 1950] (ReactiveX#1950) Add "Subscriptions.unsubscribed" to fix the 'isUnsubscribed' issue

  • [Pull 1938] (ReactiveX#1938) Any/All should not unsubscribe downstream.

  • [Pull 1968] (ReactiveX#1968) Upgrade to Gradle 2.2

  • [Pull 1961] (ReactiveX#1961) Remove Request Batching in Merge

  • [Pull 1953] (ReactiveX#1953) Fixed timer cast-to-int crash causing incorrect benchmark.

  • [Pull 1952] (ReactiveX#1952) Remove ActionSubscription

  • [Pull 1951] (ReactiveX#1951) Remove extraneous request(n) and onCompleted() calls when unsubscribed.

  • [Pull 1947] (ReactiveX#1947) Fixed first emission racing with pre and post subscription.

  • [Pull 1937] (ReactiveX#1937) Scheduler.Worker to be finally unsubscribed to avoid interference

  • [Pull 1926] (ReactiveX#1926) Move the codes out of the finally block

  • [Pull 1922] (ReactiveX#1922) Set removeOnCancelPolicy on the threadpool if supported

Version 1.0.2 – December 1st 2014 (Maven Central)

This release adds @Beta and @Experimental annotations to mark APIs that are not yet stable.

An example of how this looks in the Javadocs is:

@Experimental
public final Observable<T> onBackpressureBlock()

The lifecycle and stability of these are documented in the README as follows:

@Beta

APIs marked with the @Beta annotation at the class or method level are subject to change. They can be modified in any way, or even removed, at any time. If your code is a library itself (i.e. it is used on the CLASSPATH of users outside your own control), you should not use beta APIs, unless you repackage them (e.g. using ProGuard, shading, etc).

@Experimental

APIs marked with the @Experimental annotation at the class or method level will almost certainly change. They can be modified in any way, or even removed, at any time. You should not use or rely on them in any production code. They are purely to allow broad testing and feedback.

  • [Pull 1905] (ReactiveX#1905) Beta & Experimental Annotations
  • [Pull 1907] (ReactiveX#1907) Experimental: onBackpressureBlock
  • [Pull 1903] (ReactiveX#1903) Fix TestScheduler Handling of Immediate vs Virtual Time
  • [Pull 1898] (ReactiveX#1898) Scheduled action no interrupt
  • [Pull 1904] (ReactiveX#1904) Fix the bug that Scan may request 0 when n is 1
  • [Pull 1912] (ReactiveX#1912) Fixed retry without backpressure & test function to support bp

Version 1.0.1 – November 28th 2014 (Maven Central)

Version 1.0.0 – November 18th 2014 (Maven Central)

After 2+ years of internal and open source development, 3600+ commits, 100+ releases, and with the help of 97 contributors RxJava has hit version 1.0.0.

Thank you @headinthebox @zsxwing @samuelgruetter @akarnokd @quidryan @DavidMGross @abersnaze @jmhofer @mairbek @mttkay @daveray @mattrjacobs @michaeldejong @MarioAriasC @johngmyers @pron @jbripley @davidmoten @gliptak @johnhmarks @jloisel @billyy @prabirshrestha @ragalie @abliss @dpsm @daschl @thegeez and the many other contributors and those who have reported bugs, tweeted, blogged or presented about RxJava.

The quality of this release could not have been achieved without all of your help. Thank you for your involvement and for building a community around this project.

JVM Language Adaptors & Subprojects

As of 1.0 the JVM language adapters and other subprojects no longer live under RxJava but have been separated out into their own projects with their own release cycles.

Versioning

Version 1.x is now a stable API and will be supported for several years.

Minor 1.x increments (such as 1.1, 1.2, etc) will occur when non-trivial new functionality is added or significant enhancements or bug fixes occur that may have behavioral changes that may affect some edge cases (such as dependence on behavior resulting from a bug). An example of an enhancement that would classify as this is adding reactive pull backpressure support to an operator that previously did not support it. This should be backwards compatible but does behave differently.

Patch 1.x.y increments (such as 1.0.0 -> 1.0.1, 1.3.1 -> 1.3.2, etc) will occur for bug fixes and trivial functionality (like adding a method overload).

Roadmap and Known Issues

  • 1.0.x milestone with known issues
  • 1.1 milestone with additional support for reactive pull backpressure
  • 1.x is a catch all for other items that may be pursued in 1.2, 1.3 and later versions.

Change Log

  • all deprecated methods and types from v0.20 and earlier are deleted
  • now published to groupId io.reactivex instead of com.netflix.rxjava
  • artifactId is now rxjava instead of rxjava-core
io.reactivex:rxjava:1.0.0

Following are specific changes from 0.20 to 1.0 to be aware of:

groupBy/groupByUntil

The groupByUntil operator was removed by collapsing its behavior into groupBy. Previously, when a child GroupedObservable of groupBy was unsubscribed, groupBy would internally retain the state and ignore all future onNext for that key.

This matched behavior in Rx.Net but was found to be non-obvious and almost everyone using groupBy on long-lived streams actually wanted the behavior of groupByUntil where an unsubscribed GroupedObservable would clean up the resources and then if onNext for that key arrived again a new GroupedObservable would be emitted.

Adding backpressure (reactive pull) to groupByUntil was found to not work easily with its signatures so before 1.0 Final it was decided to collapse groupBy and groupByUntil. Further details on this can be found in Pull Request 1727.

Here is an example of how groupBy now behaves when a child GroupedObservable is unsubscribed (using take here):

// odd/even into lists of 10
Observable.range(1, 100)
        .groupBy(n -> n % 2 == 0)
        .flatMap(g -> {
            return g.take(10).toList();
        }).forEach(System.out::println);
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
[21, 23, 25, 27, 29, 31, 33, 35, 37, 39]
[22, 24, 26, 28, 30, 32, 34, 36, 38, 40]
[41, 43, 45, 47, 49, 51, 53, 55, 57, 59]
[42, 44, 46, 48, 50, 52, 54, 56, 58, 60]
[61, 63, 65, 67, 69, 71, 73, 75, 77, 79]
[62, 64, 66, 68, 70, 72, 74, 76, 78, 80]
[81, 83, 85, 87, 89, 91, 93, 95, 97, 99]
[82, 84, 86, 88, 90, 92, 94, 96, 98, 100]

Previously this would have only emitted 2 groups and ignored all subsequent values:

[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
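
The two behaviors can be compared side by side with a small plain-Java model. This is a sketch over an in-memory loop rather than RxJava code; the class name, the method and the `reopenGroups` flag are hypothetical and exist only to switch between the old and new semantics described above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class GroupBySketch {

    /** Groups 1..count into odd/even lists of groupSize, modeling groupBy + take(groupSize).toList(). */
    static List<List<Integer>> oddEvenLists(int count, int groupSize, boolean reopenGroups) {
        Map<Boolean, List<Integer>> open = new LinkedHashMap<>();
        Set<Boolean> finished = new HashSet<>();
        List<List<Integer>> emitted = new ArrayList<>();
        for (int n = 1; n <= count; n++) {
            boolean key = n % 2 == 0;
            if (!reopenGroups && finished.contains(key)) {
                continue;                // pre-1.0 behavior: values for a finished key are ignored
            }
            List<Integer> group = open.computeIfAbsent(key, k -> new ArrayList<>());
            group.add(n);
            if (group.size() == groupSize) {
                emitted.add(group);      // like take(groupSize).toList() completing
                open.remove(key);        // 1.0 behavior: the next value opens a new group
                finished.add(key);
            }
        }
        emitted.addAll(open.values());   // partial groups are flushed when the source completes
        return emitted;
    }

    public static void main(String[] args) {
        oddEvenLists(100, 10, true).forEach(System.out::println);
        System.out.println("old behavior:");
        oddEvenLists(100, 10, false).forEach(System.out::println);
    }
}
```

With `reopenGroups` set to true the model emits ten lists, and with false only the first two.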

On a finite stream, behavior similar to the previous groupBy implementation (which filtered out later values) can be achieved like this:

//odd/even into lists of 10
Observable.range(1, 100)
        .groupBy(n -> n % 2 == 0)
        .flatMap(g -> {
            return g.filter(i -> i <= 20).toList();
        }).forEach(System.out::println);
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

That however does allow the stream to complete (which may not be wanted).

To unsubscribe, here are some choices that produce equivalent output but unsubscribe upstream efficiently, so the source emits only 20 values:

Observable.timer(0, 1, TimeUnit.MILLISECONDS)
        .groupBy(n -> n % 2 == 0)
        .flatMap(g -> {
            return g.take(10).toList();
        }).take(2).toBlocking().forEach(System.out::println);

or

Observable.timer(0, 1, TimeUnit.MILLISECONDS)
        .take(20)
        .groupBy(n -> n % 2 == 0)
        .flatMap(g -> {
            return g.toList();
        }).toBlocking().forEach(System.out::println);

These show that now groupBy composes like any other operator without the nuanced and hidden behavior of ignoring values after a child GroupedObservable is unsubscribed.

Uses of groupByUntil can now all be done by just using operators like take, takeWhile and takeUntil on the GroupedObservable directly, such as this:

Observable.from(Arrays.asList("a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c"))
        .groupBy(n -> n)
        .flatMap(g -> {
            return g.take(3).reduce((s, s2) -> s + s2);
        }).forEach(System.out::println);
aaa
bbb
ccc
aaa
bbb
ccc

retryWhen/repeatWhen

The retryWhen and repeatWhen method signatures both emitted an Observable<Notification> type which could be queried to represent either onError in the retryWhen case or onCompleted in the repeatWhen case. This was found to be confusing and unnecessary. The signatures were changed to emit Observable<Throwable> for retryWhen and Observable<Void> for repeatWhen to better signal the type of notification they are emitting without the need to then query the Notification.

The following contrived example shows how the Observable<Throwable> is used to get the error that occurred when deciding to retry:

    AtomicInteger count = new AtomicInteger();
    Observable.create((Subscriber<? super String> s) -> {
        if (count.getAndIncrement() == 0) {
            s.onError(new RuntimeException("always fails"));
        } else {
            s.onError(new IllegalArgumentException("user error"));
        }
    }).retryWhen(attempts -> {
        return attempts.flatMap(throwable -> {
            if (throwable instanceof IllegalArgumentException) {
                System.out.println("don't retry on IllegalArgumentException... allow failure");
                return Observable.error(throwable);
            } else {
                System.out.println(throwable + " => retry after 1 second");
                return Observable.timer(1, TimeUnit.SECONDS);
            }
        });
    })
    .toBlocking().forEach(System.out::println);
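
The decision logic in that example (inspect the error, then either retry or let it propagate) can be sketched as an ordinary loop. This is a simplified synchronous analogy with hypothetical names, not the retryWhen implementation; it omits the one-second delay and adds a `maxAttempts` bound that the example above does not have.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class RetryDecisionSketch {
    /** Attempts made by the most recent call (illustration only). */
    static int attempts;

    static <T> T callWithRetry(Supplier<T> task, Predicate<RuntimeException> shouldRetry, int maxAttempts) {
        attempts = 0;
        while (true) {
            attempts++;
            try {
                return task.get();
            } catch (RuntimeException error) {
                if (attempts >= maxAttempts || !shouldRetry.test(error)) {
                    throw error;  // comparable to returning Observable.error(throwable)
                }
                // otherwise fall through and try again
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger count = new AtomicInteger();
        try {
            RetryDecisionSketch.<String>callWithRetry(() -> {
                if (count.getAndIncrement() == 0) {
                    throw new RuntimeException("always fails");
                }
                throw new IllegalArgumentException("user error");
            }, error -> !(error instanceof IllegalArgumentException), 5);
        } catch (IllegalArgumentException e) {
            System.out.println("gave up after " + attempts + " attempts: " + e.getMessage());
        }
    }
}
```

The first failure is retried because it is a plain RuntimeException, and the second is rethrown because the predicate rejects IllegalArgumentException.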

collect

The collect operator was changed to require a factory method for the initial value. This allows the Observable to be executed multiple times and get a new value (typically a mutable data structure) each time. Prior to this the Observable could only be subscribed to once since it would retain the original mutable data structure and keep mutating it.

Observable.range(0, 10).collect(() -> new ArrayList<Integer>(), (list, i) -> {
    list.add(i);
}).forEach(System.out::println);
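
Why the factory matters can be shown without RxJava at all. In this plain-Java sketch, each call to the hypothetical `collectRange` method stands in for one subscription: a shared seed keeps growing across calls, while a factory yields a fresh container each time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class CollectFactorySketch {

    /** Collects 0..9 into the container produced by the factory; one call models one subscription. */
    static List<Integer> collectRange(Supplier<List<Integer>> factory) {
        List<Integer> list = factory.get();
        for (int i = 0; i < 10; i++) {
            list.add(i);
        }
        return list;
    }

    public static void main(String[] args) {
        // Shared seed: the second "subscription" sees the first one's values too.
        List<Integer> shared = new ArrayList<>();
        collectRange(() -> shared);
        System.out.println("shared seed, second run: " + collectRange(() -> shared).size());

        // Factory: every "subscription" starts from an empty list.
        collectRange(ArrayList::new);
        System.out.println("factory, second run: " + collectRange(ArrayList::new).size());
    }
}
```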

Removed Scheduler.parallelism

The Scheduler.parallelism method was no longer being used by anything so was removed.

Removed Observable.parallel

The parallel operator was a failed experiment and almost all uses of it were wrong and led to confusion and often bad performance. Due to this it was removed.

Here is example code to show approaches to adding concurrency:

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class ParallelExecution {

    public static void main(String[] args) {
        System.out.println("------------ mergingAsync");
        mergingAsync();
        System.out.println("------------ mergingSync");
        mergingSync();
        System.out.println("------------ mergingSyncMadeAsync");
        mergingSyncMadeAsync();
        System.out.println("------------ flatMapExampleSync");
        flatMapExampleSync();
        System.out.println("------------ flatMapExampleAsync");
        flatMapExampleAsync();
        System.out.println("------------ flatMapBufferedExampleAsync");
        flatMapBufferedExampleAsync();
        System.out.println("------------ flatMapWindowedExampleAsync");
        flatMapWindowedExampleAsync();
        System.out.println("------------");
    }

    /**
     * Merging async Observables subscribes to all of them concurrently.
     */
    private static void mergingAsync() {
        Observable.merge(getDataAsync(1), getDataAsync(2))
                .toBlocking().forEach(System.out::println);
    }

    /**
     * Merging synchronous Observables executes them one after the other.
     */
    private static void mergingSync() {
        // here you'll see the delay as each is executed synchronously
        Observable.merge(getDataSync(1), getDataSync(2))
                .toBlocking().forEach(System.out::println);
    }

    /**
     * If the Observables are synchronous they can be made async with `subscribeOn`
     */
    private static void mergingSyncMadeAsync() {
        // if you have something synchronous and want to make it async, you can schedule it like this
        // so here we see both executed concurrently
        Observable.merge(
                getDataSync(1).subscribeOn(Schedulers.io()),
                getDataSync(2).subscribeOn(Schedulers.io())
                )
                .toBlocking().forEach(System.out::println);
    }

    /**
     * flatMap uses `merge` so any async Observables it returns will execute concurrently.
     */
    private static void flatMapExampleAsync() {
        Observable.range(0, 5).flatMap(i -> {
            return getDataAsync(i);
        }).toBlocking().forEach(System.out::println);
    }

    /**
     * If synchronous Observables are merged (via flatMap here) then it will behave like `concat`
     * and execute each Observable (getDataSync here) synchronously one after the other.
     */
    private static void flatMapExampleSync() {
        Observable.range(0, 5).flatMap(i -> {
            return getDataSync(i);
        }).toBlocking().forEach(System.out::println);
    }

    /**
     * If a single stream needs to be split across multiple CPUs it is generally more efficient to do it in batches.
     * 
     * The `buffer` operator can be used to batch into chunks that are then each processed on a separate thread.
     */
    private static void flatMapBufferedExampleAsync() {
        Observable.range(0, 5000).buffer(500).flatMap(i -> {
            return Observable.from(i).subscribeOn(Schedulers.computation()).map(item -> {
                // simulate computational work
                    try {
                        Thread.sleep(1);
                    } catch (Exception e) {
                    }
                    return item + " processed " + Thread.currentThread();
                });
        }).toBlocking().forEach(System.out::println);
    }

    /**
     * Or the `window` operator can be used instead of buffer to process them as a stream instead of buffered list.
     */
    private static void flatMapWindowedExampleAsync() {
        Observable.range(0, 5000).window(500).flatMap(work -> {
            return work.observeOn(Schedulers.computation()).map(item -> {
                // simulate computational work
                    try {
                        Thread.sleep(1);
                    } catch (Exception e) {
                    }
                    return item + " processed " + Thread.currentThread();
                });
        }).toBlocking().forEach(System.out::println);
    }

    // artificial representations of IO work
    static Observable<Integer> getDataAsync(int i) {
        return getDataSync(i).subscribeOn(Schedulers.io());
    }

    static Observable<Integer> getDataSync(int i) {
        return Observable.create((Subscriber<? super Integer> s) -> {
            // simulate latency
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                s.onNext(i);
                s.onCompleted();
            });
    }
}

Removed Observable.parallelMerge

Similar to the parallel operator, parallelMerge was confusing and a failed experiment, so it was removed.

Removed publishLast/initialValue

Removed publishLast since it can be done with takeLast(1).publish(). Removed any method overload that took an initial value since the startWith operator already allows that generically.

Removed Utility Methods/Classes

A handful of utility classes and methods that are not core to the purpose of RxJava were removed. This was preferred over having to constantly make arbitrary decisions about what utility methods should be included versus not while balancing the other goal of keeping the library small and focused.

The changes can be seen here and here

Observable.compose

The Transformer signature used by compose was changed from:

public static interface Transformer<T, R> extends Func1<Observable<? extends T>, Observable<? extends R>>

to

public static interface Transformer<T, R> extends Func1<Observable<T>, Observable<? extends R>>

This was done after finding issues with the generics that prevented it from easily being used.

Removed Observable.multicast

The multicast operator was removed in favor of directly using publish(), replay(), share() or cache(). This was done because multicast could not be made to work with reactive pull backpressure due to its use of Subject. The publish and replay operators on the other hand can have their implementations changed to not use a Subject and then support backpressure. The publish() operator has since been updated to support backpressure.

Removed Observable.*withIndex

The takeWhileWithIndex and skipWhileWithIndex operators were removed since zipWith(Observable.range(0, Integer.MAX_VALUE), ...) can be used on any Observable to get an index value. Any operator could want a "withIndex" variant, so it didn't make sense to clutter the API by adding one to some operators when certainly not all would get it.
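
The idea of pairing each item with a running index and then applying an ordinary predicate can be sketched over a plain list. This is an illustration with hypothetical names, not RxJava code; the local `index` counter plays the role that the zipped range plays in the Observable version.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

final class WithIndexSketch {

    /** Keeps items while the predicate holds for the (item, index) pair. */
    static <T> List<T> takeWhileWithIndex(List<T> items, BiPredicate<T, Integer> keep) {
        List<T> out = new ArrayList<>();
        int index = 0;                       // stands in for the zipped index source
        for (T item : items) {
            if (!keep.test(item, index)) {
                break;
            }
            out.add(item);
            index++;
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b", "c", "d");
        System.out.println(takeWhileWithIndex(items, (item, index) -> index < 2));
    }
}
```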

Observable.longCount -> countLong

The longCount operator was renamed to countLong so it shows up alphabetically next to count.

Full List of Changes

Version 1.0.0-rc.12 – November 17th 2014 (Maven Central)

Version 1.0.0-rc.11 – November 15th 2014 (Maven Central)

  • [Pull 1882] (ReactiveX#1882) Remove Unused Scheduler.parallelism
  • [Pull 1884] (ReactiveX#1884) Fix Scan/Reduce/Collect Factory Ambiguity
  • [Pull 1866] (ReactiveX#1866) Fix memory leak in bounded ReplaySubject due to retaining the node index

Version 0.20.7 – November 11th 2014 (Maven Central)

  • [Pull 1863] (ReactiveX#1863) Fix Concat Breaks with Double onCompleted

Version 1.0.0-rc.10 – November 8th 2014 (Maven Central)

  • [Pull 1834] (ReactiveX#1834) Subject.toSerialized
  • [Pull 1832] (ReactiveX#1832) Fix Take Early Unsubscription Causing Interrupts
  • [Pull 1835] (ReactiveX#1835) Scan/Reduce with Seed Factory
  • [Pull 1836] (ReactiveX#1836) Reduce Ring Buffer Default Sizes (and lower for Android)
  • [Pull 1833] (ReactiveX#1833) Fix Thread Safety for Unsubscribe of Window
  • [Pull 1827] (ReactiveX#1827) CacheThreadScheduler Evictor should Check Removal
  • [Pull 1830] (ReactiveX#1830) Fix mergeDelayError Handling of Error in Parent Observable
  • [Pull 1829] (ReactiveX#1829) Fix Window by Count Unsubscribe Behavior

Version 1.0.0-rc.9 – November 2nd 2014 (Maven Central)

  • [Pull 1788] (ReactiveX#1788) Remove PublishLast/InitialValue
  • [Pull 1796] (ReactiveX#1796) Improve TestSubject Javadoc
  • [Pull 1803] (ReactiveX#1803) Print full classname (inner class support) and fix enum output
  • [Pull 1802] (ReactiveX#1802) add hasObservers method to Subjects
  • [Pull 1806] (ReactiveX#1806) Remove Unnecessary Utilities
  • [Pull 1809] (ReactiveX#1809) Remove Utility Functions from Public API
  • [Pull 1813] (ReactiveX#1813) Fix issue #1812 that zip may swallow requests
  • [Pull 1817] (ReactiveX#1817) Fix Synchronous OnSubscribe Exception Skips Operators
  • [Pull 1819] (ReactiveX#1819) Fix Concat Breaks with Double onCompleted

Version 1.0.0-rc.8 – October 23rd 2014 (Maven Central)

Version 1.0.0-rc.7 – October 16th 2014 (Maven Central)

  • [Pull 1767] (ReactiveX#1767) ExecutorScheduler delivers uncaught exceptions
  • [Pull 1765] (ReactiveX#1765) backpressure support in onErrorResumeNext* operators
  • [Pull 1766] (ReactiveX#1766) Unhandled errors go to UncaughtExceptionHandler
  • [Pull 1755] (ReactiveX#1755) OnSubscribeRefCount with Synchronous Support
  • [Pull 1750] (ReactiveX#1750) Fix NPE when iterable is null

Version 0.20.6 – October 15th 2014 (Maven Central)

  • [Pull 1721] (ReactiveX#1721) Bug in the onBackpressure operators
  • [Pull 1695] (ReactiveX#1695) rewrite OnSubscribeRefCount to handle synchronous source
  • [Pull 1761] (ReactiveX#1761) Fix null-emitting combineLatest

Version 1.0.0-rc.6 – October 10th 2014 (Maven Central)

This release is primarily bug fixes along with API cleanup by renaming longCount to countLong to be alphabetically sorted alongside count.

Version 1.0.0-rc.5 – October 6th 2014 (Maven Central)

  • [Pull 1729] (ReactiveX#1729) CombineLatest: Request Up When Dropping Values
  • [Pull 1728] (ReactiveX#1728) ObserveOn Error Propagation
  • [Pull 1727] (ReactiveX#1727) Proposed groupBy/groupByUntil Changes
  • [Pull 1726] (ReactiveX#1726) Fix Merge: backpressure + scalarValueQueue don't play nicely
  • [Pull 1720] (ReactiveX#1720) Change repeatWhen and retryWhen signatures.
  • [Pull 1719] (ReactiveX#1719) Fix Bug in the onBackpressure operators

groupBy/groupByUntil

The groupByUntil operator was removed by collapsing its behavior into groupBy. Previously on groupBy when a child GroupedObservable was unsubscribed it would internally retain the state and ignore all future onNext for that key.

This matched the behavior in Rx.Net but was found to be non-obvious. Almost everyone using groupBy on long-lived streams actually wanted the behavior of groupByUntil, where an unsubscribed GroupedObservable cleans up its resources and a new GroupedObservable is emitted if onNext for that key arrives again.

Adding backpressure (reactive pull) to groupByUntil was found to not work easily with its signatures so before 1.0 Final it was decided to collapse groupBy and groupByUntil. Further details on this can be found in Pull Request 1727.

Here is an example of how groupBy now behaves when a child GroupedObservable is unsubscribed (using take here):

// odd/even into lists of 10
Observable.range(1, 100)
        .groupBy(n -> n % 2 == 0)
        .flatMap(g -> {
            return g.take(10).toList();
        }).forEach(System.out::println);

[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
[21, 23, 25, 27, 29, 31, 33, 35, 37, 39]
[22, 24, 26, 28, 30, 32, 34, 36, 38, 40]
[41, 43, 45, 47, 49, 51, 53, 55, 57, 59]
[42, 44, 46, 48, 50, 52, 54, 56, 58, 60]
[61, 63, 65, 67, 69, 71, 73, 75, 77, 79]
[62, 64, 66, 68, 70, 72, 74, 76, 78, 80]
[81, 83, 85, 87, 89, 91, 93, 95, 97, 99]
[82, 84, 86, 88, 90, 92, 94, 96, 98, 100]

Previously this would have only emitted 2 groups and ignored all subsequent values:

[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

On a finite stream, behavior similar to the previous groupBy implementation, which filtered out later values, can be achieved like this:

// odd/even into lists of 10
Observable.range(1, 100)
        .groupBy(n -> n % 2 == 0)
        .flatMap(g -> {
            return g.filter(i -> i <= 20).toList();
        }).forEach(System.out::println);

[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

That however does allow the stream to complete (which may not be wanted).

To unsubscribe, here are some options that produce the same output but efficiently unsubscribe upstream so the source only emits 40 values:

Observable.timer(0, 1, TimeUnit.MILLISECONDS)
        .groupBy(n -> n % 2 == 0)
        .flatMap(g -> {
            return g.take(10).toList();
        }).take(2).toBlocking().forEach(System.out::println);

or

Observable.timer(0, 1, TimeUnit.MILLISECONDS)
        .take(20)
        .groupBy(n -> n % 2 == 0)
        .flatMap(g -> {
            return g.toList();
        }).toBlocking().forEach(System.out::println);

These show that now groupBy composes like any other operator without the nuanced and hidden behavior of ignoring values after a child GroupedObservable is unsubscribed.

Uses of groupByUntil can now all be done by just using operators like take, takeWhile and takeUntil on the GroupedObservable directly, such as this:

Observable.from(Arrays.asList("a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c"))
        .groupBy(n -> n)
        .flatMap(g -> {
            return g.take(3).reduce((s, s2) -> s + s2);
        }).forEach(System.out::println);

aaa
bbb
ccc
aaa
bbb
ccc

retryWhen/repeatWhen

The retryWhen and repeatWhen method signatures both emitted an Observable<Notification> type which could be queried to represent either onError in the retryWhen case or onCompleted in the repeatWhen case. This was found to be confusing and unnecessary. The signatures were changed to emit Observable<Throwable> for retryWhen and Observable<Void> for repeatWhen to better signal the type of notification they are emitting without the need to then query the Notification.

The following contrived example shows how the Observable<Throwable> is used to get the error that occurred when deciding whether to retry:

    AtomicInteger count = new AtomicInteger();
    Observable.create((Subscriber<? super String> s) -> {
        if (count.getAndIncrement() == 0) {
            s.onError(new RuntimeException("always fails"));
        } else {
            s.onError(new IllegalArgumentException("user error"));
        }
    }).retryWhen(attempts -> {
        return attempts.flatMap(throwable -> {
            if (throwable instanceof IllegalArgumentException) {
                System.out.println("don't retry on IllegalArgumentException... allow failure");
                return Observable.error(throwable);
            } else {
                System.out.println(throwable + " => retry after 1 second");
                return Observable.timer(1, TimeUnit.SECONDS);
            }
        });
    })
    .toBlocking().forEach(System.out::println);

Version 1.0.0-rc.4 – October 2nd 2014 (Maven Central)

  • [Pull 1687] (ReactiveX#1687) Don't allocate an empty ArrayList for each Observable.empty call
  • [Pull 1705] (ReactiveX#1705) Fix null-emitting combineLatest
  • [Pull 1683] (ReactiveX#1683) ObserveOn Error Handling
  • [Pull 1686] (ReactiveX#1686) Fix Rx serialization bug in takeUntil again and the concurrent issue in BufferUntilSubscriber
  • [Pull 1701] (ReactiveX#1701) Fix the compose generics
  • [Pull 1712] (ReactiveX#1712) Fixing regression in mergeDelayError
  • [Pull 1716] (ReactiveX#1716) Remove Observable.Parallel

Version 0.20.5 – October 2nd 2014 (Maven Central)

  • [Pull 1686] (ReactiveX#1686) Fix Rx serialization bug in takeUntil again and the concurrent issue in BufferUntilSubscriber
  • [Pull 1701] (ReactiveX#1701) Fix the compose generics
  • [Pull 1712] (ReactiveX#1712) Fixing regression in mergeDelayError
  • [Pull 1715] (ReactiveX#1715) Deprecate Observable.Parallel

Version 1.0.0-rc.3 (Maven Central)

  • Merging fixes from 0.20.4 into 1.x branch

Version 0.20.4 (Maven Central)

  • [Pull 1667] (ReactiveX#1667) Fix the bug that Switch doesn't propagate 'unsubscribe'
  • [Pull 1659] (ReactiveX#1659) OperatorScan should check for MAX_VALUE on request
  • [Pull 1657] (ReactiveX#1657) Ignore further messages after entering terminate state
  • [Pull 1669] (ReactiveX#1669) Error Handling Unsubscribe and Terminal State
  • [Pull 1656] (ReactiveX#1656) Make TakeUntil obey Rx serialization contract
  • [Pull 1664] (ReactiveX#1664) StringObservable.split NPE fixes

Version 1.0.0-rc.2 (Maven Central)

Version 0.20.3 (Maven Central)

  • [Pull 1648] (ReactiveX#1648) Operator Scan Backpressure Fix
  • [Pull 1651] (ReactiveX#1651) RxScala: Fix the problem that Subscriber.onStart isn't called
  • [Pull 1641] (ReactiveX#1641) RxScala: Fix infinite recursive onStart call in Subscriber
  • [Pull 1646] (ReactiveX#1646) Deprecate ParallelMerge

Version 1.0.0-rc.1 (Maven Central)

The first release candidate for 1.0.0.

This is the same code as version 0.20.2 except:

  • all deprecated methods and types are deleted
  • now published to groupId io.reactivex instead of com.netflix.rxjava
  • artifactId is now rxjava instead of rxjava-core
io.reactivex:rxjava:1.0.0-rc.1
  • all sub-projects are separated into their own projects and no longer released along with RxJava

The artifacts can be found on Maven Central at: http://repo1.maven.org/maven2/io/reactivex/rxjava/1.0.0-rc.1/

Version 0.20.2 (Maven Central)

  • [Pull 1637] (ReactiveX#1637) Optimize single BlockingObservable operations

Version 0.20.1 (Maven Central)

  • [Pull 1631] (ReactiveX#1631) Handle Fatal Exceptions in doOn* operators
  • [Pull 1625] (ReactiveX#1625) RxScala: Mark superfluous from/empty methods with scheduler parameter as deprecated
  • [Pull 1623] (ReactiveX#1623) RxScala: Add more operators to match RxJava
  • [Pull 1632] (ReactiveX#1632) Composite Exception - Circular Reference Handling

Version 0.20.0 (Maven Central)

RxJava 0.20.0 is a major release that adds "reactive pull" support for backpressure along with several other enhancements leading into the 1.0 release.

Reactive Pull for Backpressure

Solutions for backpressure were the major focus of this release. A "reactive pull" approach was implemented. Documentation on this and other options for backpressure can be found in the wiki: https://github.com/ReactiveX/RxJava/wiki/Backpressure

The reactive pull solution evolved out of several prototypes and interaction with many people over the months.

Signature Changes

A new type Producer has been added:

public interface Producer {
    public void request(long n);
}

The Subscriber type now has these methods added:

public abstract class Subscriber<T> implements Observer<T>, Subscription {
    public void onStart();
    protected final void request(long n);
    public final void setProducer(Producer producer);
}

Examples

This trivial example shows requesting values one at a time:

Observable.from(1, 2, 3, 4).subscribe(new Subscriber<Integer>() {

    @Override
    public void onStart() {
        // on start this tells it to request 1
        // otherwise it defaults to request(Long.MAX_VALUE)
        request(1);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(Integer t) {
        System.out.println(t);
        // as each onNext is consumed, request another 
        // otherwise the Producer will not send more
        request(1);
    }

});

The OnSubscribeFromIterable operator shows how an Iterable is consumed with backpressure.

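Some highlights (modified for simplicity rather than performance and completeness):

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;

import rx.Observable.OnSubscribe;
import rx.Producer;
import rx.Subscriber;

public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {

    // fields and constructors are filled in here so the excerpt is complete
    private final Iterable<? extends T> is;

    public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
        this.is = iterable;
    }

    @Override
    public void call(final Subscriber<? super T> o) {
        final Iterator<? extends T> it = is.iterator();
        // instead of emitting directly to the Subscriber, it hands it a Producer
        o.setProducer(new IterableProducer<T>(o, it));
    }

    private static final class IterableProducer<T> implements Producer {

        private final Subscriber<? super T> o;
        private final Iterator<? extends T> it;
        private final AtomicLong requested = new AtomicLong();

        IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {
            this.o = o;
            this.it = it;
        }

        @Override
        public void request(long n) {
            // getAndAdd returns the previous count as a long; only the call
            // that moves it off 0 starts emitting
            long _c = requested.getAndAdd(n);
            if (_c == 0) {
                while (it.hasNext()) {
                    if (o.isUnsubscribed()) {
                        return;
                    }
                    T t = it.next();
                    o.onNext(t);
                    if (requested.decrementAndGet() == 0) {
                        // we're done emitting the number requested so return
                        return;
                    }
                }

                o.onCompleted();
            }
        }
    }
}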
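
To make the request(n) accounting above easier to experiment with, here is a minimal, dependency-free sketch of the same loop. It is not the RxJava implementation: Producer is redeclared locally, a Consumer and a Runnable stand in for the Subscriber's onNext and onCompleted, and the names RequestDemo and IterableProducerSketch are made up for illustration. It also ignores unsubscription and overflow of the request counter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

final class RequestDemo {

    // Local stand-in for rx.Producer so the sketch compiles without RxJava.
    interface Producer {
        void request(long n);
    }

    // Emits from the iterator only while there is outstanding demand.
    static final class IterableProducerSketch<T> implements Producer {
        private final Iterator<? extends T> it;
        private final Consumer<? super T> onNext; // stands in for Subscriber.onNext
        private final Runnable onCompleted;       // stands in for Subscriber.onCompleted
        private final AtomicLong requested = new AtomicLong();

        IterableProducerSketch(Iterator<? extends T> it,
                               Consumer<? super T> onNext,
                               Runnable onCompleted) {
            this.it = it;
            this.onNext = onNext;
            this.onCompleted = onCompleted;
        }

        @Override
        public void request(long n) {
            if (n <= 0) {
                return;
            }
            // Only the call that moves the counter off 0 runs the loop; a request
            // made while the loop is running just adds to the counter.
            if (requested.getAndAdd(n) != 0) {
                return;
            }
            do {
                if (!it.hasNext()) {
                    onCompleted.run();
                    return;
                }
                onNext.accept(it.next());
            } while (requested.decrementAndGet() > 0);
        }
    }

    // Requests 2 items, pauses, then requests more than are left.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Producer p = new IterableProducerSketch<>(
                Arrays.asList(1, 2, 3).iterator(),
                t -> log.add("next " + t),
                () -> log.add("completed"));
        p.request(2); // emits 1 and 2, then stops because demand is used up
        log.add("paused");
        p.request(5); // emits 3, then finds the iterator empty and completes
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

As in the excerpt above, completion is only signalled when a later request finds the iterator empty, so a consumer that requests exactly the number of available items sees onCompleted on its next request.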

The observeOn operator is a stereotypical example of queuing on one side of a thread boundary and draining on the other, now with backpressure.

private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
    @Override
    public void onStart() {
        // signal that this is an async operator capable of receiving this many
        request(RxRingBuffer.SIZE);
    }

    @Override
    public void onNext(final T t) {
        try {
            // enqueue
            queue.onNext(t);
        } catch (MissingBackpressureException e) {
            // fail if the upstream has not obeyed our backpressure requests
            onError(e);
            return;
        }
        // attempt to schedule draining if needed
        schedule();
    }

    // the scheduled polling will then drain the queue and invoke `request(n)` to request more after draining
}

Many use cases will be able to use Observable.from, Observable.onBackpressureDrop and Observable.onBackpressureBuffer to achieve "reactive pull backpressure" without manually implementing Producer logic. Also, making an Observable support backpressure is optional. It can remain completely reactive and just push events as it always has, which works just fine for most uses of RxJava. If backpressure is needed, the Observable can be migrated to use a Producer, or one of several other approaches to flow control can be used, such as throttle, sample, debounce, window, buffer, onBackpressureBuffer, and onBackpressureDrop.

The wiki provides further documentation.

Relation to Reactive Streams

Contributors to RxJava are involved in defining the Reactive Streams spec. RxJava 1.0 is trying to comply with the semantic rules but is not attempting to comply with the type signatures. It will however have a separate module that acts as a bridge between the RxJava Observable and the Reactive Streams types.

The reasons for this are:

  • Rx has Observer.onCompleted whereas Reactive Streams has onComplete. This is a massive breaking change to remove a "d".
  • The RxJava Subscription is also used as a "Closeable"/"Disposable" and it does not work well to make it now also be used for request(n), hence the separate type Producer in RxJava. Reusing rx.Subscription was attempted but it couldn't be done without massive breaking changes.
  • Reactive Streams uses onSubscribe(Subscription s) whereas RxJava injects the Subscription as the Subscriber. Again, this change could not be done without major breaking changes.
  • RxJava 1.0 needs to be backwards compatible with the major Rx contracts established during the 0.x roadmap.

Considering these things, the major semantics of request(long n) for backpressure are compatible and this will allow interop with a bridge between the interfaces.
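
As a rough illustration of why the request(long n) semantics make such a bridge possible, the sketch below folds an RxJava-style Producer and Subscription into a single Reactive Streams style subscription. Everything here is an assumption made for illustration: Producer and RxSubscription are local stand-ins rather than the real rx types, java.util.concurrent.Flow (JDK 9+) stands in for the Reactive Streams interfaces, and this is not the bridge module mentioned above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class FlowBridgeSketch {

    // Local stand-ins for rx.Producer and rx.Subscription (not the real types).
    interface Producer {
        void request(long n);
    }

    interface RxSubscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    // Folds the two RxJava roles into the single Reactive Streams style type.
    static Flow.Subscription bridge(Producer producer, RxSubscription subscription) {
        return new Flow.Subscription() {
            @Override
            public void request(long n) {
                // demand is forwarded unchanged while the subscription is live
                if (!subscription.isUnsubscribed()) {
                    producer.request(n);
                }
            }

            @Override
            public void cancel() {
                subscription.unsubscribe();
            }
        };
    }

    // Records what reaches the RxJava side when the bridged type is used.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        boolean[] unsubscribed = {false};
        RxSubscription s = new RxSubscription() {
            @Override
            public void unsubscribe() {
                unsubscribed[0] = true;
                log.add("unsubscribe");
            }

            @Override
            public boolean isUnsubscribed() {
                return unsubscribed[0];
            }
        };
        Flow.Subscription bridged = bridge(n -> log.add("request " + n), s);
        bridged.request(3);
        bridged.cancel();
        bridged.request(1); // ignored after cancel
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```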

New Features

Compose/Transformer

The compose operator is similar to lift, but a compose implementation chains existing Observable operators whereas a lift implementation directly implements the raw Subscriber logic.

Here is a trivial example demonstrating how using compose is a better option than lift when existing Observable operators can be used to achieve the custom behavior.

import rx.Observable;
import rx.Observable.Operator;
import rx.Observable.Transformer;
import rx.Subscriber;

public class ComposeExample {

    public static void main(String[] args) {
        Observable.just("hello").compose(appendWorldTransformer()).forEach(System.out::println);
        Observable.just("hello").lift(appendWorldOperator()).forEach(System.out::println);
    }

    // if existing operators can be used, compose with Transformer is ideal
    private static Transformer<? super String, String> appendWorldTransformer() {
        return o -> o.map(s -> s + " world!").finallyDo(() -> {
            System.out.println("  some side-effect");
        });
    }

    // whereas lift is more low level
    private static Operator<? super String, String> appendWorldOperator() {
        return new Operator<String, String>() {

            @Override
            public Subscriber<? super String> call(Subscriber<? super String> child) {
                return new Subscriber<String>(child) {

                    @Override
                    public void onCompleted() {
                        child.onCompleted();
                    }

                    @Override
                    public void onError(Throwable e) {
                        child.onError(e);
                    }

                    @Override
                    public void onNext(String t) {
                        child.onNext(t + " world!");
                        System.out.println("  some side-effect");
                    }

                };
            }

        };
    }
}

retryWhen/repeatWhen

New operators retryWhen and repeatWhen were added which offer support for more advanced recursion such as retry with exponential backoff.

Here is an example that increases delay between each retry:

Observable.create((Subscriber<? super String> s) -> {
    System.out.println("subscribing");
    s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
    return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
        System.out.println("delay retry by " + i + " second(s)");
        return Observable.timer(i, TimeUnit.SECONDS);
    });
}).toBlocking().forEach(System.out::println);
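
The example above grows the delay linearly (1, 2, 3 seconds). For comparison, here is a small plain-Java sketch of the exponential backoff arithmetic itself, where each retry doubles the previous delay up to a cap. It does not use RxJava, and the names BackoffSketch, delayMillis and retry are hypothetical helpers written for this illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class BackoffSketch {

    // Delay before retry number `attempt` (1-based): base * 2^(attempt-1), capped at maxMillis.
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        long delay = baseMillis << Math.min(attempt - 1, 30);
        return Math.min(delay, maxMillis);
    }

    // Runs the task, sleeping with doubling delays between failed attempts.
    // The delays that were applied are appended to delaysOut for inspection.
    static <T> T retry(Supplier<T> task, int maxRetries, long baseMillis,
                       long maxMillis, List<Long> delaysOut) {
        for (int attempt = 1; ; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                if (attempt > maxRetries) {
                    throw e; // retries exhausted, let the failure through
                }
                long delay = delayMillis(attempt, baseMillis, maxMillis);
                delaysOut.add(delay);
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while backing off", ie);
                }
            }
        }
    }

    // A task that fails three times and then succeeds, with a 10 ms base delay.
    static List<Long> run() {
        List<Long> delays = new ArrayList<>();
        int[] calls = {0};
        String result = retry(() -> {
            if (calls[0]++ < 3) {
                throw new RuntimeException("fails at first");
            }
            return "ok";
        }, 5, 10, 1000, delays);
        System.out.println(result + " after delays " + delays);
        return delays;
    }

    public static void main(String[] args) {
        run();
    }
}
```

With retryWhen, the same arithmetic could be applied to the attempt index before creating the timer, at the cost of choosing a base delay and a cap.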

Breaking Changes

The use of Producer has been added in such a way that it is optional and additive, but some operators that used to have unbounded queues are now bounded. This means that if a source Observable emits items faster than the Observer can consume them, a MissingBackpressureException can be emitted via onError.

This semantic change can break existing code.

There are two ways of resolving this:

  1. Modify the source Observable to use Producer and support backpressure.
  2. Use newly added operators such as onBackpressureBuffer or onBackpressureDrop to choose a strategy for the source Observable of how to behave when it emits more data than the consuming Observer is capable of handling. Use of onBackpressureBuffer effectively returns it to having an unbounded buffer and behaving like version 0.19 or earlier.

Example:

sourceObservable.onBackpressureBuffer().subscribe(slowConsumer);
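
The difference between the strategies can be pictured with a toy queue sitting between a fast source and a consumer that has not drained anything yet. The sketch below is a simplified model and not RxJava code: the class OverflowStrategySketch, its Strategy enum and the capacity of 4 are assumptions for illustration, and an IllegalStateException stands in for MissingBackpressureException.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class OverflowStrategySketch {

    // ERROR models a bounded operator queue, DROP and BUFFER model the
    // onBackpressureDrop and onBackpressureBuffer choices described above.
    enum Strategy { ERROR, DROP, BUFFER }

    static final class Buffer<T> {
        private final Queue<T> queue = new ArrayDeque<>();
        private final int capacity;
        private final Strategy strategy;
        int dropped;

        Buffer(int capacity, Strategy strategy) {
            this.capacity = capacity;
            this.strategy = strategy;
        }

        void offer(T item) {
            if (strategy == Strategy.BUFFER || queue.size() < capacity) {
                queue.add(item); // BUFFER ignores the capacity entirely
            } else if (strategy == Strategy.DROP) {
                dropped++;       // silently discard what does not fit
            } else {
                throw new IllegalStateException("missing backpressure: queue is full");
            }
        }

        int size() {
            return queue.size();
        }
    }

    // Pushes 10 items into a queue of capacity 4 without ever draining it.
    static String run(Strategy strategy) {
        Buffer<Integer> buffer = new Buffer<>(4, strategy);
        try {
            for (int i = 0; i < 10; i++) {
                buffer.offer(i);
            }
            return "queued=" + buffer.size() + " dropped=" + buffer.dropped;
        } catch (IllegalStateException e) {
            return "error after " + buffer.size() + " items";
        }
    }

    public static void main(String[] args) {
        for (Strategy s : Strategy.values()) {
            System.out.println(s + ": " + run(s));
        }
    }
}
```

The BUFFER case never fails but grows without bound, which is the trade-off behind the remark that onBackpressureBuffer behaves like version 0.19 or earlier.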

Deprecations

Various methods, operators or classes have been deprecated and will be removed in 1.0. The deprecations were made primarily to remove ambiguity, remove nuanced functionality that is easy to use incorrectly, clear out superfluous methods and eliminate cruft that was added during the 0.x development process but has since been replaced.

For example, Observable.from(T) was deprecated in favor of Observable.just(T) despite being a painful breaking change so as to solve ambiguity with Observable.from(Iterable).

This means that the upgrade from 0.20 to 1.0 will be breaking. This is being done so that the 1.x version can be a long-lived stable API built upon as clean a foundation as possible.

A stable API for RxJava is important because it is intended to be a foundational library that many projects will depend upon. The deprecations are intended to help this be achieved.

Future

The next release will be 1.0 (after a few release candidates). The RxJava project has been split up into many new top-level projects at https://github.com/ReactiveX so each of their release cycles and version strategies can be decoupled.

The 1.x version is intended to be stable for many years and target Java 6, 7 and 8. The expected outcome is for a 2.x version to target Java 8+ but for RxJava 1.x and 2.x to co-exist and both be living, supported versions.

Version 0.20.0-RC6 (Maven Central)

Further fixes and enhancements bringing us close to completing 0.20.0 and almost ready for 1.0.

A more major change in this release is the deprecation of Observable.from(T). The full discussion can be seen in #1563.

Version 0.20.0-RC5 (Maven Central)

Version 0.20.0-RC5 updates parallel, buffer(size), switchOnNext, repeat, and retry to support "reactive pull" backpressure. It adds a groupBy overload with an element selector and a new compose method as an alternative to lift for custom operators, fixes bugs, and includes other general improvements.

There are still outstanding items being tracked for 0.20 that need to be completed for the final release.

Version 0.20.0-RC4 (Maven Central)

Version 0.20.0-RC4 continues bug fixes and completing work related to "reactive pull" backpressure. This release updates amb and concat to connect the backpressure request.

Internal uses of RxRingBuffer migrated to using SpmcArrayQueue which significantly reduces object allocations. See #1526 for details.

The multicast operators were updated to use a Subject factory so that Observable sequences can be reused. See #1515 for details.

Version 0.20.0-RC3 (Maven Central)

Version 0.20.0-RC3 preview release fixes several bugs related to backpressure and adds retryWhen, repeatWhen for more advanced recursion use cases like retry with exponential backoff.

This version passed the Netflix API production canary process. Please test this against your code to help us find any issues before we release 0.20.0.

Version 0.20.0-RC2 (Maven Central)

Version 0.20.0-RC2 preview release adds support for backpressure to the zip operators, fixes bugs and removes the Subscriber.onSetProducer method.

This means the signature changes are now:

The new type Producer ->

public interface Producer {
    public void request(long n);
}

New methods added to Subscriber ->

public abstract class Subscriber<T> implements Observer<T>, Subscription {
    public void onStart();
    protected final void request(long n);
    public final void setProducer(Producer producer);
}

Version 0.20.0-RC1 (Maven Central)

Version 0.20.0-RC1 is a preview release that adds backpressure support to RxJava as per issue #1000. It has been done in a way that is mostly additive and most existing code will not be affected by these additions. A section below on "Breaking Changes" will discuss use cases that do break and how to deal with them.

This release has been tested successfully in Netflix production canaries, but that does not exercise all use cases or operators, nor does it leverage the newly added backpressure functionality (though the backpressure code paths are used).

Outstanding Work

  • The zip operator has not yet been upgraded to support backpressure. The work is almost done and it will be included in the next release.
  • Not all operators have yet been reviewed for whether they need to be changed in any way.
  • Temporal operators (like buffer, window, sample, etc) need to be modified to disable backpressure upstream (using request(Long.MAX_VALUE)) and a decision made about how downstream backpressure requests will be supported.
  • Ensure all code works on Android. New data structures rely on sun.misc.Unsafe but are conditionally used only when it is available. We need to ensure those conditions are working and the alternative implementations are adequate. The default buffer size of 1024 also needs to be reviewed for whether it is a correct default for all systems, or needs to be modified by environment (such as smaller for Android).
  • Ensure use cases needing backpressure all work.
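
The conditional use of sun.misc.Unsafe and an environment-dependent buffer size mentioned in the list above could be probed along these lines. This is only a sketch under stated assumptions: the class name PlatformCheckSketch, the reflection-based checks, the Android buffer size of 16 and the two JDK queues used as placeholders are all illustrative choices and not the data structures RxJava uses. Only the default of 1024 comes from the text above.

```java
import java.lang.reflect.Field;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class PlatformCheckSketch {

    // True only if sun.misc.Unsafe can be loaded and its singleton can be read.
    static boolean isUnsafeAvailable() {
        try {
            Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
            Field field = unsafeClass.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return field.get(null) != null;
        } catch (Exception | LinkageError e) {
            return false;
        }
    }

    // Assumption: the presence of android.os.Build is treated as "running on Android".
    static boolean isAndroid() {
        try {
            Class.forName("android.os.Build");
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // 1024 is the default named above; 16 is a made-up smaller value for Android.
    static int defaultBufferSize() {
        return isAndroid() ? 16 : 1024;
    }

    // Picks a queue per environment. Both branches use JDK queues as placeholders
    // for an Unsafe-based queue and its fallback implementation.
    static <T> Queue<T> newQueue(int capacity) {
        if (isUnsafeAvailable()) {
            return new ArrayBlockingQueue<T>(capacity);
        }
        return new ConcurrentLinkedQueue<T>();
    }

    public static void main(String[] args) {
        System.out.println("unsafe available: " + isUnsafeAvailable());
        System.out.println("buffer size: " + defaultBufferSize());
        System.out.println("queue type: " + newQueue(defaultBufferSize()).getClass().getSimpleName());
    }
}
```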

Signature Changes

A new type Producer has been added:

public interface Producer {
    public void request(long n);
}

The Subscriber type now has these methods added:

public abstract class Subscriber<T> implements Observer<T>, Subscription {
    public void onStart();
    public final void request(long n);
    public final void setProducer(Producer producer);
    protected Producer onSetProducer(Producer producer);
}

Examples

This trivial example shows requesting values one at a time:

Observable.from(1, 2, 3, 4).subscribe(new Subscriber<Integer>() {

    @Override
    public void onStart() {
        request(1);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(Integer t) {
        request(1);
    }

});

The OnSubscribeFromIterable operator shows how an Iterable is consumed with backpressure.

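Some highlights (modified for simplicity rather than performance and completeness):

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;

import rx.Observable.OnSubscribe;
import rx.Producer;
import rx.Subscriber;

public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {

    // fields and constructors are filled in here so the excerpt is complete
    private final Iterable<? extends T> is;

    public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
        this.is = iterable;
    }

    @Override
    public void call(final Subscriber<? super T> o) {
        final Iterator<? extends T> it = is.iterator();
        // instead of emitting directly to the Subscriber, it hands it a Producer
        o.setProducer(new IterableProducer<T>(o, it));
    }

    private static final class IterableProducer<T> implements Producer {

        private final Subscriber<? super T> o;
        private final Iterator<? extends T> it;
        private final AtomicLong requested = new AtomicLong();

        IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {
            this.o = o;
            this.it = it;
        }

        @Override
        public void request(long n) {
            // getAndAdd returns the previous count as a long; only the call
            // that moves it off 0 starts emitting
            long _c = requested.getAndAdd(n);
            if (_c == 0) {
                while (it.hasNext()) {
                    if (o.isUnsubscribed()) {
                        return;
                    }
                    T t = it.next();
                    o.onNext(t);
                    if (requested.decrementAndGet() == 0) {
                        // we're done emitting the number requested so return
                        return;
                    }
                }

                o.onCompleted();
            }
        }
    }
}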

The observeOn operator is a stereotypical example of queuing on one side of a thread boundary and draining on the other, now with backpressure.

private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
    @Override
    public void onStart() {
        // signal that this is an async operator capable of receiving this many
        request(RxRingBuffer.SIZE);
    }

    @Override
    public void onNext(final T t) {
        try {
            // enqueue
            queue.onNext(t);
        } catch (MissingBackpressureException e) {
            // fail if the upstream has not obeyed our backpressure requests
            onError(e);
            return;
        }
        // attempt to schedule draining if needed
        schedule();
    }

    // the scheduled polling will then drain the queue and invoke `request(n)` to request more after draining
}

Breaking Changes

The use of Producer has been added in such a way that it is optional and additive, but some operators that used to have unbounded queues are now bounded. This means that if a source Observable emits items faster than the Observer can consume them, a MissingBackpressureException can be emitted via onError.

This semantic change can break existing code.

There are two ways of resolving this:

  1. Modify the source Observable to use Producer and support backpressure.
  2. Use newly added operators such as onBackpressureBuffer or onBackpressureDrop to choose a strategy for the source Observable of how to behave when it emits more data than the consuming Observer is capable of handling. Use of onBackpressureBuffer effectively returns it to having an unbounded buffer and behaving like version 0.19 or earlier.

Example:

sourceObservable.onBackpressureBuffer().subscribe(slowConsumer);

Relation to Reactive Streams

Contributors to RxJava are involved in defining the Reactive Streams spec. RxJava 1.0 is trying to comply with the semantic rules but is not attempting to comply with the type signatures. It will however have a separate module that acts as a bridge between the RxJava Observable and the Reactive Streams types.

The reasons for this are:

  • Rx has Observer.onCompleted whereas Reactive Streams has onComplete. This is a massive breaking change to remove a "d".
  • The RxJava Subscription is also used as a "Closeable"/"Disposable" and it does not work well to make it now also be used for request(n), hence the separate type Producer in RxJava. Reusing rx.Subscription was attempted but it couldn't be done without massive breaking changes.
  • Reactive Streams uses onSubscribe(Subscription s) whereas RxJava injects the Subscription as the Subscriber. Again, this change could not be done without major breaking changes.
  • RxJava 1.0 needs to be backwards compatible with the major Rx contracts established during the 0.x roadmap.
  • Reactive Streams is not yet 1.0 and despite significant progress, it is a moving target.

Considering these things, the major semantics of request(long n) for backpressure are compatible and this will allow interop with a bridge between the interfaces. As the Reactive Streams spec matures, RxJava 2.0 may choose to fully adopt the types in the future while RxJava 1.x retains the current signatures.

How to Help

First, please test this release against your existing code to help us determine if we have broken anything.

Second, try to solve backpressure use cases and provide feedback on what works and what doesn't work.

Thank you!

Version 0.19.6 (Maven Central)

Inclusion of 'rxjava-contrib:rxjava-scalaz' in release.

Version 0.19.5

Upload to Maven Central was corrupted so release is skipped.

Version 0.19.4 (Maven Central)

Version 0.19.3

Upload to Maven Central was corrupted so release is skipped.

Version 0.19.2 (Maven Central)

Version 0.19.1 (Maven Central)

  • [Pull 1357] (ReactiveX#1357) MergeWith, ConcatWith, AmbWith
  • [Pull 1345] (ReactiveX#1345) RxScala: Simplify doOnCompleted/Terminate, finallyDo callback usage
  • [Pull 1337] (ReactiveX#1337) Make Future receive NoSuchElementException when the BlockingObservable is empty
  • [Pull 1335] (ReactiveX#1335) RxAndroid: Bump build tools to 19.1 and android plugin to 0.11
  • [Pull 1327] (ReactiveX#1327) Join patterns extension for 4..9 and N arity joins.
  • [Pull 1321] (ReactiveX#1321) RxAndroid: Ensuring Runnables posted with delay to a Handler are removed when unsubscribed
  • [Pull 1347] (ReactiveX#1347) Allow use of the returned subscription to cancel periodic scheduling
  • [Pull 1355] (ReactiveX#1355) Don't add the subscriber to the manager if it unsubscribed during the onStart call
  • [Pull 1350] (ReactiveX#1350) Baseline Performance Tests
  • [Pull 1316] (ReactiveX#1316) RxScala: Add the rest operators
  • [Pull 1324] (ReactiveX#1324) TrampolineScheduler & Unsubscribe
  • [Pull 1311] (ReactiveX#1311) Tiny integration test change

Version 0.19.0 (Maven Central)

Performance and Object Allocation

Fairly significant object allocation improvements are included in this release which reduce GC pressure and improve performance.

Two pull requests (amongst several) with details are:

With the following simple test code relative performance has increased as shown below:

Observable<Integer> o = Observable.just(1);
o.map(i -> {
    return String.valueOf(i);
}).map(i -> {
    return Integer.parseInt(i);
}).subscribe(observer);

Rx 0.19
Run: 10 - 10,692,099 ops/sec 
Run: 11 - 10,617,627 ops/sec 
Run: 12 - 10,938,405 ops/sec 
Run: 13 - 10,917,388 ops/sec 
Run: 14 - 10,783,298 ops/sec 
Rx 0.18.4
Run: 11 - 8,493,506 ops/sec 
Run: 12 - 8,403,361 ops/sec 
Run: 13 - 8,400,537 ops/sec 
Run: 14 - 8,163,998 ops/sec 
Rx 0.17.6
Run: 10 - 4,930,966 ops/sec 
Run: 11 - 6,119,951 ops/sec 
Run: 12 - 7,062,146 ops/sec 
Run: 13 - 6,514,657 ops/sec 
Run: 14 - 6,369,426 ops/sec 
Rx 0.16.1
Run: 10 - 2,879,355 ops/sec 
Run: 11 - 3,236,245 ops/sec 
Run: 12 - 4,468,275 ops/sec 
Run: 13 - 3,237,293 ops/sec 
Run: 14 - 4,683,840 ops/sec 

Note that these numbers are relative as they depend on the JVM and hardware.

Scala Changes

Many missing operators have been added to the RxScala APIs along with fixes and other maturation.

toBlockingObservable() -> toBlocking()

The toBlockingObservable() method has been deprecated in favor of toBlocking() for brevity and to fit better with possible future additions such as toParallel() without always needing the Observable suffix.

forEach

forEach was added as an alias for subscribe to match the Java 8 naming convention.

This means code can now be written as:

Observable.from(1, 2, 3).limit(2).forEach(System.out::println);

which is an alias of this:

Observable.from(1, 2, 3).take(2).subscribe(System.out::println);

Since forEach exists on BlockingObservable as well, moving from non-blocking to blocking looks like this:

// non-blocking
Observable.from(1, 2, 3).limit(2).forEach(System.out::println);
// blocking
Observable.from(1, 2, 3).limit(2).toBlocking().forEach(System.out::println);

Schedulers

Thread caching is restored to Schedulers.io() after being lost in v0.18.

A replacement for ExecutorScheduler (removed in 0.18) is accessible via Schedulers.from(Executor e) that wraps an Executor and complies with the Rx contract.

ReplaySubject

All "replay" functionality now exists directly on the ReplaySubject rather than in an internal type. This means there are now several different create methods with the various overloads of size and time.
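
To give a feel for what a size bound means for late subscribers, here is a small single-threaded model of a size-bounded replay buffer. It is a sketch only and not the ReplaySubject implementation: the class BoundedReplaySketch and its methods are hypothetical, there is no time bound, no terminal events and no thread safety.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BoundedReplaySketch<T> {

    private final int maxSize;
    private final ArrayDeque<T> buffer = new ArrayDeque<>();
    private final List<Consumer<? super T>> observers = new ArrayList<>();

    BoundedReplaySketch(int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        this.maxSize = maxSize;
    }

    // Remembers the value (evicting the oldest if needed) and forwards it.
    void onNext(T value) {
        buffer.addLast(value);
        if (buffer.size() > maxSize) {
            buffer.removeFirst();
        }
        for (Consumer<? super T> observer : observers) {
            observer.accept(value);
        }
    }

    // A new observer first receives the retained values, then live ones.
    void subscribe(Consumer<? super T> observer) {
        for (T value : buffer) {
            observer.accept(value);
        }
        observers.add(observer);
    }

    // Subscribes after three values were emitted into a buffer of size 2.
    static List<Integer> run() {
        BoundedReplaySketch<Integer> subject = new BoundedReplaySketch<>(2);
        subject.onNext(1);
        subject.onNext(2);
        subject.onNext(3);
        List<Integer> seen = new ArrayList<>();
        subject.subscribe(seen::add); // replays 2 and 3, the value 1 was evicted
        subject.onNext(4);            // delivered live
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```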

Changelist

  • Pull 1165 RxScala: Add dropUntil, contains, repeat, doOnTerminate, startWith, publish variants
  • Pull 1183 NotificationLite.accept performance improvements
  • Pull 1177 GroupByUntil to use BufferUntilSubscriber
  • Pull 1182 Add facilities for creating Observables from JavaFX events and ObservableValues
  • Pull 1188 RxScala Schedulers changes
  • Pull 1175 Fixed synchronous ConnectableObservable.connect problem
  • Pull 1172 ObserveOn: Change to batch dequeue
  • Pull 1191 Fix attempt for OperatorPivotTest
  • Pull 1195 SwingScheduler: allow negative schedule
  • Pull 1178 Fix RxScala bug
  • Pull 1210 Add more operators to RxScala
  • Pull 1216 RxScala: Exposing PublishSubject
  • Pull 1208 OperatorToObservableList: use LinkedList to buffer the sequence’s items
  • Pull 1185 Behavior subject time gap fix 2
  • Pull 1226 Fix bug in zipWithIndex and set zip(that, selector) public in RxScala
  • Pull 1224 Implement shorter toBlocking as shorter alias for toBlockingObservable.
  • Pull 1223 ReplaySubject enhancement with time and/or size bounds
  • Pull 1160 Add replay and multicast variants to RxScala
  • Pull 1229 Remove Ambiguous Subscribe Overloads with Scheduler
  • Pull 1232 Adopt Limit and ForEach Java 8 Naming Conventions
  • Pull 1233 Deprecate toBlockingObservable in favor of toBlocking
  • Pull 1237 SafeSubscriber memory reduction
  • Pull 1236 CompositeSubscription with atomic field updater
  • Pull 1243 Remove Subscription Wrapper from Observable.subscribe
  • Pull 1244 Observable.from(T) using Observable.just(T)
  • Pull 1239 RxScala: Update docs for "apply" and add an example
  • Pull 1248 Fixed testConcurrentOnNextFailsValidation
  • Pull 1246 Moved to atomic field updaters.
  • Pull 1254 ZipIterable unsubscription fix
  • Pull 1247 Add zip(iterable, selector) to RxScala
  • Pull 1260 Fix the bug that BlockingObservable.singleOrDefault doesn't call unsubscribe
  • Pull 1269 Fix the bug that int overflow can bypass the range check
  • Pull 1272 ExecutorScheduler to wrap an Executor
  • Pull 1264 ObserveOn scheduled unsubscription
  • Pull 1271 Operator Retry with predicate
  • Pull 1265 Add more operators to RxScala
  • Pull 1281 Reduce Subscription Object Allocation
  • Pull 1284 Lock-free, MPSC-queue
  • Pull 1288 Ensure StringObservable.from() does not perform unnecessary read
  • Pull 1286 Rename some Operator* classes to OnSubscribe*
  • Pull 1276 CachedThreadScheduler
  • Pull 1287 ReplaySubject remove replayState CHM and related SubjectObserver changes
  • Pull 1289 Schedulers.from(Executor)
  • Pull 1290 Upgrade to JMH 0.7.3
  • Pull 1293 Fix and Update JMH Perf Tests
  • Pull 1291 Check unsubscribe within observable from future
  • Pull 1294 rx.operators -> rx.internal.operators
  • Pull 1295 Change void accept to boolean accept
  • Pull 1296 Move re-used internal Scheduler classes to their own package
  • Pull 1298 Remove Bad Perf Test
  • Pull 1301 RxScala: Add convenience method for adding unsubscription callback
  • Pull 1304 Add flatMap and concatMap to RxScala
  • Pull 1306 Hooked RxJavaPlugins errorHandler up within all operators that swallow onErrors
  • Pull 1309 Hide ChainedSubscription/SubscriptionList from Public API

Version 0.18.4 (Maven Central)

This is a fix for CompositeSubscription object allocation problems. Details can be found in issue #1204.

  • Pull 1283 Subscription object allocation fix

Version 0.18.3 (Maven Central)

Version 0.18.2 (Maven Central)

Version 0.18.1 (Maven Central)

Version 0.18.0 (Maven Central)

This release takes us a step closer to 1.0 by completing some of the remaining work on the roadmap.

Scheduler

The first item is a simplified Scheduler API, which now looks like this:

class Scheduler {
    public abstract Worker createWorker(); 
    public int parallelism();
    public long now();

    public abstract static class Worker implements Subscription {
        public abstract Subscription schedule(Action0 action, long delayTime, TimeUnit unit);
        public abstract Subscription schedule(Action0 action);
        public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit);
        public long now();
    }
}

This is a breaking change if you have a custom Scheduler implementation or use a Scheduler directly. If you only ever pass in a Scheduler via the Schedulers factory methods, this change does not affect you.

Additionally, the ExecutorScheduler was removed because a general thread pool does not meet the requirements of sequential execution for an Observable. It was replaced with rx.schedulers.EventLoopScheduler, a pool of event loops, which is the new default for Schedulers.computation().

rx.joins

The rx.joins package and the associated when, and, and then operators were moved out of rxjava-core into a new module, rxjava-joins. The rx.joins API has not yet matured and will not do so before 1.0; it was judged low priority and not worth blocking a 1.0 release. If the API matures inside the separate module to the point where it makes sense to bring it back into the core, that can be done in the 1.x series.

Deprecation Cleanup

This release removes many of the classes and methods that have been deprecated in previous releases. Most of the removed functionality was migrated in previous releases to contrib modules such as rxjava-math, rxjava-async and rxjava-computation-expressions.

A handful of deprecated items remain and cannot be removed until all internal operators have finished migrating to the lift/Subscriber design introduced in 0.17.0.

The full list of changes in 0.18.0:

Version 0.17.6 (Maven Central)

  • Pull 1031 Fix NPE in SubjectSubscriptionManager
  • Pull 1030 Benchmarking: Add JMH benchmark for ReplaySubject
  • Pull 1033 isolate subscriber used for retries, cleanup tests
  • Pull 1021 OperatorWeakBinding to not use WeakReferences anymore
  • Pull 1005 add toMap from Java Observable
  • Pull 1040 Fixed deadlock in Subjects + OperatorCache
  • Pull 1042 Kotlin M7 and full compatibility with 0.17.0
  • Pull 1035 Scala cleanup
  • Pull 1009 Android - Adding a new RetainedFragment example
  • Pull 1020 Upgrade Gradle wrapper for Android samples to Gradle 1.11
  • Pull 1038 rxjava-android: parameterize OperatorViewClick by concrete view type

Version 0.17.5 (Maven Central)

  • Pull 1010 Observable.unsafeSubscribe
  • Pull 1015 Remove Redundant protectivelyWrap Method
  • Pull 1019 Fix: retry() never unsubscribes from source until operator completes

Version 0.17.4 (Maven Central)

  • Pull 990 Quasar Lightweight Threads/Fibers Contrib Module
  • Pull 1012 SerializedObserver: Removed window between the two synchronized blocks

Version 0.17.3 (Maven Central)

Version 0.17.2 (Maven Central)

  • Pull 963 A more robust JMH benchmarking set-up
  • Pull 964 SubjectSubscriptionManager fix.
  • Pull 970 Notifications for the allocation averse.
  • Pull 973 Merge - Handle Bad Observables
  • Pull 974 TestSubject, TestObserver and TestScheduler Improvements
  • Pull 975 GroupBy & Time Gap Fixes
  • Pull 976 parallel-merge unit test assertions
  • Pull 977 Dematerialize - handle non-materialized terminal events
  • Pull 982 Pivot Operator
  • Pull 984 Tests and Javadoc for Pivot
  • Pull 966 Reimplement the ElementAt operator and add it to rxjava-scala
  • Pull 965 BugFix: Chain Subscription in TimeoutSubscriber and SerializedSubscriber
  • Pull 986 Fix SynchronizedObserver.runConcurrencyTest
  • Pull 987 Fix Non-Deterministic Pivot Test
  • Pull 988 OnErrorFailedException

Version 0.17.1 (Maven Central)

  • Pull 953 Make ObserveOnTest.testNonBlockingOuterWhileBlockingOnNext deterministic
  • Pull 930 Initial commit of the Android samples module
  • Pull 938 OperatorWeakBinding (deprecates OperatorObserveFromAndroidComponent)
  • Pull 952 rxjava-scala improvements and reimplemented the amb operator
  • Pull 955 Fixed ReplaySubject leak
  • Pull 956 Fixed byLine test to use line.separator system property instead of \n.
  • Pull 958 OperatorSkipWhile
  • Pull 959 OperationToFuture must throw CancellationException on get() if cancelled
  • Pull 928 Fix deadlock in SubscribeOnBounded
  • Pull 960 Unit test for "Cannot subscribe to a Retry observable once all subscribers unsubscribed"
  • Pull 962 Migrate from SynchronizedObserver to SerializedObserver

Version 0.17.0 (Maven Central)

Version 0.17.0 contains signature changes that significantly improve the handling of synchronous Observables and simplify Schedulers. Many of the changes keep backwards-compatible deprecated methods to ease migration, while some are breaking.

The new signatures related to Observable in this release are:

// A new create method takes `OnSubscribe` instead of `OnSubscribeFunc`
public final static <T> Observable<T> create(OnSubscribe<T> f)

// The new OnSubscribe type accepts a Subscriber instead of Observer and does not return a Subscription
public static interface OnSubscribe<T> extends Action1<Subscriber<? super T>>

// Subscriber is an Observer + Subscription
public abstract class Subscriber<T> implements Observer<T>, Subscription

// The main `subscribe` behavior receives a Subscriber instead of Observer
public final Subscription subscribe(Subscriber<? super T> subscriber)

// Subscribing with an Observer however is still appropriate
// and the Observer is automatically converted into a Subscriber
public final Subscription subscribe(Observer<? super T> observer)

// A new 'lift' function allows composing Operator implementations together
public <R> Observable<R> lift(final Operator<? extends R, ? super T> lift)
	
// The `Operator` used with `lift`
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>

Also changed is the Scheduler interface which is much simpler:

public abstract class Scheduler {
    public Subscription schedule(Action1<Scheduler.Inner> action);
    public Subscription schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
    public Subscription schedulePeriodically(Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
    public final Subscription scheduleRecursive(final Action1<Recurse> action);
    public long now();
    public int degreeOfParallelism();

    public static class Inner implements Subscription {
        public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
        public abstract void schedule(Action1<Scheduler.Inner> action);
        public long now();
    }

    public static final class Recurse {
        public final void schedule();
        public final void schedule(long delay, TimeUnit unit);
    }
}

This release applies many lessons learned over the past year and seeks to streamline the API before we hit 1.0.

As shown in the code above the changes fall into 2 major sections:

1) Lift/Operator/OnSubscribe/Subscriber

Changes that allow unsubscribing from synchronous Observables without needing to add concurrency.

2) Schedulers

Simplification of the Scheduler interface, making the concept of "outer" and "inner" Schedulers for recursion clearer.

Lift/Operator/OnSubscribe/Subscriber

New types Subscriber and OnSubscribe along with the new lift function have been added. The reasons and benefits are as follows:

1) Synchronous Unsubscribe

RxJava versions up until 0.16.x are unable to unsubscribe from a synchronous Observable such as this:

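Observable<Integer> oi = Observable.create(new OnSubscribeFunc<Integer>() {

    @Override
    public Subscription onSubscribe(Observer<? super Integer> observer) {
        // the Subscription is only returned after the loop, so nothing can stop it early
        for (int i = 1; i < 1000000; i++) {
            observer.onNext(i);
        }
        observer.onCompleted();
        return Subscriptions.empty();
    }
});

Subscribing to this Observable always runs the loop to completion and emits every value, even when the downstream has unsubscribed, for example via oi.take(10).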

Version 0.17.0 fixes this issue by injecting the Subscription into the OnSubscribe function to allow code like this:

Observable<Integer> oi = Observable.create(new OnSubscribe<Integer>() {

    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        // we now receive a Subscriber instead of Observer
        for (int i = 1; i < 1000000; i++) {
            // the OnSubscribe can now check for isUnsubscribed
            if (subscriber.isUnsubscribed()) {
                return;
            }
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }

});

Subscribing to this will now correctly emit only 10 onNext calls and then unsubscribe:

// subscribe with an Observer
oi.take(10).subscribe(new Observer<Integer>() {

    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(Integer t) {
        System.out.println("Received: " + t);
    }

});

Or the new Subscriber type can be used and the Subscriber itself can unsubscribe:

// or subscribe with a Subscriber which supports unsubscribe
oi.subscribe(new Subscriber<Integer>() {

    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(Integer t) {
        System.out.println("Received: " + t);
        if (t >= 10) {
            // a Subscriber can unsubscribe
            this.unsubscribe();
        }
    }

});
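
The mechanism can be reduced to a few lines of plain Java. The sketch below does not use RxJava: MiniSubscriber and emitRange are hypothetical stand-ins that model only the unsubscribed flag which a synchronous producer checks inside its loop.

```java
import java.util.ArrayList;
import java.util.List;

class SyncUnsubscribeSketch {
    /** Hypothetical stand-in for a Subscriber: receives values and carries an unsubscribed flag. */
    static abstract class MiniSubscriber {
        private volatile boolean unsubscribed;

        abstract void onNext(int value);

        void onCompleted() {
        }

        final void unsubscribe() {
            unsubscribed = true;
        }

        final boolean isUnsubscribed() {
            return unsubscribed;
        }
    }

    /** Synchronous source: stops as soon as the subscriber reports it has unsubscribed. */
    static int emitRange(MiniSubscriber s) {
        int emitted = 0;
        for (int i = 1; i < 1000000; i++) {
            if (s.isUnsubscribed()) {
                return emitted;
            }
            s.onNext(i);
            emitted++;
        }
        s.onCompleted();
        return emitted;
    }

    /** Consumer that unsubscribes after ten values; returns what it received. */
    static List<Integer> takeTen() {
        final List<Integer> received = new ArrayList<Integer>();
        MiniSubscriber s = new MiniSubscriber() {
            @Override
            void onNext(int value) {
                received.add(value);
                if (received.size() >= 10) {
                    unsubscribe();
                }
            }
        };
        emitRange(s);
        return received;
    }
}
```

Because the producer and consumer share the flag, the loop exits on the iteration after the consumer unsubscribes, with no extra thread involved.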

2) Custom Operator Chaining

Because Java doesn't support extension methods, the only approach to applying custom operators without getting them added to rx.Observable is using static methods. This has meant code like this:

MyCustomOperators.operate(observable.map(...).filter(...).take(5)).map(...).subscribe()

In reality we want:

observable.map(...).filter(...).take(5).myCustomOperator().map(...).subscribe()

Using the newly added lift we can get quite close to this:

observable.map(...).filter(...).take(5).lift(MyCustomOperator.operate()).map(...).subscribe()

Here is how a chain would look if every operator were applied with the lift method:

Observable<String> os = OBSERVABLE_OF_INTEGERS.lift(TAKE_5).lift(MAP_INTEGER_TO_STRING);

Along with the lift function comes a new Operator signature:

public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>

All operator implementations in the rx.operators package will over time be migrated to this new signature.

NOTE: Operators that have not yet been migrated do not work with synchronous unsubscribe.
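
To make the shape of that signature concrete, here is a minimal sketch in plain Java 8. It models an operator as a function from the downstream consumer to the upstream consumer, which is the same direction as Func1<Subscriber<? super R>, Subscriber<? super T>>. Source, lift and map here are hypothetical stand-ins written for this example, not RxJava classes, and the sketch leaves out errors, completion and unsubscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class LiftSketch {
    /** Hypothetical minimal source: subscribing pushes values into a Consumer. */
    static final class Source<T> {
        private final Consumer<Consumer<? super T>> onSubscribe;

        Source(Consumer<Consumer<? super T>> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        /** The operator turns the downstream consumer (of R) into the upstream consumer (of T). */
        <R> Source<R> lift(final Function<Consumer<? super R>, Consumer<? super T>> operator) {
            return new Source<R>(downstream -> onSubscribe.accept(operator.apply(downstream)));
        }

        void subscribe(Consumer<? super T> consumer) {
            onSubscribe.accept(consumer);
        }
    }

    /** A map operator expressed in the "downstream in, upstream out" form. */
    static <T, R> Function<Consumer<? super R>, Consumer<? super T>> map(final Function<T, R> f) {
        return downstream -> value -> downstream.accept(f.apply(value));
    }

    /** Chains two lifted operators over the values 1, 2, 3. */
    static List<String> demo() {
        List<String> out = new ArrayList<String>();
        Source<Integer> ints = new Source<Integer>(c -> {
            for (int i = 1; i <= 3; i++) {
                c.accept(i);
            }
        });
        ints.lift(LiftSketch.<Integer, Integer>map(i -> i * 10))
            .lift(LiftSketch.<Integer, String>map(i -> "v" + i))
            .subscribe(out::add);
        return out;
    }
}
```

Each lift call wraps the previous source, so custom operators compose in the same fluent chain as built-in ones.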

3) Simpler Operator Implementations

The lift operator injects the necessary Observer and Subscription instances (via the new Subscriber type) and eliminates, for most use cases, the need for manual subscription management. Because the Subscription is available in scope, there is no longer any need for awkward patterns such as creating a Subscription, closing over it, returning it, and accounting for synchronous versus asynchronous execution.

For example, the body of fromIterable is simply:

public void call(Subscriber<? super T> o) {
    for (T i : is) {
        if (o.isUnsubscribed()) {
            return;
        }
        o.onNext(i);
    }
    o.onCompleted();
}

The take operator is:

public Subscriber<? super T> call(final Subscriber<? super T> child) {
        final CompositeSubscription parent = new CompositeSubscription();
        if (limit == 0) {
            child.onCompleted();
            parent.unsubscribe();
        }

        child.add(parent);
        return new Subscriber<T>(parent) {

            int count = 0;
            boolean completed = false;

            @Override
            public void onCompleted() {
                if (!completed) {
                    child.onCompleted();
                }
            }

            @Override
            public void onError(Throwable e) {
                if (!completed) {
                    child.onError(e);
                }
            }

            @Override
            public void onNext(T i) {
                if (!isUnsubscribed()) {
                    child.onNext(i);
                    if (++count >= limit) {
                        completed = true;
                        child.onCompleted();
                        unsubscribe();
                    }
                }
            }

        };
    }

4) Recursion/Loop Performance with Unsubscribe

The fromIterable use case is 20x faster when implemented as a loop instead of recursive scheduler (see https://github.com/ReactiveX/RxJava/commit/a18b8c1a572b7b9509b7a7fe1a5075ce93657771).

In several places we can remove the recursive scheduling originally used for unsubscribe support and use a loop instead.

Schedulers

Schedulers were greatly simplified to a design based around Action1<Inner>.

public abstract class Scheduler {
    public Subscription schedule(Action1<Scheduler.Inner> action);
    public Subscription schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
    public Subscription schedulePeriodically(Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
    public final Subscription scheduleRecursive(final Action1<Recurse> action);
    public long now();
    public int degreeOfParallelism();

    public static class Inner implements Subscription {
        public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
        public abstract void schedule(Action1<Scheduler.Inner> action);
        public long now();
    }

    public static final class Recurse {
        public final void schedule();
        public final void schedule(long delay, TimeUnit unit);
    }
}

This design change originated from four findings:

  1. It was very easy to cause memory leaks or inadvertent parallel execution since the distinction between outer and inner scheduling was not obvious.

To solve this, the new design explicitly has the outer Scheduler and then Scheduler.Inner for recursion.

  2. The passing of state is not useful since scheduling over network boundaries with this model does not work.

In this new design all state-passing signatures have been removed. This was determined while implementing a RemoteScheduler that attempted to use observeOn to transition execution from one machine to another. This does not work because it requires serializing and deserializing the state of the entire execution stack. Migration of work over the network has been found to be better suited to explicit boundaries established by Subjects. Thus, the complications within the Schedulers are unnecessary.

  3. The number of overloads with different ways of doing the same thing was confusing.

This new design removes all but the essential and simplest methods.

  4. A scheduled task could not do work in a loop and easily be unsubscribed, which generally meant less efficient recursive scheduling.

This new design applies principles similar to those used with lift/create/OnSubscribe/Subscriber and injects the Subscription via the Inner interface so a running task can check isUnsubscribed().

With this new design, the simplest execution of a single task is:

Schedulers.newThread().schedule(new Action1<Inner>() {

    @Override
    public void call(Inner inner) {
        doWork();
    }

});

Recursion is easily invoked like this:

Schedulers.newThread().scheduleRecursive(new Action1<Recurse>() {

    @Override
    public void call(Recurse recurse) {
        doWork();
        // recurse until unsubscribed (the schedule will do nothing if unsubscribed)
        recurse.schedule();
    }


});

or like this if the outer and inner actions need different behavior:

Schedulers.newThread().schedule(new Action1<Inner>() {

    @Override
    public void call(Inner inner) {
        doWork();
        // recurse until unsubscribed (the schedule will do nothing if unsubscribed)
        inner.schedule(this);
    }

});

Because Action1<Inner> is used at both the outer and inner levels, a recursive action can simply pass this when rescheduling itself.

Similar to the new lift/create pattern with Subscriber, the Inner is also a Subscription, so it allows efficient loops with unsubscribe support:

Schedulers.newThread().schedule(new Action1<Inner>() {

    @Override
    public void call(Inner inner) {
        while(!inner.isUnsubscribed()) {
            doWork();
        }
    }

});

An action can now unsubscribe the Scheduler.Inner:

Schedulers.newThread().schedule(new Action1<Inner>() {

    @Override
    public void call(Inner inner) {
        while(!inner.isUnsubscribed()) {
            int i = doOtherWork();
            if(i > 100) {
                // an Action can cause the Scheduler to unsubscribe and stop
                inner.unsubscribe();
            }
        }
    }

});

Typically just stopping is sufficient:

Schedulers.newThread().schedule(new Action1<Inner>() {

    @Override
    public void call(Inner inner) {
        int i = doOtherWork();
        if (i < 10) {
            // recurse until done 10
            inner.schedule(this);
        }
    }

});

but if other work in other tasks is being done and you want to unsubscribe conditionally you could:

Schedulers.newThread().schedule(new Action1<Inner>() {

    @Override
    public void call(Inner inner) {
        int i = doOtherWork();
        if (i < 10) {
            // recurse until done 10
            inner.schedule(this);
        } else {
            inner.unsubscribe();
        }
    }

});

and the recursion can be delayed:

Schedulers.newThread().schedule(new Action1<Inner>() {

    @Override
    public void call(Inner inner) {
        doWork();
        // recurse until unsubscribed ... but delay the recursion
        inner.schedule(this, 500, TimeUnit.MILLISECONDS);
    }

});

The same pattern works with the Recurse signature:

Schedulers.newThread().scheduleRecursive(new Action1<Recurse>() {

    @Override
    public void call(Recurse recurse) {
        doWork();
        // recurse until unsubscribed (the schedule will do nothing if unsubscribed)
        recurse.schedule(500, TimeUnit.MILLISECONDS);
    }


});

The methods on Inner never return a Subscription because an Inner always represents a single thread/event-loop/actor/etc. and is controlled by the Subscription returned by the initial Scheduler.schedule call. This is part of clarifying the contract.

Thus an unsubscribe controlled from the outside would be done like this:

Subscription s = Schedulers.newThread().schedule(new Action1<Inner>() {

    @Override
    public void call(Inner inner) {
        while(!inner.isUnsubscribed()) {
            doWork();
        }
    }

});

// unsubscribe from outside
s.unsubscribe();
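
The same outside-in cancellation can be sketched without RxJava: a worker thread loops until a shared flag is set, and the caller sets that flag. MiniInner is a hypothetical stand-in for Scheduler.Inner that models only isUnsubscribed() and unsubscribe(), and a latch countdown stands in for doWork().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

class OuterUnsubscribeSketch {
    /** Hypothetical stand-in for Scheduler.Inner: only exposes the unsubscribed flag. */
    static final class MiniInner {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);

        void unsubscribe() {
            unsubscribed.set(true);
        }

        boolean isUnsubscribed() {
            return unsubscribed.get();
        }
    }

    /** Starts a looping worker, unsubscribes it from outside, and reports whether it stopped. */
    static boolean runAndStop() {
        final MiniInner inner = new MiniInner();
        final CountDownLatch started = new CountDownLatch(1);
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!inner.isUnsubscribed()) {
                    started.countDown(); // stands in for doWork()
                }
            }
        });
        worker.start();
        try {
            started.await();      // wait until the loop has run at least once
            inner.unsubscribe();  // unsubscribe from outside
            worker.join(5000);    // the loop should observe the flag and exit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }
}
```

The running task never needs a handle on its own thread; checking the flag on each iteration is enough for the caller to stop it.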

Migration Path

1) Lift/OnSubscribe/Subscriber

The lift function will not be used by most and is additive, so it does not affect backwards compatibility. The Subscriber type is also additive and for most use cases does not need to be used directly; the Observer interface can continue to be used.

The previous create(OnSubscribeFunc f) signature has been deprecated, so existing code will still work but will now produce warnings. Please begin migrating code, as this signature will be deleted prior to the 1.0 release.

Code such as this:

Observable.create(new OnSubscribeFunc<Integer>() {

    @Override
    public Subscription onSubscribe(Observer<? super Integer> o) {
        o.onNext(1);
        o.onCompleted();
        return Subscriptions.empty();
    }
});

should change to this:

Observable.create(new OnSubscribe<Integer>() {

    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onCompleted();
    }
});

If concurrency was being injected to allow unsubscribe support:

Observable.create(new OnSubscribeFunc<Integer>() {

    @Override
    public Subscription onSubscribe(final Observer<? super Integer> o) {
        final BooleanSubscription s = new BooleanSubscription();
        Thread t = new Thread(new Runnable() {

            @Override
            public void run() {
                int i = 0;
                while (!s.isUnsubscribed()) {
                    o.onNext(i++);
                }
            }

        });
        t.start();
        return s;
    }
});

you may no longer need it and can implement like this instead:

Observable.create(new OnSubscribe<Integer>() {

    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        int i = 0;
        while (!subscriber.isUnsubscribed()) {
            subscriber.onNext(i++);
        }
    }
});

or if the concurrency is still desired you can simplify the Subscription management:

Observable.create(new OnSubscribe<Integer>() {

    @Override
    public void call(final Subscriber<? super Integer> subscriber) {
        Thread t = new Thread(new Runnable() {

            @Override
            public void run() {
                int i = 0;
                while (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(i++);
                }
            }

        });
        t.start();
    }
});

or use subscribeOn which now works to make synchronous Observables async while supporting unsubscribe (this didn't work before):

Observable.create(new OnSubscribe<Integer>() {

    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        int i = 0;
        while (!subscriber.isUnsubscribed()) {
            subscriber.onNext(i++);
        }
    }
}).subscribeOn(Schedulers.newThread());

2) Schedulers

Custom Scheduler implementations will need to be re-implemented and any direct use of the Scheduler interface will also need to be updated.

3) Subscription

If you have custom Subscription implementations you will see they now need an isUnsubscribed() method.

You can either add this method, or wrap your function using Subscriptions.create and it will handle the isUnsubscribed behavior and execute your function when unsubscribe() is called.

It is recommended to use Subscriptions.create for most Subscription usage.
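
As a rough picture of what such a wrapper has to do, the sketch below tracks the unsubscribed state with an AtomicBoolean and runs a cleanup action on unsubscribe(). It is not the code behind Subscriptions.create: ActionSubscription is a hypothetical class, and running the cleanup at most once is an assumption made for this example.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class ActionSubscriptionSketch {
    /** Hypothetical wrapper that turns a cleanup action into a subscription-like object. */
    static final class ActionSubscription {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
        private final Runnable cleanup;

        ActionSubscription(Runnable cleanup) {
            this.cleanup = cleanup;
        }

        void unsubscribe() {
            // Assumption: the cleanup runs at most once, even if unsubscribe() is called repeatedly.
            if (unsubscribed.compareAndSet(false, true)) {
                cleanup.run();
            }
        }

        boolean isUnsubscribed() {
            return unsubscribed.get();
        }
    }

    /** Returns how many times the cleanup ran after two unsubscribe() calls. */
    static int cleanupRunsAfterTwoCalls() {
        final AtomicInteger runs = new AtomicInteger();
        ActionSubscription s = new ActionSubscription(new Runnable() {
            @Override
            public void run() {
                runs.incrementAndGet();
            }
        });
        s.unsubscribe();
        s.unsubscribe();
        return runs.get();
    }
}
```

Delegating to a wrapper like this keeps the state handling in one place instead of repeating it in every custom Subscription.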

The Future...

We have most if not all operators from Rx.Net that we want or intend to port. We think we have got the create/subscribe signatures as we want and the Subscription and Scheduler interfaces are now clean. There is at least one more major topic related to back pressure that may result in signature change in a future release. Beyond that no further major signature changing work is expected prior to 1.0.

We still need to improve some of the Subject implementations, particularly ReplaySubject. After this release we are beginning to focus on cleaning up all of the operator implementations, stabilizing, fixing bugs and performance tuning.

As we get closer to 1.0 there will be a release focused on deleting all deprecated methods, so it is suggested to start migrating off of them.

We appreciate your usage, feedback and contributions and hope the library is creating value for you!

Pull Requests

  • Pull 767 Zip fix for multiple onCompleted and moved unsubscribe outside the lock.
  • Pull 770 Bind Operator
  • Pull 778 Fix zip race condition
  • Pull 784 Lift and Observer+Subscription
  • Pull 793 Observer + Subscriber
  • Pull 796 Add Subscription.isUnsubscribed()
  • Pull 797 Scheduler Outer/Inner [Preview]
  • Pull 805 Fix CompositeException
  • Pull 785 Reimplement Zip Operator Using Lift [Preview]
  • Pull 814 RunAsync method for outputting multiple values
  • Pull 812 Fixed OperationSubscribeOn so OperationConditionalsTest works again.
  • Pull 816 One global onCompleted object
  • Pull 818 CompositeSubscription memory reduction
  • Pull 817 Scala Scheduler Bindings Fix
  • Pull 819 CompositeSubscription performance increase
  • Pull 781 Fixed buglet in join binding, simplified types
  • Pull 783 Implement some Android UI related operators
  • Pull 821 Update to use Subscriber/Subscriptions.create
  • Pull 826 Return wrapped Subscription
  • Pull 824 Set setDaemon on NewThreadScheduler
  • Pull 828 Repeat Operator
  • Pull 827 Fixed cut & paster error in io scheduler
  • Pull 833 Take operator was breaking the unsubscribe chain
  • Pull 822 Reimplement 'subscribeOn' using 'lift'
  • Pull 832 Issue #831 Fix for OperationJoin race condition
  • Pull 834 Update clojure for 0.17
  • Pull 839 Error Handling: OnErrorNotImplemented and java.lang.Error
  • Pull 838 Make Scala OnCompleted Notification an object
  • Pull 837 Perf with JMH
  • Pull 841 Range OnSubscribe
  • Pull 842 Test Unsubscribe
  • Pull 845 Fix problem with Subscription
  • Pull 847 Various Changes While Fixing GroupBy
  • Pull 849 Add 'Fragment-Host' to rxjava-contrib modules for OSGi
  • Pull 851 Reimplement the timeout operator and fix timeout bugs
  • Pull 846 Added overloaded createRequest method that takes an HttpContext instance
  • Pull 777 Fixed testSingleSourceManyIterators
  • Pull 852 rxjava-debug
  • Pull 853 StringObservable Update
  • Pull 763 Added support for custom functions in combineLatest.
  • Pull 854 The onCreate hook disappeared
  • Pull 857 Change Lift to use rx.Observable.Operator
  • Pull 859 Add 'Fragment-Host' to rxjava-contrib/debug module for OSGi
  • Pull 860 Fixing the generics for merge and lift
  • Pull 863 Optimize SwingMouseEventSource.fromRelativeMouseMotion
  • Pull 862 Update the timeout docs
  • Pull 790 Convert to scan to use lift
  • Pull 866 Update OperationScan to OperatorScan
  • Pull 870 Add the selector variants of timeout in RxScala
  • Pull 874 Update CompositeSubscriptionTest.java
  • Pull 869 subscribeOn + groupBy
  • Pull 751 Provide Observable.timestamp(Scheduler) to be used in the tests.
  • Pull 878 Scheduler.scheduleRecursive
  • Pull 877 Correct synchronization guard in groupByUntil
  • Pull 880 Force ViewObservable be subscribed and unsubscribed in the UI thread
  • Pull 887 Remove Bad Filter Logic
  • Pull 890 Split SubscribeOn into SubscribeOn/UnsubscribeOn
  • Pull 891 Eliminate rx.util.* dumping grounds
  • Pull 881 Lift Performance
  • Pull 893 Change Parallel to use Long instead of Int
  • Pull 894 Synchronized Operator Check for isTerminated
  • Pull 885 Fixed an issue with the from(Reader) added a bunch of unit tests.
  • Pull 896 removing java 7 dep
  • Pull 883 Make Subscriptions of SwingObservable thread-safe
  • Pull 895 Rewrite OperationObserveFromAndroidComponent to OperatorObserveFromAndroid
  • Pull 892 onErrorFlatMap + OnErrorThrowable
  • Pull 898 Handle illegal errors thrown from plugin
  • Pull 901 GroupBy Unit Test from #900
  • Pull 902 Fixed NullPointerException that may happen on timeout
  • Pull 903 Scheduler.Recurse fields should be private
  • Pull 904 Merge: Unsubscribe Completed Inner Observables
  • Pull 905 RxJavaSchedulers Plugin
  • Pull 909 Scheduler Plugin Refactor
  • Pull 910 Remove groupBy with selector
  • Pull 918 Operator: doOnTerminate
  • Pull 919 BugFix: Zip Never Completes When Zero Observables
  • Pull 920 Delete Deprecated onSubscribeStart That Doesn't Work
  • Pull 922 Changes made while integrating it with our internal system
  • Pull 924 Localized Operator Error Handling
  • Pull 925 Rxjava clojure bindings final
  • Pull 926 TestSubscriber: Default onError and Terminal Latch Behavior
  • Pull 927 TestSubscriber lastSeenThread
  • Pull 936 Skip fixed
  • Pull 942 MathObservable
  • Pull 944 OperationRetry -> OperatorRetry
  • Pull 945 refactor the debug hooks before they become a breaking change.
  • Pull 934 add Observable.startWith(Observable) method and unit test
  • Pull 929 correct link to maven search
  • Pull 923 Observable creation from Subscriber[T]=>Unit for Scala
  • Pull 931 A number of improvements to OperatorObserveFromAndroidComponent
  • Pull 950 Add support for Eclipse PDE

Version 0.16.1 (Maven Central)

  • Pull 730 Improve Error Handling and Stacktraces When Unsubscribe Fails
  • Pull 720 Added Observable.timeout wrappers to scala adapter
  • Pull 731 Fix non-deterministic unit test
  • Pull 742 Build with Gradle 1.10
  • Pull 718 Merge overloads
  • Pull 733 Buffer with Observable boundary
  • Pull 734 Delay with subscription and item delaying observables
  • Pull 735 Window with Observable boundary
  • Pull 736 MergeMap with Iterable and resultSelector overloads
  • Pull 738 Publish and PublishLast overloads
  • Pull 739 Debounce with selector
  • Pull 740 Timeout with selector overloads
  • Pull 745 Fixed switch bug
  • Pull 741 Zip with iterable, removed old aggregator version and updated tests
  • Pull 749 Separated Android test code from source
  • Pull 732 Ported groupByUntil function to scala-adapter

Version 0.16.0 (Maven Central)

This is a significant release with the following changes:

  • Refactor of Subjects and Subscriptions to non-blocking implementations
  • Many bug fixes, new operators and behavior changes to match Rx.Net.
  • Deprecation of some operators due to renaming or eliminating duplicates
  • The rx.concurrency package has been renamed to rx.schedulers. Existing classes still remain in rx.concurrency but are deprecated. Use of rx.concurrency should be migrated to rx.schedulers as these deprecated classes will be removed in a future release.
  • Breaking changes to Scala bindings. See Release Notes for details.
  • New modules: rxjava-string, rxjava-async-util and rxjava-computation-expressions for operators deemed not applicable to the core library.

  • Pull 516 rxjava-string module with StringObservable
  • Pull 533 Operator: ToAsync
  • Pull 535 Fix compilation errors due to referencing the Android support library directly
  • Pull 545 Fixed Zip issue with infinite streams
  • Pull 539 Zipping a finite and an infinite Observable
  • Pull 541 Operator: SkipUntil
  • Pull 537 Add scala adapters for doOnEach operator
  • Pull 560 Add type variances for doOnEach actions
  • Pull 562 Scala Adaptor Improvements
  • Pull 563 Operator: GroupByUntil
  • Pull 561 Revised Approach to Creating Observables in Scala
  • Pull 565 Operator: GroupJoin v2
  • Pull 567 Operator: Timestamp with Scheduler
  • Pull 568 Use lock free strategy for several Subscription implementations
  • Pull 571 Operator: Sample with Observable v2
  • Pull 572 Multiple Subscriptions to ObserveOn
  • Pull 573 Removed Opening and Closing historical artifacts
  • Pull 575 Operator: SequenceEqual reimplementation
  • Pull 587 Operator: LongCount
  • Pull 586 Fix Concat to allow multiple observers
  • Pull 598 New Scala Bindings
  • Pull 596 Fix for buffer not stopping when unsubscribed
  • Pull 576 Operators: Timer and Delay
  • Pull 593 Lock-free subscriptions
  • Pull 599 Refactor rx.concurrency to rx.schedulers
  • Pull 600 BugFix: Replay Subject
  • Pull 594 Operator: Start
  • Pull 604 StringObservable.join
  • Pull 609 Operation: Timer
  • Pull 612 Operation: Replay (overloads)
  • Pull 628 BugFix: MergeDelayError Synchronization
  • Pull 602 BugFix: ObserveOn Subscription leak
  • Pull 631 Make NewThreadScheduler create Daemon threads
  • Pull 651 Subjects Refactor - Non-Blocking, Common Abstraction, Performance
  • Pull 661 Subscriptions Rewrite
  • Pull 520 BugFix: blocking/non-blocking first
  • Pull 621 Scala: SerialSubscription & From
  • Pull 626 BO.Latest, fixed: BO.next, BO.mostRecent, BO.toIterable
  • Pull 633 BugFix: null in toList operator
  • Pull 635 Conditional Operators
  • Pull 638 Operations: DelaySubscription, TakeLast w/ time, TakeLastBuffer
  • Pull 659 Missing fixes from the subject rewrite
  • Pull 688 Fix SafeObserver handling of onComplete errors
  • Pull 690 Fixed Scala bindings
  • Pull 693 Kotlin M6.2
  • Pull 689 Removed ObserverBase
  • Pull 664 Operation: AsObservable
  • Pull 697 Operations: Skip, SkipLast, Take with time
  • Pull 698 Operations: Average, Sum
  • Pull 699 Operation: Repeat
  • Pull 701 Operation: Collect
  • Pull 707 Module: rxjava-async-util
  • Pull 708 BugFix: combineLatest
  • Pull 712 Fix Scheduler Memory Leaks
  • Pull 714 Module: rxjava-computation-expressions
  • Pull 715 Add missing type hint to clojure example
  • Pull 717 Scala: Added ConnectableObservable
  • Pull 723 Deprecate multiple arity ‘from’
  • Pull 724 Revert use of CurrentThreadScheduler for Observable.from
  • Pull 725 Simpler computation/io naming for Schedulers
  • Pull 727 ImmediateScheduler optimization for toObservableIterable

Version 0.15.1 (Maven Central)

This release should contain only additive functionality and bug fixes.

Version 0.15.0 (Maven Central)

This release contains a refactor of the Scala Bindings by @headinthebox that results in some breaking changes. The previous solution ended up not working well in all cases for idiomatic Scala usage. Java/Scala interop has been changed and is no longer transparent so as to optimize for native Scala usage. Read the rxjava-scala README for more information.

  • Pull 503 New Scala Bindings
  • Pull 502 Fix ObserveOn and add ParallelMerge Scheduler overload
  • Pull 499 ObserveOn Refactor
  • Pull 492 Implement the scheduler overloads for Range, From, StartWith
  • Pull 496 Add contravariant for min and max

Version 0.14.11 (Maven Central)

  • Pull 486 BugFix: AsyncSubject
  • Pull 483 Tweaks to DoOnEach and added DoOnError/DoOnCompleted

This release contains a very slight breaking change: one doOnEach overload was removed. The version was not bumped from 0.14 to 0.15 because the change is so minor and the offending method was only released in the previous version.

Version 0.14.10 (Maven Central)

  • Pull 481 Operator: Using
  • Pull 480 BugFix: Emit an IllegalArgumentException instead of ArithmeticException if the observable is empty
  • Pull 479 Operator: DoOnEach
  • Pull 478 Operator: Min, MinBy, Max, MaxBy
  • Pull 463 Add Timeout Overloads

Version 0.14.9 (Maven Central)

  • Pull 477 BugFix: CompositeSubscription
  • Pull 476 BugFix: Don't emit null onComplete when no onNext received in AsyncSubject
  • Pull 474 BugFix: Reduce an empty observable
  • Pull 474 BugFix: non-deterministic unit test
  • Pull 472 BugFix: Issue 431 Unsubscribe with Schedulers.newThread
  • Pull 470 Operator: Last

Version 0.14.8 (Maven Central)

Version 0.14.7 (Maven Central)

  • Pull 459 Fix multiple unsubscribe behavior
  • Pull 458 rxjava-android: OperationObserveFromAndroidComponent
  • Pull 453 Fix error handling in map operator
  • Pull 450 Operator: TimeInterval
  • Pull 452 Scheduler Overload of Just/Return Operator
  • Pull 433 Fixes: Next Operator
  • Commit d64a8c5 Update rxjava-apache-http to Apache HttpAsyncClient 4.0 GA

Version 0.14.6 (Maven Central)

  • Pull 441 Fixed the issue that 'take' does not call 'onError'
  • Pull 443 OperationSwitch notify onComplete() too early.
  • Pull 434 Timeout operator and SerialSubscription
  • Pull 447 Caching the result of 'isInternalImplementation'

Version 0.14.5 (Maven Central)

Version 0.14.4 (Maven Central)

  • Issue 428 Fix: buffer() using TimeAndSizeBasedChunks incorrectly forces thread into interrupted state
  • Pull 435 rx-apache-http recognizes "Transfer-Encoding: chunked" as an HTTP stream
  • Pull 437 Fixes: Scheduler and Merge

Version 0.14.3 (Maven Central)

  • Pull 407 Operator: RefCount
  • Pull 410 Operator: Contains
  • Pull 411 Unit Test fix: update counter before triggering latch
  • Pull 413 Fixed the issues of takeLast(items, 0) and null values
  • Pull 414 Operator: SkipLast
  • Pull 415 Operator: Empty with scheduler
  • Pull 416 Operator: Throw with scheduler
  • Pull 420 Scala Adaptor Improvements
  • Pull 422 JRuby function wrapping support
  • Pull 424 Operator: IgnoreElements
  • Pull 426 PublishSubject ReSubscribe for publish().refCount() Behavior

Version 0.14.2 (Maven Central)

Version 0.14.1 (Maven Central)

  • Pull 402 rxjava-apache-http improvements

Version 0.14.0 (Maven Central)

Further progress on the Scala adaptor and a handful of new operators.

The version is bumped to 0.14.0 because of a small breaking change to the distinct operator: the overloaded methods that take a Comparator were removed. These methods were added in 0.13.2 and determined to be incorrect.

This release also includes a new contrib module, rxjava-apache-http that provides an Observable API to the Apache HttpAsyncClient.

  • Pull 396 Add missing methods to Scala Adaptor
  • Pull 390 Operators: ElementAt and ElementAtOrDefault
  • Pull 398 Operators: IsEmpty and Exists (instead of Any)
  • Pull 397 Observable API for Apache HttpAsyncClient 4.0
  • Pull 400 Removing comparator overloads of distinct

Version 0.13.5

  • Upload to Sonatype failed so version skipped

Version 0.13.4 (Maven Central)

  • Pull 393 Parallel Operator & ObserveOn/ScheduledObserver Fixes
  • Pull 394 Change Interval and Sample default Scheduler
  • Pull 391 Fix OSGI support for rxjava-scala

Version 0.13.3

  • Upload to Sonatype failed so version skipped

Version 0.13.2 (Maven Central)

  • Pull 389 Scala Adaptor Improvements
  • Pull 382 Removing deprecated RxImplicits from rxjava-scala
  • Pull 381 Operator: mapWithIndex
  • Pull 380 Implemented distinct and distinctUntilChanged variants using a comparator
  • Pull 379 Make interval work with multiple subscribers

Version 0.13.1 (Maven Central)

This release includes a new Scala adaptor as part of the effort from issue ReactiveX#336 pursuing idiomatic Scala support.

Version 0.13.0 (Maven Central)

This release has some minor changes related to varargs that could break backwards compatibility if directly passing arrays but for most this release should not be breaking.

  • Pull 354 Operators: Count, Sum, Average
  • Pull 355 Operators: skipWhile and skipWhileWithIndex
  • Pull 356 Operator: Interval
  • Pull 357 Operators: first and firstOrDefault
  • Pull 368 Operators: Throttle and Debounce
  • Pull 371 Operator: Retry
  • Pull 370 Change zip method signature from Collection to Iterable
  • Pull 369 Generics Improvements: co/contra-variance
  • Pull 361 Remove use of varargs from API

Version 0.12.2 (Maven Central)

  • Pull 352 Groovy Language Adaptor: Add Func5-9 and N to the wrapper

Version 0.12.1 (Maven Central)

  • Pull 350 Swing module enhancements
  • Pull 351 Fix Observable.window static/instance bug

Version 0.12.0 (Maven Central)

This version builds on the static typing changes in 0.11 and adds covariant/contravariant typing via super/extends generics.

Additional cleanup was done, particularly related to BlockingObservable. Also the window operator was added.

The largest breaking change is that Observable.create now accepts an OnSubscribeFunc rather than a Func1.

This means that instead of this:

public static <T> Observable<T> create(Func1<? super Observer<? super T>, ? extends Subscription> func)

it is now:

public static <T> Observable<T> create(OnSubscribeFunc<T> func)

This was done to simplify the usage of Observable.create which was already verbose but made far worse by the ? super generics.

For example, instead of writing this:

Observable.create(new Func1<Observer<? super SomeType>, Subscription>() {
   // body here
});

it is now written as:

Observable.create(new OnSubscribeFunc<SomeType>() {
   // body here
});
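
To make the shape of the new signature concrete, here is a minimal, self-contained sketch. The `Observer`, `Subscription`, `OnSubscribeFunc` and `Observable` types below are simplified stand-ins written for this example, not the real `rx.*` classes, and the method name `onSubscribe` is an assumption about the 0.12 interface. The point is only that the anonymous class no longer needs to spell out `? super` generics in its type arguments.

```java
import java.util.ArrayList;
import java.util.List;

public class OnSubscribeFuncSketch {
    // Simplified stand-ins for illustration only; NOT the real rx.* types.
    public interface Observer<T> {
        void onNext(T value);
        void onCompleted();
        void onError(Throwable e);
    }

    public interface Subscription {
        void unsubscribe();
    }

    // Assumed shape: one method that receives the Observer and returns a Subscription.
    public interface OnSubscribeFunc<T> {
        Subscription onSubscribe(Observer<? super T> observer);
    }

    public static final class Observable<T> {
        private final OnSubscribeFunc<T> func;

        private Observable(OnSubscribeFunc<T> func) {
            this.func = func;
        }

        public static <T> Observable<T> create(OnSubscribeFunc<T> func) {
            return new Observable<T>(func);
        }

        public Subscription subscribe(Observer<? super T> observer) {
            return func.onSubscribe(observer);
        }
    }

    // Creates a one-item source, subscribes to it, and returns the recorded events.
    public static List<String> run() {
        final List<String> events = new ArrayList<String>();
        Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
            @Override
            public Subscription onSubscribe(Observer<? super String> observer) {
                observer.onNext("hello");
                observer.onCompleted();
                return new Subscription() {
                    @Override
                    public void unsubscribe() {
                        // nothing to release in this sketch
                    }
                };
            }
        });
        source.subscribe(new Observer<String>() {
            @Override public void onNext(String value) { events.add("next:" + value); }
            @Override public void onCompleted() { events.add("completed"); }
            @Override public void onError(Throwable e) { events.add("error:" + e); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The wildcard still appears once, on the `onSubscribe` parameter, but the caller writes `OnSubscribeFunc<String>` instead of `Func1<Observer<? super String>, Subscription>`.
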
  • Pull 343 Covariant Support with super/extends and OnSubscribeFunc as type for Observable.create
  • Pull 337 Operator: window
  • Pull 348 Rename switchDo to switchOnNext (deprecate switchDo for eventual deletion)
  • Pull 348 Delete switchDo instance method in preference for static
  • Pull 346 Remove duplicate static methods from BlockingObservable
  • Pull 346 BlockingObservable no longer extends from Observable
  • Pull 345 Remove unnecessary constructor from Observable

Version 0.11.2 (Maven Central)

Version 0.11.1 (Maven Central)

  • Pull 325 Clojure: Preserve metadata on fn and action macros

Version 0.11.0 (Maven Central)

This is a major refactor of rxjava-core and the language adaptors.

Note that there are breaking changes in this release. Details are below.

After this refactor it is expected that the API will settle down and allow us to stabilize towards a 1.0 release.

  • Pull 332 Refactor Core to be Statically Typed

RxJava was written from the beginning to target the JVM, not any specific language.

As a side-effect of Java not having lambdas/closures yet (and other considerations), Netflix used dynamic languages with it predominantly for the year of its existence prior to open sourcing.

To bridge the rxjava-core written in Java with the various languages, a FunctionalLanguageAdaptor was registered at runtime for each language of interest.

To enable these language adaptors, methods are overloaded with Object in the API since Object is the only super-type that works across all languages for their various implementations of lambdas and closures.

The downside of this has been that it breaks static typing for Java, Scala and other statically-typed languages. More can be read on this issue and discussion of the subject here: https://groups.google.com/forum/#!topic/rxjava/bVZoKSsb1-o

This release:

  • removes all Object overload methods from rxjava-core so it is statically typed
  • removes dynamic FunctionalLanguageAdaptors
  • uses idiomatic approaches for each language adaptor
    • Java core is statically typed and has no knowledge of other languages
    • Scala uses implicits
    • Groovy uses an ExtensionModule
    • Clojure adds a new macro (NOTE: this requires code changes)
    • JRuby has been temporarily disabled (discussing new implementation at ReactiveX#320)
  • language support continues to be additive
    • the rxjava-core will always be required and then whichever language modules are desired such as rxjava-scala, rxjava-clojure, rxjava-groovy are added to the classpath
  • deletes deprecated methods
  • deletes redundant static methods on Observable that cluttered the API and in some cases caused dynamic languages trouble choosing which method to invoke
  • deletes redundant methods on Scheduler that gave dynamic languages a hard time choosing which method to invoke

The benefits of this are:

  1. Everything is statically typed so compile-time checks for Java, Scala, etc work correctly
  2. Method dispatch is now done via native Java bytecode using types rather than going via Object which then has to do a lookup in a map. Memoization helped with the performance but each method invocation still required looking in a map for the correct adaptor. With this approach the appropriate methods will be compiled into the rx.Observable class to correctly invoke the right adaptor without lookups.
  3. Interaction from each language should work as expected idiomatically for that language.
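
The difference between the two dispatch styles in point 2 can be sketched roughly as follows. This is an illustration only, not RxJava code: `Func1`, `Adaptor`, `ForeignClosure`, `applyDynamic` and `applyStatic` are all hypothetical names invented for this example. The sketch uses Java 8 lambdas for brevity.

```java
import java.util.HashMap;
import java.util.Map;

public class DispatchSketch {
    // Hypothetical stand-in for a typed function interface.
    public interface Func1<T, R> {
        R call(T t);
    }

    // Hypothetical runtime adaptor that wraps a foreign closure object.
    public interface Adaptor {
        Func1<Object, Object> wrap(Object function);
    }

    // Hypothetical closure type standing in for a dynamic-language function.
    public static final class ForeignClosure {
        Object invoke(Object arg) {
            return ((Integer) arg) + 1;
        }
    }

    // Registry consulted on every dynamic call, keyed by the closure's class.
    static final Map<Class<?>, Adaptor> ADAPTORS = new HashMap<>();

    static {
        ADAPTORS.put(ForeignClosure.class, f -> arg -> ((ForeignClosure) f).invoke(arg));
    }

    // Object-based style: the compiler cannot check the argument,
    // and each call needs a map lookup to find the right adaptor.
    public static Object applyDynamic(Object function, Object value) {
        Adaptor adaptor = ADAPTORS.get(function.getClass());
        if (adaptor == null) {
            throw new IllegalArgumentException("No adaptor for " + function.getClass());
        }
        return adaptor.wrap(function).call(value);
    }

    // Statically typed style: the target is resolved at compile time, with no lookup.
    public static <T, R> R applyStatic(Func1<T, R> function, T value) {
        return function.call(value);
    }

    public static void main(String[] args) {
        Object viaDynamic = applyDynamic(new ForeignClosure(), 1);
        Integer viaStatic = applyStatic((Integer x) -> x + 1, 1);
        System.out.println(viaDynamic + " " + viaStatic);
    }
}
```

Passing an unregistered object to `applyDynamic` fails only at runtime, while passing a wrongly typed function to `applyStatic` is rejected by the compiler.
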

Further history on the various discussions and different attempts at solutions can be seen at ReactiveX#304, ReactiveX#204 and ReactiveX#208

Version 0.10.1 (Maven Central)

A new contrib module for Android: https://github.com/ReactiveX/RxJava/tree/master/rxjava-contrib/rxjava-android

  • Pull 318 rxjava-android module with Android Schedulers

Version 0.10.0 (Maven Central)

This release includes a breaking change as it changes onError(Exception) to onError(Throwable). This decision was made via discussion at ReactiveX#296.

Any statically-typed Observer implementations with onError(Exception) will need to be updated to onError(Throwable) when moving to this version.
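
As a rough sketch of what that update looks like, the example below uses a simplified stand-in `Observer` interface defined for illustration, not the real `rx.Observer`. `RecordingObserver` and `deliver` are hypothetical names. One practical effect in Java is that an `Error` such as `StackOverflowError` is a `Throwable` but not an `Exception`, so only the new signature can receive it.

```java
public class OnErrorThrowableSketch {
    // Simplified stand-in for the Observer contract; NOT the real rx.Observer.
    public interface Observer<T> {
        void onNext(T value);
        void onCompleted();
        void onError(Throwable e); // before 0.10.0 this was onError(Exception e)
    }

    // An implementation migrated to the Throwable signature.
    public static final class RecordingObserver implements Observer<String> {
        Throwable lastError;

        @Override public void onNext(String value) { }
        @Override public void onCompleted() { }
        @Override public void onError(Throwable e) { lastError = e; }
    }

    // Delivers a failure to a fresh observer and returns what it recorded.
    public static Throwable deliver(Throwable failure) {
        RecordingObserver observer = new RecordingObserver();
        observer.onError(failure);
        return observer.lastError;
    }

    public static void main(String[] args) {
        // Both an Error and an Exception fit the Throwable parameter.
        System.out.println(deliver(new StackOverflowError("too deep")).getClass().getSimpleName());
        System.out.println(deliver(new IllegalStateException("bad state")).getClass().getSimpleName());
    }
}
```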

  • Pull 312 Fix for OperatorOnErrorResumeNextViaObservable and async Resume
  • Pull 314 Map Error Handling
  • Pull 315 Change onError(Exception) to onError(Throwable) - Issue #296

Version 0.9.2 (Maven Central)

  • Pull 308 Ensure now() is always updated in TestScheduler.advanceTo/By
  • Pull 281 Operator: Buffer

Version 0.9.1 (Maven Central)

Version 0.9.0 (Maven Central)

This release includes breaking changes that move all blocking operators (such as single, last, forEach) to BlockingObservable.

This means Observable has only non-blocking operators on it. The blocking operators can now be accessed via .toBlockingObservable() or BlockingObservable.from(observable).

Notes and a link to the discussion of this change can be found at ReactiveX#272.

  • Pull 272 Move blocking operators into BlockingObservable
  • Pull 273 Fix Concat (make non-blocking)
  • Issue 13 Operator: Switch
  • Pull 274 Remove SLF4J dependency (RxJava is now a single jar with no dependencies)

Version 0.8.4 (Maven Central)

  • Pull 269 (Really) Fix concurrency bug in ScheduledObserver

Version 0.8.3 (Maven Central)

  • Pull 268 Fix concurrency bug in ScheduledObserver

Version 0.8.2 (Maven Central)

  • Issue 74 Operator: Sample
  • Issue 93 Operator: Timestamp
  • Pull 253 Fix multiple subscription bug on operation filter
  • Pull 254 SwingScheduler (new rxjava-swing module)
  • Pull 256 BehaviorSubject
  • Pull 257 Improved scan, reduce, aggregate
  • Pull 262 SwingObservable (new rxjava-swing module)
  • Pull 264 Publish, Replay and Cache Operators

Version 0.8.1 (Maven Central)

Version 0.8.0 (Maven Central)

This is a breaking (non-backwards compatible) release that updates the Scheduler implementation released in 0.7.0.

See ReactiveX#19 for background, discussion and status of Schedulers.

It is believed that the public signatures of Scheduler and related objects are now stabilized but ongoing feedback and review by the community could still result in changes.

Version 0.7.0 (Maven Central)

This release adds the foundations of Rx Schedulers.

There are still open questions, portions not implemented and assuredly bugs and behavior we didn't understand and thus implemented wrong.

Please provide bug reports, pull requests or feedback to help us on the road to version 1.0 and get schedulers implemented correctly.

See ReactiveX#19 (comment) for some known open questions that we could use help answering.

Version 0.6.3 (Maven Central)

  • Pull 224 RxJavaObservableExecutionHook

Version 0.6.2 (Maven Central)

  • Issue 101 Operator: Where (alias to filter)
  • Pull 197 TakeWhile observables do not properly complete
  • Issue 21 Operator: All
  • Pull 206 Observable.toList breaks with multiple subscribers
  • Issue 29 Operator: CombineLatest
  • Issue 211 Remove use of JSR 305 and dependency on com.google.code.findbugs
  • Pull 212 Operation take leaks errors
  • Pull 220 TakeWhile protect calls to predicate
  • Pull 221 Error Handling Improvements - User Provided Observers/Functions
  • Pull 201 Synchronize Observer on OperationMerge
  • Issue 43 Operator: Finally

Version 0.6.1 (Maven Central)

  • Pull 190 Fix generics issue with materialize() that prevented chaining

Version 0.6.0 (Maven Central)

  • Issue 154 Add OSGi manifest headers
  • Issue 173 Subscription Utilities and Default Implementations
  • Pull 184 Convert 'last' from non-blocking to blocking to match Rx.Net (see Issue 57)

NOTE: This is a version bump from 0.5 to 0.6 because Issue 173 and Pull 184 include breaking changes.

These changes are being made with the goal of matching the Rx.Net implementation, so breaking changes will be made prior to 1.0 on 0.x releases if necessary.

It was found that the last() operator was implemented incorrectly (non-blocking instead of blocking) so any use of last() on version 0.5.x should be changed to use takeLast(1). Since the return type needed to change this could not be done via a deprecation.

Also removed were the Observable.createSubscription/Observable.noOpSubscription methods which are now on the rx.subscriptions.Subscriptions utility class as Subscriptions.create/Subscriptions.empty. These methods could have been deprecated rather than removed but since another breaking change was being done they were just cleanly changed as part of the pre-1.0 process.

Version 0.5.5 (Maven Central)

Version 0.5.4 (Maven Central)

Version 0.5.3 (Maven Central)

Version 0.5.2 (Maven Central)

Version 0.5.1 (Maven Central)

  • variety of code cleanup commits
  • Pull 132 Broke rxjava-examples mo