Skip to content

Commit

Permalink
2.x: reflection-based parameter validator & fixes (ReactiveX#5187)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Mar 15, 2017
1 parent 2d03fa9 commit b58642b
Show file tree
Hide file tree
Showing 10 changed files with 1,374 additions and 14 deletions.
1 change: 1 addition & 0 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,7 @@ public final void blockingAwait() {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final boolean blockingAwait(long timeout, TimeUnit unit) {
ObjectHelper.requireNonNull(unit, "unit is null");
BlockingMultiObserver<Void> observer = new BlockingMultiObserver<Void>();
subscribe(observer);
return observer.blockingAwait(timeout, unit);
Expand Down
73 changes: 64 additions & 9 deletions src/main/java/io/reactivex/Flowable.java

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,8 @@ public static <T> Flowable<T> merge(Publisher<? extends MaybeSource<? extends T>
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> Flowable<T> merge(Publisher<? extends MaybeSource<? extends T>> sources, int maxConcurrency) {
ObjectHelper.requireNonNull(sources, "source is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, MaybeToPublisher.instance(), false, maxConcurrency, Flowable.bufferSize()));
}

Expand All @@ -852,6 +854,7 @@ public static <T> Flowable<T> merge(Publisher<? extends MaybeSource<? extends T>
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> Maybe<T> merge(MaybeSource<? extends MaybeSource<? extends T>> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new MaybeFlatten(source, Functions.identity()));
}

Expand Down Expand Up @@ -1028,6 +1031,9 @@ public static <T> Flowable<T> mergeArray(MaybeSource<? extends T>... sources) {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> mergeArrayDelayError(MaybeSource<? extends T>... sources) {
if (sources.length == 0) {
return Flowable.empty();
}
return Flowable.fromArray(sources).flatMap((Function)MaybeToPublisher.instance(), true, sources.length);
}

Expand Down Expand Up @@ -1309,6 +1315,9 @@ public static <T> Single<Boolean> sequenceEqual(MaybeSource<? extends T> source1
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<Boolean> sequenceEqual(MaybeSource<? extends T> source1, MaybeSource<? extends T> source2,
BiPredicate<? super T, ? super T> isEqual) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(isEqual, "isEqual is null");
return RxJavaPlugins.onAssembly(new MaybeEqualSingle<T>(source1, source2, isEqual));
}

Expand Down Expand Up @@ -2281,6 +2290,7 @@ public final Maybe<T> delay(long delay, TimeUnit unit, Scheduler scheduler) {
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
public final <U, V> Maybe<T> delay(Publisher<U> delayIndicator) {
ObjectHelper.requireNonNull(delayIndicator, "delayIndicator is null");
return RxJavaPlugins.onAssembly(new MaybeDelayOtherPublisher<T, U>(this, delayIndicator));
}

Expand Down Expand Up @@ -2677,6 +2687,8 @@ public final <R> Maybe<R> flatMap(
@SchedulerSupport(SchedulerSupport.NONE)
public final <U, R> Maybe<R> flatMap(Function<? super T, ? extends MaybeSource<? extends U>> mapper,
BiFunction<? super T, ? super U, ? extends R> resultSelector) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.requireNonNull(resultSelector, "resultSelector is null");
return RxJavaPlugins.onAssembly(new MaybeFlatMapBiSelector<T, U, R>(this, mapper, resultSelector));
}

Expand Down Expand Up @@ -2704,6 +2716,7 @@ public final <U, R> Maybe<R> flatMap(Function<? super T, ? extends MaybeSource<?
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Flowable<U> flattenAsFlowable(final Function<? super T, ? extends Iterable<? extends U>> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new MaybeFlatMapIterableFlowable<T, U>(this, mapper));
}

Expand All @@ -2727,6 +2740,7 @@ public final <U> Flowable<U> flattenAsFlowable(final Function<? super T, ? exten
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Observable<U> flattenAsObservable(final Function<? super T, ? extends Iterable<? extends U>> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new MaybeFlatMapIterableObservable<T, U>(this, mapper));
}

Expand Down Expand Up @@ -3691,6 +3705,9 @@ public final Disposable subscribe(Consumer<? super T> onSuccess, Consumer<? supe
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onSuccess, Consumer<? super Throwable> onError,
Action onComplete) {
ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
return subscribeWith(new MaybeCallbackObserver<T>(onSuccess, onError, onComplete));
}

Expand Down
Loading

0 comments on commit b58642b

Please sign in to comment.