Skip to content

Commit

Permalink
Merge pull request ReactiveX#2627 from akarnokd/FlatMapMaxConcurrent
Browse files Browse the repository at this point in the history
FlatMap overloads with maximum concurrency parameter
  • Loading branch information
benjchristensen committed Feb 11, 2015
2 parents f85b5cd + 0dadcde commit 13001d0
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 11 deletions.
94 changes: 93 additions & 1 deletion src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4516,7 +4516,34 @@ public final Observable<T> firstOrDefault(T defaultValue, Func1<? super T, Boole
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
return merge(map(func));
}


/**
* Returns an Observable that emits items based on applying a function that you supply to each item emitted
* by the source Observable, where that function returns an Observable, and then merging those resulting
* Observables and emitting the results of this merger, while limiting the maximum number of concurrent
* subscriptions to these Observables.
* <p>
* <!-- <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flatMap.png" alt=""> -->
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param func
* a function that, when applied to an item emitted by the source Observable, returns an
* Observable
* @param maxConcurrent
* the maximum number of Observables that may be subscribed to concurrently
* @return an Observable that emits the result of applying the transformation function to each item emitted
* by the source Observable and merging the results of the Observables obtained from this
* transformation
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
@Beta
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent) {
return merge(map(func), maxConcurrent);
}

/**
* Returns an Observable that applies a function to each item emitted or notification raised by the source
* Observable and then flattens the Observables returned from these functions and emits the resulting items.
Expand Down Expand Up @@ -4547,6 +4574,40 @@ public final <R> Observable<R> flatMap(
Func0<? extends Observable<? extends R>> onCompleted) {
return merge(mapNotification(onNext, onError, onCompleted));
}
/**
* Returns an Observable that applies a function to each item emitted or notification raised by the source
* Observable and then flattens the Observables returned from these functions and emits the resulting items,
* while limiting the maximum number of concurrent subscriptions to these Observables.
* <p>
* <!-- <img width="640" height="410" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeMap.nce.png" alt=""> -->
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R>
* the result type
* @param onNext
* a function that returns an Observable to merge for each item emitted by the source Observable
* @param onError
* a function that returns an Observable to merge for an onError notification from the source
* Observable
* @param onCompleted
* a function that returns an Observable to merge for an onCompleted notification from the source
* Observable
* @param maxConcurrent
* the maximum number of Observables that may be subscribed to concurrently
* @return an Observable that emits the results of merging the Observables returned from applying the
* specified functions to the emissions and notifications of the source Observable
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
@Beta
public final <R> Observable<R> flatMap(
Func1<? super T, ? extends Observable<? extends R>> onNext,
Func1<? super Throwable, ? extends Observable<? extends R>> onError,
Func0<? extends Observable<? extends R>> onCompleted, int maxConcurrent) {
return merge(mapNotification(onNext, onError, onCompleted), maxConcurrent);
}

/**
* Returns an Observable that emits the results of a specified function to the pair of values emitted by the
Expand Down Expand Up @@ -4575,6 +4636,37 @@ public final <U, R> Observable<R> flatMap(final Func1<? super T, ? extends Obser
final Func2<? super T, ? super U, ? extends R> resultSelector) {
return merge(lift(new OperatorMapPair<T, U, R>(collectionSelector, resultSelector)));
}
/**
* Returns an Observable that emits the results of a specified function to the pair of values emitted by the
* source Observable and a specified collection Observable, while limiting the maximum number of concurrent
* subscriptions to these Observables.
* <p>
* <!-- <img width="640" height="390" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeMap.r.png" alt=""> -->
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <U>
* the type of items emitted by the collection Observable
* @param <R>
* the type of items emitted by the resulting Observable
* @param collectionSelector
* a function that returns an Observable for each item emitted by the source Observable
* @param resultSelector
* a function that combines one item emitted by each of the source and collection Observables and
* returns an item to be emitted by the resulting Observable
* @param maxConcurrent
* the maximum number of Observables that may be subscribed to concurrently
* @return an Observable that emits the results of applying a function to a pair of values emitted by the
* source Observable and the collection Observable
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
@Beta
public final <U, R> Observable<R> flatMap(final Func1<? super T, ? extends Observable<? extends U>> collectionSelector,
final Func2<? super T, ? super U, ? extends R> resultSelector, int maxConcurrent) {
return merge(lift(new OperatorMapPair<T, U, R>(collectionSelector, resultSelector)), maxConcurrent);
}

/**
* Returns an Observable that merges each item emitted by the source Observable with the values in an
Expand Down
124 changes: 114 additions & 10 deletions src/test/java/rx/internal/operators/OperatorFlatMapTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,20 @@
package rx.internal.operators;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.*;

import java.util.Arrays;
import java.util.List;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
import org.junit.*;

import rx.Observable;
import rx.Observer;
import rx.exceptions.TestException;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.functions.*;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

public class OperatorFlatMapTest {
@Test
Expand Down Expand Up @@ -312,4 +310,110 @@ public void testFlatMapTransformsMergeException() {
verify(o, never()).onNext(any());
verify(o, never()).onCompleted();
}

private static <T> Observable<T> compose(Observable<T> source, final AtomicInteger subscriptionCount, final int m) {
return source.doOnSubscribe(new Action0() {
@Override
public void call() {
if (subscriptionCount.getAndIncrement() >= m) {
Assert.fail("Too many subscriptions! " + subscriptionCount.get());
}
}
}).doOnCompleted(new Action0() {
@Override
public void call() {
if (subscriptionCount.decrementAndGet() < 0) {
Assert.fail("Too many unsubscriptionss! " + subscriptionCount.get());
}
}
});
}

@Test
public void testFlatMapMaxConcurrent() {
final int m = 4;
final AtomicInteger subscriptionCount = new AtomicInteger();
Observable<Integer> source = Observable.range(1, 10).flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer t1) {
return compose(Observable.range(t1 * 10, 2), subscriptionCount, m)
.subscribeOn(Schedulers.computation());
}
}, m);

TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

source.subscribe(ts);

ts.awaitTerminalEvent();
ts.assertNoErrors();
Set<Integer> expected = new HashSet<Integer>(Arrays.asList(
10, 11, 20, 21, 30, 31, 40, 41, 50, 51, 60, 61, 70, 71, 80, 81, 90, 91, 100, 101
));
Assert.assertEquals(expected.size(), ts.getOnNextEvents().size());
Assert.assertTrue(expected.containsAll(ts.getOnNextEvents()));
}
@Test
public void testFlatMapSelectorMaxConcurrent() {
final int m = 4;
final AtomicInteger subscriptionCount = new AtomicInteger();
Observable<Integer> source = Observable.range(1, 10).flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer t1) {
return compose(Observable.range(t1 * 10, 2), subscriptionCount, m)
.subscribeOn(Schedulers.computation());
}
}, new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer t1, Integer t2) {
return t1 * 1000 + t2;
}
}, m);

TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

source.subscribe(ts);

ts.awaitTerminalEvent();
ts.assertNoErrors();
Set<Integer> expected = new HashSet<Integer>(Arrays.asList(
1010, 1011, 2020, 2021, 3030, 3031, 4040, 4041, 5050, 5051,
6060, 6061, 7070, 7071, 8080, 8081, 9090, 9091, 10100, 10101
));
Assert.assertEquals(expected.size(), ts.getOnNextEvents().size());
System.out.println("--> testFlatMapSelectorMaxConcurrent: " + ts.getOnNextEvents());
Assert.assertTrue(expected.containsAll(ts.getOnNextEvents()));
}
@Test
public void testFlatMapTransformsMaxConcurrentNormal() {
final int m = 2;
final AtomicInteger subscriptionCount = new AtomicInteger();
Observable<Integer> onNext =
compose(Observable.from(Arrays.asList(1, 2, 3)).observeOn(Schedulers.computation()), subscriptionCount, m)
.subscribeOn(Schedulers.computation());
Observable<Integer> onCompleted = compose(Observable.from(Arrays.asList(4)), subscriptionCount, m)
.subscribeOn(Schedulers.computation());
Observable<Integer> onError = Observable.from(Arrays.asList(5));

Observable<Integer> source = Observable.from(Arrays.asList(10, 20, 30));

@SuppressWarnings("unchecked")
Observer<Object> o = mock(Observer.class);
TestSubscriber<Object> ts = new TestSubscriber<Object>(o);

source.flatMap(just(onNext), just(onError), just0(onCompleted), m).subscribe(ts);

ts.awaitTerminalEvent(1, TimeUnit.SECONDS);
ts.assertNoErrors();
ts.assertTerminalEvent();

verify(o, times(3)).onNext(1);
verify(o, times(3)).onNext(2);
verify(o, times(3)).onNext(3);
verify(o).onNext(4);
verify(o).onCompleted();

verify(o, never()).onNext(5);
verify(o, never()).onError(any(Throwable.class));
}
}

0 comments on commit 13001d0

Please sign in to comment.