Skip to content

Commit

Permalink
fix reactor#1647 Null out lastKey to avoid retaining in distinctUntil…
Browse files Browse the repository at this point in the history
…Changed

This includes cancel, complete and error.
  • Loading branch information
simonbasle authored Apr 5, 2019
1 parent 46ea79b commit 3603a2d
Show file tree
Hide file tree
Showing 3 changed files with 324 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ static final class DistinctUntilChangedSubscriber<T, K>

boolean done;

@Nullable
K lastKey;

DistinctUntilChangedSubscriber(CoreSubscriber<? super T> actual,
Expand Down Expand Up @@ -149,6 +150,7 @@ public void onError(Throwable t) {
return;
}
done = true;
lastKey = null;

actual.onError(t);
}
Expand All @@ -159,6 +161,7 @@ public void onComplete() {
return;
}
done = true;
lastKey = null;

actual.onComplete();
}
Expand All @@ -185,6 +188,7 @@ public void request(long n) {
@Override
public void cancel() {
s.cancel();
lastKey = null;
}
}

Expand All @@ -200,6 +204,7 @@ static final class DistinctUntilChangedConditionalSubscriber<T, K>

boolean done;

@Nullable
K lastKey;

DistinctUntilChangedConditionalSubscriber(ConditionalSubscriber<? super T> actual,
Expand Down Expand Up @@ -278,6 +283,7 @@ public void onError(Throwable t) {
return;
}
done = true;
lastKey = null;

actual.onError(t);
}
Expand All @@ -288,6 +294,7 @@ public void onComplete() {
return;
}
done = true;
lastKey = null;

