Skip to content

Commit

Permalink
GroupBy Fixes
Browse files Browse the repository at this point in the history
- There were bugs in the implementation, this fixes some of them. In particular, it was unsubscribing from the parent when all children were completed, and that would unsubscribe even if new groups were going to come.
- There are still problems related to `subscribeOn` and the "time gap" that are being played with. Unit tests related to that are still failing.
  • Loading branch information
benjchristensen committed Feb 11, 2014
1 parent eba6b93 commit f1b46a9
Show file tree
Hide file tree
Showing 2 changed files with 405 additions and 62 deletions.
24 changes: 16 additions & 8 deletions rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.observables.GroupedObservable;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
Expand All @@ -47,18 +49,20 @@ public Subscriber<? super T> call(final Subscriber<? super GroupedObservable<K,
// a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle
// and will unsubscribe on this parent if they are all unsubscribed
return new Subscriber<T>(new CompositeSubscription()) {
private final Map<K, PublishSubject<T>> groups = new HashMap<K, PublishSubject<T>>();
private final Map<K, Subject<T, T>> groups = new HashMap<K, Subject<T, T>>();
private final AtomicInteger completionCounter = new AtomicInteger(0);
private final AtomicBoolean completed = new AtomicBoolean(false);

@Override
public void onCompleted() {
completed.set(true);
// if we receive onCompleted from our parent we onComplete children
for (PublishSubject<T> ps : groups.values()) {
for (Subject<T, T> ps : groups.values()) {
ps.onCompleted();
}

// special case for empty (no groups emitted)
if (completionCounter.get() == 0) {
// special case if no children are running (such as an empty sequence, or just getting the groups and not subscribing)
childObserver.onCompleted();
}
}
Expand All @@ -73,15 +77,15 @@ public void onError(Throwable e) {
public void onNext(T t) {
try {
final K key = keySelector.call(t);
PublishSubject<T> gps = groups.get(key);
Subject<T, T> gps = groups.get(key);
if (gps == null) {
// this group doesn't exist
if (childObserver.isUnsubscribed()) {
// we have been unsubscribed on the outer so won't send any more groups
return;
}
gps = PublishSubject.create();
final PublishSubject<T> _gps = gps;
final Subject<T, T> _gps = gps;

GroupedObservable<K, T> go = new GroupedObservable<K, T>(key, new OnSubscribe<T>() {

Expand Down Expand Up @@ -130,9 +134,13 @@ public void onNext(T t) {
}

private void completeInner() {
if (completionCounter.decrementAndGet() == 0) {
unsubscribe();
for (PublishSubject<T> ps : groups.values()) {
if (completionCounter.decrementAndGet() == 0 && (completed.get() || childObserver.isUnsubscribed())) {
System.out.println("groupBy INNER completed");
if (childObserver.isUnsubscribed()) {
// if the entire groupBy has been unsubscribed and children are completed we will propagate the unsubscribe up.
unsubscribe();
}
for (Subject<T, T> ps : groups.values()) {
ps.onCompleted();
}
childObserver.onCompleted();
Expand Down
Loading

0 comments on commit f1b46a9

Please sign in to comment.