Skip to content

Commit

Permalink
Synchronized Operator Check for isTerminated
Browse files Browse the repository at this point in the history
As per ReactiveX#872 make Synchronized reject events after terminal state.
This class should not unsubscribe though. That is only for SafeSubscriber at the end.
  • Loading branch information
benjchristensen committed Feb 18, 2014
1 parent 84372e1 commit f74bb41
Showing 1 changed file with 13 additions and 4 deletions.
17 changes: 13 additions & 4 deletions rxjava-core/src/main/java/rx/observers/SynchronizedObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public final class SynchronizedObserver<T> implements Observer<T> {
*/

private final Observer<? super T> observer;
private volatile Object lock;
private final Object lock;
private boolean isTerminated = false;

public SynchronizedObserver(Observer<? super T> subscriber) {
this.observer = subscriber;
Expand All @@ -55,19 +56,27 @@ public SynchronizedObserver(Observer<? super T> subscriber, Object lock) {

public void onNext(T arg) {
synchronized (lock) {
observer.onNext(arg);
if (!isTerminated) {
observer.onNext(arg);
}
}
}

public void onError(Throwable e) {
synchronized (lock) {
observer.onError(e);
if (!isTerminated) {
isTerminated = true;
observer.onError(e);
}
}
}

public void onCompleted() {
synchronized (lock) {
observer.onCompleted();
if (!isTerminated) {
isTerminated = true;
observer.onCompleted();
}
}
}
}

0 comments on commit f74bb41

Please sign in to comment.