Skip to content

Commit

Permalink
2.x: Fix size+time bound window not creating windows properly (Reacti…
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Sep 30, 2019
1 parent 71bfcae commit 1cf6e3e
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ void drainLoop() {

if (isHolder) {
ConsumerIndexHolder consumerIndexHolder = (ConsumerIndexHolder) o;
if (restartTimerOnMaxSize || producerIndex == consumerIndexHolder.index) {
if (!restartTimerOnMaxSize || producerIndex == consumerIndexHolder.index) {
w.onComplete();
count = 0;
w = UnicastProcessor.<T>create(bufferSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ void drainLoop() {

if (isHolder) {
ConsumerIndexHolder consumerIndexHolder = (ConsumerIndexHolder) o;
if (restartTimerOnMaxSize || producerIndex == consumerIndexHolder.index) {
if (!restartTimerOnMaxSize || producerIndex == consumerIndexHolder.index) {
w.onComplete();
count = 0;
w = UnicastSubject.create(bufferSize);
Expand Down
45 changes: 44 additions & 1 deletion src/test/java/io/reactivex/flowable/FlowableWindowTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
import static org.junit.Assert.*;

import java.util.*;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

import io.reactivex.Flowable;
import io.reactivex.*;
import io.reactivex.functions.*;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subscribers.TestSubscriber;

public class FlowableWindowTests {

Expand Down Expand Up @@ -50,4 +54,43 @@ public void accept(List<Integer> xs) {
assertEquals(2, lists.size());

}

@Test
public void timeSizeWindowAlternatingBounds() {
TestScheduler scheduler = new TestScheduler();
PublishProcessor<Integer> pp = PublishProcessor.create();

TestSubscriber<List<Integer>> ts = pp.window(5, TimeUnit.SECONDS, scheduler, 2)
.flatMapSingle(new Function<Flowable<Integer>, SingleSource<List<Integer>>>() {
@Override
public SingleSource<List<Integer>> apply(Flowable<Integer> v) {
return v.toList();
}
})
.test();

pp.onNext(1);
pp.onNext(2);
ts.assertValueCount(1); // size bound hit

scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
pp.onNext(3);
scheduler.advanceTimeBy(6, TimeUnit.SECONDS);
ts.assertValueCount(2); // time bound hit

pp.onNext(4);
pp.onNext(5);

ts.assertValueCount(3); // size bound hit again

pp.onNext(4);

scheduler.advanceTimeBy(6, TimeUnit.SECONDS);

ts.assertValueCount(4)
.assertNoErrors()
.assertNotComplete();

ts.cancel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,19 @@ public void subscribe(Subscriber<? super String> subscriber) {
Flowable<Flowable<String>> windowed = source.window(100, TimeUnit.MILLISECONDS, scheduler, 2);
windowed.subscribe(observeWindow(list, lists));

scheduler.advanceTimeTo(100, TimeUnit.MILLISECONDS);
scheduler.advanceTimeTo(95, TimeUnit.MILLISECONDS);
assertEquals(1, lists.size());
assertEquals(lists.get(0), list("one", "two"));

scheduler.advanceTimeTo(200, TimeUnit.MILLISECONDS);
assertEquals(2, lists.size());
assertEquals(lists.get(1), list("three", "four"));
scheduler.advanceTimeTo(195, TimeUnit.MILLISECONDS);
assertEquals(3, lists.size());
assertTrue(lists.get(1).isEmpty());
assertEquals(lists.get(2), list("three", "four"));

scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS);
assertEquals(3, lists.size());
assertEquals(lists.get(2), list("five"));
assertEquals(5, lists.size());
assertTrue(lists.get(3).isEmpty());
assertEquals(lists.get(4), list("five"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,19 @@ public void subscribe(Observer<? super String> observer) {
Observable<Observable<String>> windowed = source.window(100, TimeUnit.MILLISECONDS, scheduler, 2);
windowed.subscribe(observeWindow(list, lists));

scheduler.advanceTimeTo(100, TimeUnit.MILLISECONDS);
scheduler.advanceTimeTo(95, TimeUnit.MILLISECONDS);
assertEquals(1, lists.size());
assertEquals(lists.get(0), list("one", "two"));

scheduler.advanceTimeTo(200, TimeUnit.MILLISECONDS);
assertEquals(2, lists.size());
assertEquals(lists.get(1), list("three", "four"));
scheduler.advanceTimeTo(195, TimeUnit.MILLISECONDS);
assertEquals(3, lists.size());
assertTrue(lists.get(1).isEmpty());
assertEquals(lists.get(2), list("three", "four"));

scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS);
assertEquals(3, lists.size());
assertEquals(lists.get(2), list("five"));
assertEquals(5, lists.size());
assertTrue(lists.get(3).isEmpty());
assertEquals(lists.get(4), list("five"));
}

@Test
Expand Down
44 changes: 44 additions & 0 deletions src/test/java/io/reactivex/observable/ObservableWindowTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@
import static org.junit.Assert.*;

import java.util.*;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

import io.reactivex.Observable;
import io.reactivex.SingleSource;
import io.reactivex.functions.*;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;

public class ObservableWindowTests {

Expand Down Expand Up @@ -50,4 +55,43 @@ public void accept(List<Integer> xs) {
assertEquals(2, lists.size());

}

@Test
public void timeSizeWindowAlternatingBounds() {
TestScheduler scheduler = new TestScheduler();
PublishSubject<Integer> ps = PublishSubject.create();

TestObserver<List<Integer>> to = ps.window(5, TimeUnit.SECONDS, scheduler, 2)
.flatMapSingle(new Function<Observable<Integer>, SingleSource<List<Integer>>>() {
@Override
public SingleSource<List<Integer>> apply(Observable<Integer> v) {
return v.toList();
}
})
.test();

ps.onNext(1);
ps.onNext(2);
to.assertValueCount(1); // size bound hit

scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
ps.onNext(3);
scheduler.advanceTimeBy(6, TimeUnit.SECONDS);
to.assertValueCount(2); // time bound hit

ps.onNext(4);
ps.onNext(5);

to.assertValueCount(3); // size bound hit again

ps.onNext(4);

scheduler.advanceTimeBy(6, TimeUnit.SECONDS);

to.assertValueCount(4)
.assertNoErrors()
.assertNotComplete();

to.dispose();
}
}

0 comments on commit 1cf6e3e

Please sign in to comment.