Skip to content

Commit

Permalink
Add an API-preserving hide() method to ConnectableFlux (reactor#1577)
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle authored Apr 11, 2019
1 parent b8aaf9f commit 3a57c74
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public final Disposable connect() {
*/
public abstract void connect(Consumer<? super Disposable> cancelSupport);

@Override
public final ConnectableFlux<T> hide() {
return new ConnectableFluxHide<>(this);
}

/**
* Connects to the upstream source when the first {@link org.reactivestreams.Subscriber} subscribes and disconnects
* when all Subscribers cancelled or the upstream source completed.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import java.util.function.Consumer;

import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;

/**
* Hide a {@link ConnectableFlux} from fusion optimizations while keeping the {@link ConnectableFlux}
* specific API visible.
*
* @author Simon Baslé
*/
final class ConnectableFluxHide<T> extends ConnectableFlux<T> implements Scannable {

final ConnectableFlux<T> source;

ConnectableFluxHide(ConnectableFlux<T> source) {
super();
this.source = source;
}

@Override
public int getPrefetch() {
return source.getPrefetch();
}

@Override
@Nullable
public Object scanUnsafe(Scannable.Attr key) {
if (key == Scannable.Attr.PARENT) return source;
if (key == Scannable.Attr.PREFETCH) return getPrefetch();

return null;
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
source.subscribe(actual);
}

@Override
public void connect(Consumer<? super Disposable> cancelSupport) {
source.connect(cancelSupport);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5276,7 +5276,7 @@ public final Mono<Boolean> hasElements() {
*
* @return a new {@link Flux} preventing {@link Publisher} / {@link Subscription} based Reactor optimizations
*/
public final Flux<T> hide() {
public Flux<T> hide() {
return new FluxHide<>(this);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2011-2019 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import org.junit.Test;

import reactor.core.Fuseable;
import reactor.core.Scannable;

import static org.assertj.core.api.Assertions.assertThat;

public class ConnectableFluxHideTest {

@Test
public void hideApiPreservesConnectableFlux() {
ConnectableFlux<Integer> connectableFlux = Flux.range(1, 4).replay();

assertThat(connectableFlux).as("original is Fuseable").isInstanceOf(Fuseable.class);

//this validates the hide() maintains the API
connectableFlux = connectableFlux.hide();
connectableFlux.connect();

assertThat(connectableFlux).as("hidden is not fuseable").isNotInstanceOf(Fuseable.class);
}

@Test
public void scanOperator() throws Exception {
ConnectableFlux<Integer> source = Flux.range(1, 4).publish();
ConnectableFluxHide<Integer> test = new ConnectableFluxHide<>(source);

assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(source);
assertThat(test.scan(Scannable.Attr.PREFETCH))
.isEqualTo(256)
.isEqualTo(source.getPrefetch());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,13 @@ public void cacheFlux() {
Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(1000))
.replay()
.hide()
.autoConnect()
.hide()
.elapsed();

StepVerifier.create(source)
.expectNoFusionSupport()
.then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
Expand Down Expand Up @@ -143,14 +146,16 @@ public void cacheFluxFused() {

@Test
public void cacheFluxTTL() {

Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(1000))
.replay(Duration.ofMillis(2000))
.hide()
.autoConnect()
.hide()
.elapsed();

StepVerifier.create(source)
.expectNoFusionSupport()
.then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
Expand Down Expand Up @@ -193,14 +198,16 @@ public void cacheFluxTTLFused() {

@Test
public void cacheFluxTTLMillis() {

Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(1000))
.replay(Duration.ofMillis(2000), vts)
.hide()
.autoConnect()
.hide()
.elapsed();

StepVerifier.create(source)
.expectNoFusionSupport()
.then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
Expand All @@ -221,10 +228,13 @@ public void cacheFluxHistoryTTL() {
Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(1000))
.replay(2, Duration.ofMillis(2000))
.hide()
.autoConnect()
.hide()
.elapsed();

StepVerifier.create(source)
.expectNoFusionSupport()
.then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
Expand All @@ -241,7 +251,6 @@ public void cacheFluxHistoryTTL() {

@Test
public void cacheFluxHistoryTTLFused() {

Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(1000))
.replay(2, Duration.ofMillis(2000))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public void liftParallelFlux() {
@Test
public void liftConnectableFlux() {
ConnectableFlux<Integer> source = Flux.just(1)
.publish(); //TODO hide if ConnectableFlux gets a hide function
.publish()
.hide();

Operators.LiftFunction<Integer, Integer> liftFunction =
Operators.LiftFunction.liftScannable(null, (s, actual) -> actual);
Expand Down

0 comments on commit 3a57c74

Please sign in to comment.