Skip to content

Commit

Permalink
Convert to scan to use lift
Browse files Browse the repository at this point in the history
  • Loading branch information
abersnaze committed Feb 12, 2014
1 parent 4bb59a5 commit 6737419
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 385 deletions.
26 changes: 9 additions & 17 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2529,14 +2529,6 @@ public final static <T> Observable<Boolean> sequenceEqual(Observable<? extends T
return OperationSequenceEqual.sequenceEqual(first, second, equality);
}

/**
* @deprecated use {@link #sumInteger}
*/
@Deprecated
public final static Observable<Integer> sum(Observable<Integer> source) {
return OperationSum.sum(source);
}

/**
* Returns an Observable that emits the sum of all the Doubles emitted by the source Observable.
* <p>
Expand Down Expand Up @@ -2583,7 +2575,7 @@ public final static Observable<Float> sumFloat(Observable<Float> source) {
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum.aspx">MSDN: Observable.Sum</a>
*/
public final static Observable<Integer> sumInteger(Observable<Integer> source) {
return OperationSum.sum(source);
return OperationSum.sumIntegers(source);
}

/**
Expand Down Expand Up @@ -5519,7 +5511,7 @@ public final Observable<T> reduce(Func2<T, T, T> accumulator) {
* It should use last() not takeLast(1) since it needs to emit an error if the sequence is
* empty.
*/
return create(OperationScan.scan(this, accumulator)).last();
return scan(accumulator).last();
}

/**
Expand Down Expand Up @@ -5547,7 +5539,7 @@ public final Observable<T> reduce(Func2<T, T, T> accumulator) {
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public final <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulator) {
return create(OperationScan.scan(this, initialValue, accumulator)).takeLast(1);
return scan(initialValue, accumulator).takeLast(1);
}

/**
Expand Down Expand Up @@ -6148,7 +6140,7 @@ public final <U> Observable<T> sample(Observable<U> sampler) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665.aspx">MSDN: Observable.Scan</a>
*/
public final Observable<T> scan(Func2<T, T, T> accumulator) {
return create(OperationScan.scan(this, accumulator));
return lift(OperationScan.scan(accumulator));
}

/**
Expand All @@ -6175,7 +6167,7 @@ public final Observable<T> scan(Func2<T, T, T> accumulator) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665.aspx">MSDN: Observable.Scan</a>
*/
public final <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accumulator) {
return create(OperationScan.scan(this, initialValue, accumulator));
return lift(OperationScan.scan(initialValue, accumulator));
}

/**
Expand Down Expand Up @@ -7079,7 +7071,7 @@ public final Observable<T> subscribeOn(Scheduler scheduler) {
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum.aspx">MSDN: Observable.Sum</a>
*/
public final Observable<Double> sumDouble(Func1<? super T, Double> valueExtractor) {
return create(new OperationSum.SumDoubleExtractor<T>(this, valueExtractor));
return OperationSum.sumAtLeastOneDoubles(map(valueExtractor));
}

/**
Expand All @@ -7096,7 +7088,7 @@ public final Observable<Double> sumDouble(Func1<? super T, Double> valueExtracto
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum.aspx">MSDN: Observable.Sum</a>
*/
public final Observable<Float> sumFloat(Func1<? super T, Float> valueExtractor) {
return create(new OperationSum.SumFloatExtractor<T>(this, valueExtractor));
return OperationSum.sumAtLeastOneFloats(map(valueExtractor));
}

/**
Expand All @@ -7113,7 +7105,7 @@ public final Observable<Float> sumFloat(Func1<? super T, Float> valueExtractor)
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum.aspx">MSDN: Observable.Sum</a>
*/
public final Observable<Integer> sumInteger(Func1<? super T, Integer> valueExtractor) {
return create(new OperationSum.SumIntegerExtractor<T>(this, valueExtractor));
return OperationSum.sumAtLeastOneIntegers(map(valueExtractor));
}

/**
Expand All @@ -7130,7 +7122,7 @@ public final Observable<Integer> sumInteger(Func1<? super T, Integer> valueExtra
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum.aspx">MSDN: Observable.Sum</a>
*/
public final Observable<Long> sumLong(Func1<? super T, Long> valueExtractor) {
return create(new OperationSum.SumLongExtractor<T>(this, valueExtractor));
return OperationSum.sumAtLeastOneLongs(map(valueExtractor));
}

/**
Expand Down
188 changes: 80 additions & 108 deletions rxjava-core/src/main/java/rx/operators/OperationScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
*/
package rx.operators;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.util.functions.Func2;

