Skip to content

Commit

Permalink
Merge pull request ReactiveX#1010 from benjchristensen/unsafe-subscribe
Browse files Browse the repository at this point in the history
UnsafeSubscribe
  • Loading branch information
benjchristensen committed Apr 3, 2014
2 parents 7af5c32 + 7ce6860 commit c09efd8
Show file tree
Hide file tree
Showing 85 changed files with 604 additions and 687 deletions.
553 changes: 211 additions & 342 deletions rxjava-core/src/main/java/rx/Observable.java

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/joins/JoinObserver1.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void addActivePlan(ActivePlan0 activePlan) {
public void subscribe(Object gate) {
if (subscribed.compareAndSet(false, true)) {
this.gate = gate;
source.materialize().subscribe(this);
source.materialize().unsafeSubscribe(this);
} else {
throw new IllegalStateException("Can only be subscribed to once.");
}
Expand Down
41 changes: 13 additions & 28 deletions rxjava-core/src/main/java/rx/observables/BlockingObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@

import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observers.SafeSubscriber;
import rx.operators.OperationLatest;
import rx.operators.OperationMostRecent;
import rx.operators.OperationNext;
import rx.operators.OperationToFuture;
import rx.operators.OperationToIterator;
import rx.operators.BlockingOperatorLatest;
import rx.operators.BlockingOperatorMostRecent;
import rx.operators.BlockingOperatorNext;
import rx.operators.BlockingOperatorToFuture;
import rx.operators.BlockingOperatorToIterator;

/**
* An extension of {@link Observable} that provides blocking operators.
Expand Down Expand Up @@ -64,17 +62,6 @@ public static <T> BlockingObservable<T> from(final Observable<? extends T> o) {
return new BlockingObservable<T>(o);
}

/**
* Used for protecting against errors being thrown from {@link Subscriber} implementations and ensuring onNext/onError/onCompleted contract
* compliance.
* <p>
* See https://github.com/Netflix/RxJava/issues/216 for discussion on
* "Guideline 6.4: Protect calls to user code from within an operator"
*/
private Subscription protectivelyWrapAndSubscribe(Subscriber<? super T> observer) {
return o.subscribe(new SafeSubscriber<T>(observer));
}

/**
* Invoke a method on each item emitted by the {@link Observable}; block
* until the Observable completes.
Expand All @@ -97,12 +84,10 @@ public void forEach(final Action1<? super T> onNext) {
final AtomicReference<Throwable> exceptionFromOnError = new AtomicReference<Throwable>();

/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on
* "Guideline 6.4: Protect calls to user code from within an operator"
* Use 'subscribe' instead of 'unsafeSubscribe' for Rx contract behavior
* as this is the final subscribe in the chain.
*/
protectivelyWrapAndSubscribe(new Subscriber<T>() {
o.subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
latch.countDown();
Expand Down Expand Up @@ -158,7 +143,7 @@ public void onNext(T args) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Blocking-Observable-Operators#transformations-tofuture-toiterable-and-toiteratorgetiterator">RxJava Wiki: getIterator()</a>
*/
public Iterator<T> getIterator() {
return OperationToIterator.toIterator(o);
return BlockingOperatorToIterator.toIterator(o);
}

/**
Expand Down Expand Up @@ -311,7 +296,7 @@ public T lastOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229751.aspx">MSDN: Observable.MostRecent</a>
*/
public Iterable<T> mostRecent(T initialValue) {
return OperationMostRecent.mostRecent(o, initialValue);
return BlockingOperatorMostRecent.mostRecent(o, initialValue);
}

/**
Expand All @@ -324,7 +309,7 @@ public Iterable<T> mostRecent(T initialValue) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211897.aspx">MSDN: Observable.Next</a>
*/
public Iterable<T> next() {
return OperationNext.next(o);
return BlockingOperatorNext.next(o);
}

/**
Expand All @@ -344,7 +329,7 @@ public Iterable<T> next() {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212115.aspx">MSDN: Observable.Latest</a>
*/
public Iterable<T> latest() {
return OperationLatest.latest(o);
return BlockingOperatorLatest.latest(o);
}

/**
Expand Down Expand Up @@ -441,7 +426,7 @@ public T singleOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Blocking-Observable-Operators#transformations-tofuture-toiterable-and-toiteratorgetiterator">RxJava Wiki: toFuture()</a>
*/
public Future<T> toFuture() {
return OperationToFuture.toFuture(o);
return BlockingOperatorToFuture.toFuture(o);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static <K, T> GroupedObservable<K, T> from(K key, final Observable<T> o)

@Override
public void call(Subscriber<? super T> s) {
o.subscribe(s);
o.unsafeSubscribe(s);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@

import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.exceptions.Exceptions;

/**
* Wait for and iterate over the latest values of the source observable.
* If the source works faster than the iterator, values may be skipped, but
* not the onError or onCompleted events.
*/
public final class OperationLatest {
public final class BlockingOperatorLatest {
/** Utility class. */
private OperationLatest() {
private BlockingOperatorLatest() {
throw new IllegalStateException("No instances!");
}

Expand All @@ -48,7 +48,7 @@ public Iterator<T> iterator() {
}

/** Observer of source, iterator for output. */
static final class LatestObserverIterator<T> implements Observer<Notification<? extends T>>, Iterator<T> {
static final class LatestObserverIterator<T> extends Subscriber<Notification<? extends T>> implements Iterator<T> {
final Semaphore notify = new Semaphore(0);
// observer's notification
final AtomicReference<Notification<? extends T>> reference = new AtomicReference<Notification<? extends T>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.exceptions.Exceptions;

/**
Expand All @@ -29,7 +29,7 @@
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.mostRecent.png">
*/
public final class OperationMostRecent {
public final class BlockingOperatorMostRecent {

public static <T> Iterable<T> mostRecent(final Observable<? extends T> source, final T initialValue) {

Expand All @@ -39,6 +39,10 @@ public Iterator<T> iterator() {
MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<T>(initialValue);
final MostRecentIterator<T> nextIterator = new MostRecentIterator<T>(mostRecentObserver);

/**
* Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain
* since it is for BlockingObservable.
*/
source.subscribe(mostRecentObserver);

return nextIterator;
Expand Down Expand Up @@ -74,7 +78,7 @@ public void remove() {
}
}

private static class MostRecentObserver<T> implements Observer<T> {
private static class MostRecentObserver<T> extends Subscriber<T> {
private final AtomicBoolean completed = new AtomicBoolean(false);
private final AtomicReference<T> value;
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@

import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.exceptions.Exceptions;

/**
* Returns an Iterable that blocks until the Observable emits another item, then returns that item.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.next.png">
*/
public final class OperationNext {
public final class BlockingOperatorNext {

public static <T> Iterable<T> next(final Observable<? extends T> items) {
return new Iterable<T>() {
Expand Down Expand Up @@ -133,7 +133,7 @@ public void remove() {
}
}

private static class NextObserver<T> implements Observer<Notification<? extends T>> {
private static class NextObserver<T> extends Subscriber<Notification<? extends T>> {
private final BlockingQueue<Notification<? extends T>> buf = new ArrayBlockingQueue<Notification<? extends T>>(1);
private final AtomicBoolean waiting = new AtomicBoolean(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;

/**
Expand All @@ -35,7 +35,7 @@
* The toFuture operation throws an exception if the Observable emits more than one item. If the
* Observable may emit more than item, use <code>toList().toFuture()</code>.
*/
public class OperationToFuture {
public class BlockingOperatorToFuture {

/**
* Returns a Future that expects a single item from the observable.
Expand All @@ -52,7 +52,7 @@ public static <T> Future<T> toFuture(Observable<? extends T> that) {
final AtomicReference<T> value = new AtomicReference<T>();
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();

final Subscription s = that.subscribe(new Observer<T>() {
final Subscription s = that.subscribe(new Subscriber<T>() {

@Override
public void onCompleted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.exceptions.Exceptions;

/**
Expand All @@ -33,7 +33,7 @@
*
* @see <a href="https://github.com/Netflix/RxJava/issues/50">Issue #50</a>
*/
public class OperationToIterator {
public class BlockingOperatorToIterator {

/**
* Returns an iterator that iterates all values of the observable.
Expand All @@ -45,7 +45,8 @@ public class OperationToIterator {
public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
final BlockingQueue<Notification<? extends T>> notifications = new LinkedBlockingQueue<Notification<? extends T>>();

source.materialize().subscribe(new Observer<Notification<? extends T>>() {
// using subscribe instead of unsafeSubscribe since this is a BlockingObservable "final subscribe"
source.materialize().subscribe(new Subscriber<Notification<? extends T>>() {
@Override
public void onCompleted() {
// ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void onNext(T t) {
* It will then immediately swap itself out for the actual (after a single notification), but since this is now
* being done on the same producer thread no further buffering will occur.
*/
private static class PassThruObserver<T> implements Observer<T> {
private static class PassThruObserver<T> extends Subscriber<T> {

private final Observer<? super T> actual;
// this assumes single threaded synchronous notifications (the Rx contract for a single Observer)
Expand Down Expand Up @@ -133,7 +133,7 @@ private void drainIfNeededAndSwitchToActual() {

}

private static class BufferedObserver<T> implements Observer<T> {
private static class BufferedObserver<T> extends Subscriber<T> {
private final ConcurrentLinkedQueue<Object> buffer = new ConcurrentLinkedQueue<Object>();

@Override
Expand Down
62 changes: 50 additions & 12 deletions rxjava-core/src/main/java/rx/operators/ChunkedOperation.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
import rx.Observer;
import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.subscriptions.CompositeSubscription;

/**
* The base class for operations that break observables into "chunks". Currently buffers and windows.
Expand Down Expand Up @@ -408,7 +410,7 @@ public void pushValue(T value) {
* The type of object all internal {@link rx.operators.ChunkedOperation.Chunk} objects record.
* <C> The type of object being tracked by the {@link Chunk}
*/
protected static class ChunkObserver<T, C> implements Observer<T> {
protected static class ChunkObserver<T, C> extends Subscriber<T> {

private final Chunks<T, C> chunks;
private final Observer<? super C> observer;
Expand Down Expand Up @@ -492,12 +494,24 @@ public ObservableBasedSingleChunkCreator(NonOverlappingChunks<T, C> chunks, Func

private void listenForChunkEnd() {
Observable<? extends TClosing> closingObservable = chunkClosingSelector.call();
closingObservable.subscribe(new Action1<TClosing>() {
closingObservable.unsafeSubscribe(new Subscriber<TClosing>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void call(TClosing closing) {
public void onNext(TClosing t) {
chunks.emitAndReplaceChunk();
listenForChunkEnd();
listenForChunkEnd();
}

});
}

Expand All @@ -524,23 +538,47 @@ public void stop() {
*/
protected static class ObservableBasedMultiChunkCreator<T, C, TOpening, TClosing> implements ChunkCreator {

private final SafeObservableSubscription subscription = new SafeObservableSubscription();
private final CompositeSubscription subscription = new CompositeSubscription();

public ObservableBasedMultiChunkCreator(final OverlappingChunks<T, C> chunks, Observable<? extends TOpening> openings, final Func1<? super TOpening, ? extends Observable<? extends TClosing>> chunkClosingSelector) {
subscription.wrap(openings.subscribe(new Action1<TOpening>() {
openings.unsafeSubscribe(new Subscriber<TOpening>(subscription) {

@Override
public void onCompleted() {

}

@Override
public void call(TOpening opening) {
public void onError(Throwable e) {

}

@Override
public void onNext(TOpening opening) {
final Chunk<T, C> chunk = chunks.createChunk();
Observable<? extends TClosing> closingObservable = chunkClosingSelector.call(opening);

closingObservable.subscribe(new Action1<TClosing>() {
closingObservable.unsafeSubscribe(new Subscriber<TClosing>() {

@Override
public void call(TClosing closing) {
chunks.emitChunk(chunk);
public void onCompleted() {

}
});

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(TClosing t) {
chunks.emitChunk(chunk);
}

});
}
}));

});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package rx.operators;

import java.io.ObjectStreamException;
import java.io.Serializable;

import rx.Notification;
Expand Down
Loading

0 comments on commit c09efd8

Please sign in to comment.