Skip to content

Commit

Permalink
fix reactor#1529 Prevent StepVerifier collect hang when consuming all…
Browse files Browse the repository at this point in the history
… signals

Fix has also been tested against test [RecordTest](https://github.com/george-hawkins/publish-cancel-test/blob/master/src/test/java/com/example/RecordTest.java) provided by the issue author.

Everything worked fine if publisher ended with `onComplete` signal.
Without `onComplete` ,  `completeLatch` was never decremented and as a result 
verify step blocked forever.
  • Loading branch information
pmackowski authored and simonbasle committed Mar 27, 2019
1 parent e8c8a1c commit 429bf7e
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved.
* Copyright (c) 2011-2019 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1307,7 +1307,7 @@ boolean onCollect(Signal<T> actualSignal) {
this.completeLatch.countDown();
return true;
}
return true;
return false;
}

@SuppressWarnings("unchecked")
Expand Down
46 changes: 45 additions & 1 deletion reactor-test/src/test/java/reactor/test/StepVerifierTests.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
* Copyright (c) 2011-2019 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1220,6 +1220,50 @@ public void testThenConsumeWhileFails() {
.withMessageContaining("expectNext(9)");
}

@Test
public void testExpectRecordedMatches() {
List<Integer> expected = Arrays.asList(1,2);

StepVerifier.create(Flux.just(1,2))
.recordWith(ArrayList::new)
.thenConsumeWhile(i -> i < 2)
.expectRecordedMatches(expected::equals)
.thenCancel()
.verify();
}

@Test
public void testExpectRecordedMatchesTwice() {
List<Integer> expected1 = Arrays.asList(1,2);
List<Integer> expected2 = Arrays.asList(3,4);

StepVerifier.create(Flux.just(1,2,3,4))
.recordWith(ArrayList::new)
.thenConsumeWhile(i -> i < 2)
.expectRecordedMatches(expected1::equals)
.recordWith(ArrayList::new)
.thenConsumeWhile(i -> i < 4)
.expectRecordedMatches(expected2::equals)
.thenCancel()
.verify();
}

@Test
public void testExpectRecordedMatchesWithoutComplete() {
List<Integer> expected = Arrays.asList(1,2);

TestPublisher<Integer> publisher = TestPublisher.createCold();
publisher.next(1);
publisher.next(2);

StepVerifier.create(publisher)
.recordWith(ArrayList::new)
.thenConsumeWhile(i -> i < 2)
.expectRecordedMatches(expected::equals)
.thenCancel()
.verify();
}

@Test
public void testWithDescription() {
assertThatExceptionOfType(AssertionError.class)
Expand Down

0 comments on commit 429bf7e

Please sign in to comment.