actual.onComplete();
}
Expand All @@ -314,6 +321,7 @@ public void request(long n) {
@Override
public void cancel() {
s.cancel();
lastKey = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@

import java.util.AbstractCollection;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.Function;

import org.junit.Test;
Expand All @@ -33,6 +36,8 @@
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.FluxDistinct.DistinctConditionalSubscriber;
import reactor.core.publisher.FluxDistinct.DistinctSubscriber;
import reactor.test.MemoryUtils;
import reactor.test.MemoryUtils.RetainedDetector;
import reactor.test.MockUtils;
Expand All @@ -43,6 +48,7 @@
import reactor.util.context.Context;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.*;

Expand Down Expand Up @@ -422,8 +428,8 @@ public void distinctPredicateThrowsConditional() {
when(actualConditional.currentContext()).thenReturn(Context.empty());
when(actualConditional.tryOnNext(anyInt())).thenReturn(false);

FluxDistinct.DistinctConditionalSubscriber<Integer, Integer, Set<Integer>> conditionalSubscriber =
new FluxDistinct.DistinctConditionalSubscriber<>(
DistinctConditionalSubscriber<Integer, Integer, Set<Integer>> conditionalSubscriber =
new DistinctConditionalSubscriber<>(
actualConditional,
new HashSet<>(),
k -> k,
Expand All @@ -443,8 +449,8 @@ public void distinctPredicateThrowsConditionalOnNext() {
Fuseable.ConditionalSubscriber<Integer> actualConditional = Mockito.mock(Fuseable.ConditionalSubscriber.class);
when(actualConditional.currentContext()).thenReturn(Context.empty());

FluxDistinct.DistinctConditionalSubscriber<Integer, Integer, Set<Integer>> conditionalSubscriber =
new FluxDistinct.DistinctConditionalSubscriber<>(
DistinctConditionalSubscriber<Integer, Integer, Set<Integer>> conditionalSubscriber =
new DistinctConditionalSubscriber<>(
actualConditional,
new HashSet<>(),
k -> k,
Expand Down Expand Up @@ -535,8 +541,8 @@ public boolean add(T t) {
@Test
public void scanSubscriber() {
CoreSubscriber<String> actual = new LambdaSubscriber<>(null, e -> {}, null, null);
FluxDistinct.DistinctSubscriber<String, Integer, Set<Integer>> test =
new FluxDistinct.DistinctSubscriber<>(actual, new HashSet<>(), String::hashCode, Set::add, Set::clear);
DistinctSubscriber<String, Integer, Set<Integer>> test =
new DistinctSubscriber<>(actual, new HashSet<>(), String::hashCode, Set::add, Set::clear);
Subscription parent = Operators.emptySubscription();
test.onSubscribe(parent);

Expand All @@ -552,8 +558,8 @@ public void scanSubscriber() {
public void scanConditionalSubscriber() {
@SuppressWarnings("unchecked")
Fuseable.ConditionalSubscriber<String> actual = Mockito.mock(MockUtils.TestScannableConditionalSubscriber.class);
FluxDistinct.DistinctConditionalSubscriber<String, Integer, Set<Integer>> test =
new FluxDistinct.DistinctConditionalSubscriber<>(actual, new HashSet<>(), String::hashCode, Set::add, Set::clear);
DistinctConditionalSubscriber<String, Integer, Set<Integer>> test =
new DistinctConditionalSubscriber<>(actual, new HashSet<>(), String::hashCode, Set::add, Set::clear);
Subscription parent = Operators.emptySubscription();
test.onSubscribe(parent);

Expand Down Expand Up @@ -664,6 +670,168 @@ public void distinctDefaultCancelDoesntRetainObjects() throws InterruptedExcepti
.isEqualTo(50);
}

@Test
public void doesntRetainObjectsWithForcedCompleteOnSubscriber() {
RetainedDetector retainedDetector = new RetainedDetector();

DistinctSubscriber<DistinctDefaultCancel, DistinctDefaultCancel, Set<DistinctDefaultCancel>> sub = new DistinctSubscriber<>(
new BaseSubscriber<DistinctDefaultCancel>() {},
new HashSet<>(),
Function.identity(),
Collection::add,
Collection::clear);
sub.onSubscribe(Operators.emptySubscription());
sub.onNext(retainedDetector.tracked(new DistinctDefaultCancel(0)));
sub.onNext(retainedDetector.tracked(new DistinctDefaultCancel(1)));
sub.onNext(retainedDetector.tracked(new DistinctDefaultCancel(2)));

assertThat(retainedDetector.finalizedCount()).isZero();
assertThat(retainedDetector.trackedTotal()).isEqualTo(3);

sub.onComplete();
System.gc();

await()
.atMost(2, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(retainedDetector.finalizedCount()).isEqualTo(3));
}

@Test
public void doesntRetainObjectsWithForcedCompleteOnSubscriber_conditional() {
RetainedDetector retainedDetector = new RetainedDetector();

DistinctConditionalSubscriber<DistinctDefaultCancel, DistinctDefaultCancel, Set<DistinctDefaultCancel>> sub = new DistinctConditionalSubscriber<>(
Operators.toConditionalSubscriber(new BaseSubscriber<DistinctDefaultCancel>() {}),
new HashSet<>(),
Function.identity(),
Collection::add,
Collection::clear);
sub.onSubscribe(Operators.emptySubscription());
sub.onNext(retainedDetector.tracked(new DistinctDefaultCancel(0)));
sub.onNext(retainedDetector.tracked(new DistinctDefaultCancel(1)));
sub.onNext(retainedDetector.tracked(new DistinctDefaultCancel(2)));

assertThat(retainedDetector.finalizedCount()).isZero();
assertThat(retainedDetector.trackedTotal()).isEqualTo(3);

sub.onComplete();
System.gc();

await()
.atMost(2, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(retainedDetector.finalizedCount()).isEqualTo(3));
}

@Test
public void doesntRetainObjectsWithForcedErrorOnSubscriber() {
RetainedDetector retainedDetector = new RetainedDetector();

DistinctSubscriber<DistinctDefaultCancel, DistinctDefaultCancel, Set<DistinctDefaultCancel>> sub = new DistinctSubscriber<>(
new BaseSubscriber<DistinctDefaultCancel>() {
@Override
protected void hookOnError(Throwable throwable) { }
},
new HashSet<>(),
Function.identity(),
Collection::add,
Collection::clear);
sub.onSubscribe(Operators.emptySubscription());
sub.onNext(retainedDetector.tracked(new DistinctDefaultCancel(0)));
sub.onNext(retainedDetector.tracked(new DistinctDefaultCancel(1)));
sub.onNext(retainedDetector.tracked(new DistinctDefaultCancel(2)));

assertThat(retainedDetector.finalizedCount()).isZero();
assertThat(retainedDetector.trackedTotal()).isEqualTo(3);

sub.onError(new IllegalStateException("expected"));
System.gc();

await()
.atMost(2, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(retainedDetector.finalizedCount()).isEqualTo(3));
}

@Test
public void doesntRetainObjectsWithForcedErrorOnSubscriber_conditional() {
RetainedDetector retainedDetector = new RetainedDetector();

DistinctConditionalSubscriber<DistinctDefaultCancel, DistinctDefaultCancel, Set<DistinctDefaultCancel>> sub = new DistinctConditionalSubscriber<>(
Operators.toConditionalSubscriber(new BaseSubscriber<DistinctDefaultCancel>() {
@Override
protected void hookOnError(Throwable throwable) { }
}),
new HashSet<>(),
Function.identity(),
Collection::add,
Collection::clear);
sub.onSubscribe(Operators.emptySubscription());
sub.onNext(retainedDetector.tracked(new DistinctDefaultCancel(0)));
sub.onNext(retainedDetector.tracked(new DistinctDefaultCancel(1)));
sub.onNext(retainedDetector.tracked(new DistinctDefaultCancel(2)));

assertThat(retainedDetector.finalizedCount()).isZero();
assertThat(retainedDetector.trackedTotal()).isEqualTo(3);

sub.onError(new IllegalStateException("expected"));
System.gc();

await()
.atMost(2, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(retainedDetector.finalizedCount()).isEqualTo(3));
}

@Test
public void doesntRetainObjectsWithForcedCancelOnSubscriber() {
RetainedDetector retainedDetector = new RetainedDetector();

DistinctSubscriber<DistinctDefaultCancel, DistinctDefaultCancel, Set<DistinctDefaultCancel>> sub = new DistinctSubscriber<>(
new BaseSubscriber<DistinctDefaultCancel>() {},
new HashSet<>(),
Function.identity(),
Collection::add,
Collection::clear);
sub.onSubscribe(Operators.emptySubscription());
sub.onNext(retainedDetector.tracked(new DistinctDefaultCancel(0)));
sub.onNext(retainedDetector.tracked(new DistinctDefaultCancel(1)));
sub.onNext(retainedDetector.tracked(new DistinctDefaultCancel(2)));

assertThat(retainedDetector.finalizedCount()).isZero();
assertThat(retainedDetector.trackedTotal()).isEqualTo(3);

sub.cancel();
System.gc();

await()
.atMost(2, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(retainedDetector.finalizedCount()).isEqualTo(3));
}

@Test
public void doesntRetainObjectsWithForcedCancelOnSubscriber_conditional() {
RetainedDetector retainedDetector = new RetainedDetector();

DistinctConditionalSubscriber<DistinctDefaultCancel, DistinctDefaultCancel, Set<DistinctDefaultCancel>> sub = new DistinctConditionalSubscriber<>(
Operators.toConditionalSubscriber(new BaseSubscriber<DistinctDefaultCancel>() {}),
new HashSet<>(),
Function.identity(),
Collection::add,
Collection::clear);
sub.onSubscribe(Operators.emptySubscription());
sub.onNext(retainedDetector.tracked(new DistinctDefaultCancel(0)));
sub.onNext(retainedDetector.tracked(new DistinctDefaultCancel(1)));
sub.onNext(retainedDetector.tracked(new DistinctDefaultCancel(2)));

assertThat(retainedDetector.finalizedCount()).isZero();
assertThat(retainedDetector.trackedTotal()).isEqualTo(3);

sub.cancel();
System.gc();

await()
.atMost(2, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(retainedDetector.finalizedCount()).isEqualTo(3));
}

static class DistinctDefault {

private final int i;
Expand Down
Loading

0 comments on commit 3603a2d

Please sign in to comment.