Skip to content

Commit

Permalink
Subjects Fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
benjchristensen committed Jan 22, 2014
1 parent b49943a commit 015e2b4
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 47 deletions.
17 changes: 12 additions & 5 deletions rxjava-core/src/main/java/rx/subjects/AsyncSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
import java.util.concurrent.atomic.AtomicReference;

import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Operator;
import rx.Observable.OnSubscribe;
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
import rx.util.functions.Action1;

Expand Down Expand Up @@ -53,13 +54,13 @@
*
* @param <T>
*/
public final class AsyncSubject<T> extends Subject<T, T> {
public final class AsyncSubject<T> extends Subject<T> {

public static <T> AsyncSubject<T> create() {
final SubjectSubscriptionManager<T> subscriptionManager = new SubjectSubscriptionManager<T>();
final AtomicReference<Notification<T>> lastNotification = new AtomicReference<Notification<T>>(new Notification<T>());

Action1<Operator<? super T>> onSubscribe = subscriptionManager.getOnSubscribeFunc(
OnSubscribe<T> onSubscribe = subscriptionManager.getOnSubscribeFunc(
/**
* This function executes at beginning of subscription.
*
Expand Down Expand Up @@ -95,15 +96,21 @@ protected static <T> void emitValueToObserver(Notification<T> n, Observer<? supe
}
}

private final OnSubscribe<T> onSubscribe;
private final SubjectSubscriptionManager<T> subscriptionManager;
final AtomicReference<Notification<T>> lastNotification;

protected AsyncSubject(Action1<Operator<? super T>> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
super(onSubscribe);
protected AsyncSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
this.onSubscribe = onSubscribe;
this.subscriptionManager = subscriptionManager;
this.lastNotification = lastNotification;
}

@Override
public Observable<T> toObservable() {
return Observable.create(onSubscribe);
}

@Override
public void onCompleted() {
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
Expand Down
29 changes: 12 additions & 17 deletions rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
import java.util.concurrent.atomic.AtomicReference;

import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Operator;
import rx.Observable.OnSubscribe;
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
import rx.util.functions.Action1;
import rx.util.functions.Action2;
Expand Down Expand Up @@ -66,19 +67,7 @@
*
* @param <T>
*/
public final class BehaviorSubject<T> extends Subject<T, T> {

/**
* Creates a {@link BehaviorSubject} which publishes the last and all subsequent events to each {@link Observer} that subscribes to it.
*
* @param defaultValue
* The value which will be published to any {@link Observer} as long as the {@link BehaviorSubject} has not yet received any events.
* @return the constructed {@link BehaviorSubject}.
* @deprecated Use {@link create()} instead.
*/
public static <T> BehaviorSubject<T> createWithDefaultValue(T defaultValue) {
return create(defaultValue);
}
public final class BehaviorSubject<T> extends Subject<T> {

/**
* Creates a {@link BehaviorSubject} which publishes the last and all subsequent events to each {@link Observer} that subscribes to it.
Expand All @@ -92,7 +81,7 @@ public static <T> BehaviorSubject<T> create(T defaultValue) {
// set a default value so subscriptions will immediately receive this until a new notification is received
final AtomicReference<Notification<T>> lastNotification = new AtomicReference<Notification<T>>(new Notification<T>(defaultValue));

Action1<Operator<? super T>> onSubscribe = subscriptionManager.getOnSubscribeFunc(
OnSubscribe<T> onSubscribe = subscriptionManager.getOnSubscribeFunc(
/**
* This function executes at beginning of subscription.
*
Expand Down Expand Up @@ -131,15 +120,21 @@ public void call(SubjectObserver<? super T> o) {
return new BehaviorSubject<T>(onSubscribe, subscriptionManager, lastNotification);
}

private final OnSubscribe<T> onSubscribe;
private final SubjectSubscriptionManager<T> subscriptionManager;
final AtomicReference<Notification<T>> lastNotification;

protected BehaviorSubject(Action1<Operator<? super T>> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
super(onSubscribe);
protected BehaviorSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
this.onSubscribe = onSubscribe;
this.subscriptionManager = subscriptionManager;
this.lastNotification = lastNotification;
}

@Override
public Observable<T> toObservable() {
return Observable.create(onSubscribe);
}

@Override
public void onCompleted() {
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
Expand Down
17 changes: 12 additions & 5 deletions rxjava-core/src/main/java/rx/subjects/PublishSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
import java.util.concurrent.atomic.AtomicReference;

import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Operator;
import rx.Observable.OnSubscribe;
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
import rx.util.functions.Action1;
import rx.util.functions.Action2;
Expand Down Expand Up @@ -48,14 +49,14 @@
*
* @param <T>
*/
public final class PublishSubject<T> extends Subject<T, T> {
public final class PublishSubject<T> extends Subject<T> {

public static <T> PublishSubject<T> create() {
final SubjectSubscriptionManager<T> subscriptionManager = new SubjectSubscriptionManager<T>();
// set a default value so subscriptions will immediately receive this until a new notification is received
final AtomicReference<Notification<T>> lastNotification = new AtomicReference<Notification<T>>();

Action1<Operator<? super T>> onSubscribe = subscriptionManager.getOnSubscribeFunc(
OnSubscribe<T> onSubscribe = subscriptionManager.getOnSubscribeFunc(
/**
* This function executes at beginning of subscription.
*
Expand Down Expand Up @@ -86,15 +87,21 @@ public void call(SubjectObserver<? super T> o) {
return new PublishSubject<T>(onSubscribe, subscriptionManager, lastNotification);
}

private final OnSubscribe<T> onSubscribe;
private final SubjectSubscriptionManager<T> subscriptionManager;
final AtomicReference<Notification<T>> lastNotification;

protected PublishSubject(Action1<Operator<? super T>> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
super(onSubscribe);
protected PublishSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
this.onSubscribe = onSubscribe;
this.subscriptionManager = subscriptionManager;
this.lastNotification = lastNotification;
}

@Override
public Observable<T> toObservable() {
return Observable.create(onSubscribe);
}

@Override
public void onCompleted() {
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
Expand Down
18 changes: 12 additions & 6 deletions rxjava-core/src/main/java/rx/subjects/ReplaySubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import java.util.concurrent.atomic.AtomicReference;

import rx.Notification;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Operator;
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
import rx.util.functions.Action1;
import rx.util.functions.Action2;

/**
* Subject that retains all events and will replay them to an {@link Observer} that subscribes.
Expand All @@ -51,7 +51,7 @@
*
* @param <T>
*/
public final class ReplaySubject<T> extends Subject<T, T> {
public final class ReplaySubject<T> extends Subject<T> {
public static <T> ReplaySubject<T> create() {
return create(16);
}
Expand All @@ -60,7 +60,7 @@ public static <T> ReplaySubject<T> create(int initialCapacity) {
final SubjectSubscriptionManager<T> subscriptionManager = new SubjectSubscriptionManager<T>();
final ReplayState<T> state = new ReplayState<T>(initialCapacity);

Action1<Operator<? super T>> onSubscribe = subscriptionManager.getOnSubscribeFunc(
OnSubscribe<T> onSubscribe = subscriptionManager.getOnSubscribeFunc(
/**
* This function executes at beginning of subscription.
* We want to replay history with the subscribing thread
Expand Down Expand Up @@ -108,13 +108,19 @@ public ReplayState(int initialCapacity) {

private final SubjectSubscriptionManager<T> subscriptionManager;
private final ReplayState<T> state;
private final OnSubscribe<T> onSubscribe;

protected ReplaySubject(Action1<Operator<? super T>> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, ReplayState<T> state) {
super(onSubscribe);
protected ReplaySubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, ReplayState<T> state) {
this.onSubscribe = onSubscribe;
this.subscriptionManager = subscriptionManager;
this.state = state;
}

@Override
public Observable<T> toObservable() {
return Observable.create(onSubscribe);
}

@Override
public void onCompleted() {
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
Expand Down
10 changes: 4 additions & 6 deletions rxjava-core/src/main/java/rx/subjects/Subject.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@

import rx.Observable;
import rx.Observer;
import rx.Operator;
import rx.util.functions.Action1;

public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {
protected Subject(Action1<Operator<? super R>> onSubscribe) {
super(onSubscribe);
}
public abstract class Subject<T> extends Observer<T> {

public abstract Observable<T> toObservable();

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@
import java.util.concurrent.atomic.AtomicReference;

import rx.Observer;
import rx.Operator;
import rx.Subscription;
import rx.Observable.OnSubscribe;
import rx.operators.SafeObservableSubscription;
import rx.util.functions.Action1;
import rx.util.functions.Action2;

/* package */class SubjectSubscriptionManager<T> {

Expand All @@ -39,11 +38,11 @@
* Only runs if Subject is in terminal state and the Observer ends up not being registered.
* @return
*/
public Action1<Operator<? super T>> getOnSubscribeFunc(final Action1<SubjectObserver<? super T>> onSubscribe, final Action1<SubjectObserver<? super T>> onTerminated) {
return new Action1<Operator<? super T>>() {
public OnSubscribe<T> getOnSubscribeFunc(final Action1<SubjectObserver<? super T>> onSubscribe, final Action1<SubjectObserver<? super T>> onTerminated) {
return new OnSubscribe<T>() {
@Override
public void call(Operator<? super T> actualOperator) {
SubjectObserver<T> observer = new SubjectObserver<T>(actualOperator);
public void call(Observer<? super T> actualObserver) {
SubjectObserver<T> observer = new SubjectObserver<T>(actualObserver);
// invoke onSubscribe logic
if (onSubscribe != null) {
onSubscribe.call(observer);
Expand All @@ -69,7 +68,7 @@ public void call(Operator<? super T> actualOperator) {
break;
} else {
final SafeObservableSubscription subscription = new SafeObservableSubscription();
actualOperator.add(subscription); // add to parent if the Subject itself is unsubscribed
actualObserver.add(subscription); // add to parent if the Subject itself is unsubscribed
addedObserver = true;
subscription.wrap(new Subscription() {
@Override
Expand Down Expand Up @@ -226,7 +225,7 @@ public State<T> removeObserver(Subscription s) {
}
}

protected static class SubjectObserver<T> implements Observer<T> {
protected static class SubjectObserver<T> extends Observer<T> {

private final Observer<? super T> actual;
protected volatile boolean caughtUp = false;
Expand Down

0 comments on commit 015e2b4

Please sign in to comment.