Skip to content

Commit

Permalink
Implemented publishLast
Browse files Browse the repository at this point in the history
  • Loading branch information
johnhmarks committed Sep 24, 2013
1 parent e0f57f1 commit d4b04d8
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 0 deletions.
9 changes: 9 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.subjects.AsyncSubject;
import rx.subjects.PublishSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
Expand Down Expand Up @@ -3634,6 +3635,14 @@ public ConnectableObservable<T> publish() {
return OperationMulticast.multicast(this, PublishSubject.<T> create());
}

/**
* Returns a {@link ConnectableObservable} that shares a single subscription that contains the last notification only.
* @return a {@link ConnectableObservable}
*/
public ConnectableObservable<T> publishLast() {
return OperationMulticast.multicast(this, AsyncSubject.<T> create());
}

/**
* Synonymous with <code>reduce()</code>.
* <p>
Expand Down
42 changes: 42 additions & 0 deletions rxjava-core/src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,48 @@ public void call(String v) {
}
}

@Test
public void testPublishLast() throws InterruptedException {
final AtomicInteger count = new AtomicInteger();
ConnectableObservable<String> connectable = Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(final Observer<? super String> observer) {
count.incrementAndGet();
final BooleanSubscription subscription = new BooleanSubscription();
new Thread(new Runnable() {
@Override
public void run() {
observer.onNext("first");
observer.onNext("last");
observer.onCompleted();
}
}).start();
return subscription;
}
}).publishLast();

// subscribe once
final CountDownLatch latch = new CountDownLatch(1);
connectable.subscribe(new Action1<String>() {
@Override
public void call(String value) {
assertEquals("last", value);
latch.countDown();
}
});

// subscribe twice
connectable.subscribe(new Action1<String>() {
@Override
public void call(String _) {}
});

Subscription subscription = connectable.connect();
assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
assertEquals(1, count.get());
subscription.unsubscribe();
}

@Test
public void testReplay() throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
Expand Down

0 comments on commit d4b04d8

Please sign in to comment.