/**
Expand All @@ -36,7 +34,8 @@
*/
public final class OperationScan {
/**
* Applies an accumulator function over an observable sequence and returns each intermediate result with the specified source and accumulator.
* Applies an accumulator function over an observable sequence and returns each intermediate
* result with the specified source and accumulator.
*
* @param sequence
* An observable sequence of elements to project.
Expand All @@ -45,124 +44,97 @@ public final class OperationScan {
* @param accumulator
* An accumulator function to be invoked on each element from the sequence.
*
* @return An observable sequence whose elements are the result of accumulating the output from the list of Observables.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212007%28v=vs.103%29.aspx">Observable.Scan(TSource, TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource,
* @return An observable sequence whose elements are the result of accumulating the output from
* the list of Observables.
* @see <a
* href="http://msdn.microsoft.com/en-us/library/hh212007%28v=vs.103%29.aspx">Observable.Scan(TSource,
* TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource,
* TAccumulate))</a>
*/
public static <T, R> OnSubscribeFunc<R> scan(Observable<? extends T> sequence, R initialValue, Func2<R, ? super T, R> accumulator) {
return new Accumulator<T, R>(sequence, initialValue, accumulator);
public static <T, R> Operator<R, T> scan(final R initialValue, final Func2<R, ? super T, R> accumulator) {
return new Operator<R, T>() {
@Override
public Subscriber<T> call(final Subscriber<? super R> observer) {
observer.onNext(initialValue);
return new Subscriber<T>(observer) {
private R value = initialValue;

@Override
public void onNext(T value) {
try {
this.value = accumulator.call(this.value, value);
} catch (Throwable e) {
observer.onError(e);
observer.unsubscribe();
}
observer.onNext(this.value);
}

@Override
public void onError(Throwable e) {
observer.onError(e);
}

@Override
public void onCompleted() {
observer.onCompleted();
}
};
}
};
}

/**
* Applies an accumulator function over an observable sequence and returns each intermediate result with the specified source and accumulator.
* Applies an accumulator function over an observable sequence and returns each intermediate
* result with the specified source and accumulator.
*
* @param sequence
* An observable sequence of elements to project.
* @param accumulator
* An accumulator function to be invoked on each element from the sequence.
*
* @return An observable sequence whose elements are the result of accumulating the output from the list of Observables.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v=vs.103).aspx">Observable.Scan(TSource) Method (IObservable(TSource), Func(TSource, TSource, TSource))</a>
* @return An observable sequence whose elements are the result of accumulating the output from
* the list of Observables.
* @see <a
* href="http://msdn.microsoft.com/en-us/library/hh211665(v=vs.103).aspx">Observable.Scan(TSource)
* Method (IObservable(TSource), Func(TSource, TSource, TSource))</a>
*/
public static <T> OnSubscribeFunc<T> scan(Observable<? extends T> sequence, Func2<T, T, T> accumulator) {
return new AccuWithoutInitialValue<T>(sequence, accumulator);
}

private static class AccuWithoutInitialValue<T> implements OnSubscribeFunc<T> {
private final Observable<? extends T> sequence;
private final Func2<T, T, T> accumulatorFunction;

private AccumulatingObserver<T, T> accumulatingObserver;

private AccuWithoutInitialValue(Observable<? extends T> sequence, Func2<T, T, T> accumulator) {
this.sequence = sequence;
this.accumulatorFunction = accumulator;
}

@Override
public Subscription onSubscribe(final Observer<? super T> observer) {
return sequence.subscribe(new Observer<T>() {

// has to be synchronized so that the initial value is always sent only once.
@Override
public synchronized void onNext(T value) {
if (accumulatingObserver == null) {
observer.onNext(value);
accumulatingObserver = new AccumulatingObserver<T, T>(observer, value, accumulatorFunction);
} else {
accumulatingObserver.onNext(value);
public static <T> Operator<T, T> scan(final Func2<T, T, T> accumulator) {
return new Operator<T, T>() {
@Override
public Subscriber<T> call(final Subscriber<? super T> observer) {
return new Subscriber<T>(observer) {
private boolean first = true;
private T value;

@Override
public void onNext(T value) {
if (first) {
this.value = value;
first = false;
}
else {
try {
this.value = accumulator.call(this.value, value);
} catch (Throwable e) {
observer.onError(e);
observer.unsubscribe();
}
}
observer.onNext(this.value);
}
}

@Override
public void onError(Throwable e) {
observer.onError(e);
}

@Override
public void onCompleted() {
observer.onCompleted();
}
});
}
}

private static class Accumulator<T, R> implements OnSubscribeFunc<R> {
private final Observable<? extends T> sequence;
private final R initialValue;
private final Func2<R, ? super T, R> accumulatorFunction;

private Accumulator(Observable<? extends T> sequence, R initialValue, Func2<R, ? super T, R> accumulator) {
this.sequence = sequence;
this.initialValue = initialValue;
this.accumulatorFunction = accumulator;
}

@Override
public Subscription onSubscribe(final Observer<? super R> observer) {
observer.onNext(initialValue);
return sequence.subscribe(new AccumulatingObserver<T, R>(observer, initialValue, accumulatorFunction));
}
}

private static class AccumulatingObserver<T, R> implements Observer<T> {
private final Observer<? super R> observer;
private final Func2<R, ? super T, R> accumulatorFunction;

private R acc;

private AccumulatingObserver(Observer<? super R> observer, R initialValue, Func2<R, ? super T, R> accumulator) {
this.observer = observer;
this.accumulatorFunction = accumulator;

this.acc = initialValue;
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}

/**
* We must synchronize this because we can't allow
* multiple threads to execute the 'accumulatorFunction' at the same time because
* the accumulator code very often will be doing mutation of the 'acc' object such as a non-threadsafe HashMap
*
* Because it's synchronized it's using non-atomic variables since everything in this method is single-threaded
*/
@Override
public synchronized void onNext(T value) {
try {
acc = accumulatorFunction.call(acc, value);
observer.onNext(acc);
} catch (Throwable ex) {
observer.onError(ex);
@Override
public void onCompleted() {
observer.onCompleted();
}
};
}
}

@Override
public void onError(Throwable e) {
observer.onError(e);
}

@Override
public void onCompleted() {
observer.onCompleted();
}
};
}
}
Loading

0 comments on commit 6737419

Please sign in to comment.