Skip to content

Commit

Permalink
Drop '2' from BeamFnDataGrpcMultiplexer2 and BeamFnDataInboundObserve…
Browse files Browse the repository at this point in the history
…r2 (apache#25144)

This is a follow-up for a comment in apache#25104

Also remove InboundDataClient which was missed in apache#25104
  • Loading branch information
lukecwik authored Jan 24, 2023
1 parent b7c07e2 commit 299a7b0
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2;
import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.DataEndpoint;
Expand Down Expand Up @@ -268,7 +268,7 @@ public ActiveBundle newBundle(

CompletionStage<BeamFnApi.ProcessBundleResponse> specificResponse =
genericResponse.thenApply(InstructionResponse::getProcessBundle);
Optional<BeamFnDataInboundObserver2> beamFnDataInboundObserver;
Optional<BeamFnDataInboundObserver> beamFnDataInboundObserver;
if (outputReceivers.isEmpty() && timerReceivers.isEmpty()) {
beamFnDataInboundObserver = Optional.empty();
} else {
Expand All @@ -291,7 +291,7 @@ public ActiveBundle newBundle(
timerReceiver.getValue().getReceiver()));
}
beamFnDataInboundObserver =
Optional.of(BeamFnDataInboundObserver2.forConsumers(dataEndpoints, timerEndpoints));
Optional.of(BeamFnDataInboundObserver.forConsumers(dataEndpoints, timerEndpoints));
fnApiDataService.registerReceiver(bundleId, beamFnDataInboundObserver.get());
}

Expand Down Expand Up @@ -339,7 +339,7 @@ public class ActiveBundle implements RemoteBundle {
private final CompletionStage<BeamFnApi.ProcessBundleResponse> response;
private final BeamFnDataOutboundAggregator beamFnDataOutboundAggregator;
private final Map<LogicalEndpoint, FnDataReceiver<?>> inputReceivers;
private final Optional<BeamFnDataInboundObserver2> beamFnDataInboundObserver;
private final Optional<BeamFnDataInboundObserver> beamFnDataInboundObserver;
private final StateDelegator.Registration stateRegistration;
private final BundleProgressHandler progressHandler;
private final BundleSplitHandler splitHandler;
Expand All @@ -354,7 +354,7 @@ private ActiveBundle(
CompletionStage<ProcessBundleResponse> response,
BeamFnDataOutboundAggregator beamFnDataOutboundAggregator,
Map<LogicalEndpoint, FnDataReceiver<?>> inputReceivers,
Optional<BeamFnDataInboundObserver2> beamFnDataInboundObserver,
Optional<BeamFnDataInboundObserver> beamFnDataInboundObserver,
StateDelegator.Registration stateRegistration,
BundleProgressHandler progressHandler,
BundleSplitHandler splitHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer2;
import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.server.FnService;
Expand Down Expand Up @@ -60,15 +60,15 @@ public static GrpcDataService create(
return new GrpcDataService(options, executor, outboundObserverFactory);
}

private final SettableFuture<BeamFnDataGrpcMultiplexer2> connectedClient;
private final SettableFuture<BeamFnDataGrpcMultiplexer> connectedClient;
/**
* A collection of multiplexers which are not used to send data. A handle to these multiplexers is
* maintained in order to perform an orderly shutdown.
*
* <p>TODO: (BEAM-3811) Replace with some cancellable collection, to ensure that new clients of a
* closed {@link GrpcDataService} are closed with that {@link GrpcDataService}.
*/
private final Queue<BeamFnDataGrpcMultiplexer2> additionalMultiplexers;
private final Queue<BeamFnDataGrpcMultiplexer> additionalMultiplexers;

private final PipelineOptions options;
private final ExecutorService executor;
Expand Down Expand Up @@ -99,8 +99,8 @@ public GrpcDataService() {
public StreamObserver<BeamFnApi.Elements> data(
final StreamObserver<BeamFnApi.Elements> outboundElementObserver) {
LOG.info("Beam Fn Data client connected.");
BeamFnDataGrpcMultiplexer2 multiplexer =
new BeamFnDataGrpcMultiplexer2(
BeamFnDataGrpcMultiplexer multiplexer =
new BeamFnDataGrpcMultiplexer(
null, outboundObserverFactory, inbound -> outboundElementObserver);
// First client that connects completes this future.
if (!connectedClient.set(multiplexer)) {
Expand All @@ -121,7 +121,7 @@ public void close() throws Exception {
// Multiplexer, but if there isn't any multiplexer it prevents callers blocking forever.
connectedClient.cancel(true);
// Close any other open connections
for (BeamFnDataGrpcMultiplexer2 additional : additionalMultiplexers) {
for (BeamFnDataGrpcMultiplexer additional : additionalMultiplexers) {
try {
additional.close();
} catch (Exception ignored) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2;
import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.DataEndpoint;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
Expand Down Expand Up @@ -150,18 +150,18 @@ public void testMultipleClientsSendMessagesAreDirectedToProperConsumers() throws
}

List<Collection<WindowedValue<String>>> serverInboundValues = new ArrayList<>();
Collection<BeamFnDataInboundObserver2> inboundObservers = new ArrayList<>();
Collection<BeamFnDataInboundObserver> inboundObservers = new ArrayList<>();
for (int i = 0; i < 3; ++i) {
final Collection<WindowedValue<String>> serverInboundValue = new ArrayList<>();
serverInboundValues.add(serverInboundValue);
BeamFnDataInboundObserver2 inboundObserver =
BeamFnDataInboundObserver2.forConsumers(
BeamFnDataInboundObserver inboundObserver =
BeamFnDataInboundObserver.forConsumers(
Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, serverInboundValue::add)),
Collections.emptyList());
service.registerReceiver(Integer.toString(i), inboundObserver);
inboundObservers.add(inboundObserver);
}
for (BeamFnDataInboundObserver2 inboundObserver : inboundObservers) {
for (BeamFnDataInboundObserver inboundObserver : inboundObservers) {
inboundObserver.awaitCompletion();
}
waitForInboundElements.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
* <p>TODO: Add support for multiplexing over multiple outbound observers by stickying the output
* location with a specific outbound observer.
*/
public class BeamFnDataGrpcMultiplexer2 implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataGrpcMultiplexer2.class);
public class BeamFnDataGrpcMultiplexer implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataGrpcMultiplexer.class);
private final Endpoints.@Nullable ApiServiceDescriptor apiServiceDescriptor;
private final StreamObserver<BeamFnApi.Elements> inboundObserver;
private final StreamObserver<BeamFnApi.Elements> outboundObserver;
Expand All @@ -57,7 +57,7 @@ public class BeamFnDataGrpcMultiplexer2 implements AutoCloseable {
receivers;
private final ConcurrentMap<String, Boolean> erroredInstructionIds;

public BeamFnDataGrpcMultiplexer2(
public BeamFnDataGrpcMultiplexer(
Endpoints.@Nullable ApiServiceDescriptor apiServiceDescriptor,
OutboundObserverFactory outboundObserverFactory,
OutboundObserverFactory.BasicFactory<BeamFnApi.Elements, BeamFnApi.Elements>
Expand Down Expand Up @@ -95,7 +95,7 @@ private CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiverF
/**
* Registers a consumer for the specified intruction id.
*
* <p>The {@link BeamFnDataGrpcMultiplexer2} partitions {@link BeamFnApi.Elements} with multiple
* <p>The {@link BeamFnDataGrpcMultiplexer} partitions {@link BeamFnApi.Elements} with multiple
* instruction ids ensuring that the receiver will only see {@link BeamFnApi.Elements} with a
* single instruction id.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@
*
* <p>Closing the receiver will unblock any upstream producer and downstream consumer exceptionally.
*/
public class BeamFnDataInboundObserver2 implements CloseableFnDataReceiver<BeamFnApi.Elements> {
public class BeamFnDataInboundObserver implements CloseableFnDataReceiver<BeamFnApi.Elements> {

/**
* Creates a receiver that is able to consume elements multiplexing on to the provided set of
* endpoints.
*/
public static BeamFnDataInboundObserver2 forConsumers(
public static BeamFnDataInboundObserver forConsumers(
List<DataEndpoint<?>> dataEndpoints, List<TimerEndpoint<?>> timerEndpoints) {
return new BeamFnDataInboundObserver2(dataEndpoints, timerEndpoints);
return new BeamFnDataInboundObserver(dataEndpoints, timerEndpoints);
}

/** Holds the status of whether the endpoint has been completed or not. */
Expand Down Expand Up @@ -78,7 +78,7 @@ private CloseException() {
private final int totalNumEndpoints;
private int numEndpointsThatAreIncomplete;

private BeamFnDataInboundObserver2(
private BeamFnDataInboundObserver(
List<DataEndpoint<?>> dataEndpoints, List<TimerEndpoint<?>> timerEndpoints) {
this.transformIdToDataEndpoint = new HashMap<>();
for (DataEndpoint<?> endpoint : dataEndpoints) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
import org.junit.Rule;
import org.junit.Test;

/** Tests for {@link BeamFnDataGrpcMultiplexer2}. */
public class BeamFnDataGrpcMultiplexer2Test {
/** Tests for {@link BeamFnDataGrpcMultiplexer}. */
public class BeamFnDataGrpcMultiplexerTest {

private static final Endpoints.ApiServiceDescriptor DESCRIPTOR =
Endpoints.ApiServiceDescriptor.newBuilder().setUrl("test").build();
Expand Down Expand Up @@ -83,8 +83,8 @@ public class BeamFnDataGrpcMultiplexer2Test {
@Test
public void testOutboundObserver() {
Collection<BeamFnApi.Elements> values = new ArrayList<>();
BeamFnDataGrpcMultiplexer2 multiplexer =
new BeamFnDataGrpcMultiplexer2(
BeamFnDataGrpcMultiplexer multiplexer =
new BeamFnDataGrpcMultiplexer(
DESCRIPTOR,
OutboundObserverFactory.clientDirect(),
inboundObserver -> TestStreams.withOnNext(values::add).build());
Expand All @@ -97,8 +97,8 @@ public void testInboundObserverBlocksTillConsumerConnects() throws Exception {
Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>();
Collection<BeamFnApi.Elements> timerInboundValues = new ArrayList<>();
BeamFnDataGrpcMultiplexer2 multiplexer =
new BeamFnDataGrpcMultiplexer2(
BeamFnDataGrpcMultiplexer multiplexer =
new BeamFnDataGrpcMultiplexer(
DESCRIPTOR,
OutboundObserverFactory.clientDirect(),
inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());
Expand Down Expand Up @@ -176,8 +176,8 @@ public void testElementsNeedsPartitioning() throws Exception {
Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>();
Collection<BeamFnApi.Elements> timerInboundValues = new ArrayList<>();
BeamFnDataGrpcMultiplexer2 multiplexer =
new BeamFnDataGrpcMultiplexer2(
BeamFnDataGrpcMultiplexer multiplexer =
new BeamFnDataGrpcMultiplexer(
DESCRIPTOR,
OutboundObserverFactory.clientDirect(),
inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());
Expand Down Expand Up @@ -238,8 +238,8 @@ public void accept(BeamFnApi.Elements input) throws Exception {
public void testElementsWithOnlySingleInstructionIdUsingHotPath() throws Exception {
Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>();
BeamFnDataGrpcMultiplexer2 multiplexer =
new BeamFnDataGrpcMultiplexer2(
BeamFnDataGrpcMultiplexer multiplexer =
new BeamFnDataGrpcMultiplexer(
DESCRIPTOR,
OutboundObserverFactory.clientDirect(),
inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());
Expand Down Expand Up @@ -274,8 +274,8 @@ public void accept(BeamFnApi.Elements input) throws Exception {
public void testFailedProcessingCausesAdditionalInboundDataToBeIgnored() throws Exception {
Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
Collection<BeamFnApi.Elements> dataInboundValues = new ArrayList<>();
BeamFnDataGrpcMultiplexer2 multiplexer =
new BeamFnDataGrpcMultiplexer2(
BeamFnDataGrpcMultiplexer multiplexer =
new BeamFnDataGrpcMultiplexer(
DESCRIPTOR,
OutboundObserverFactory.clientDirect(),
inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());
Expand Down Expand Up @@ -326,8 +326,8 @@ public void testClose() throws Exception {
Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
Collection<Throwable> errorWasReturned = new ArrayList<>();
AtomicBoolean wasClosed = new AtomicBoolean();
final BeamFnDataGrpcMultiplexer2 multiplexer =
new BeamFnDataGrpcMultiplexer2(
final BeamFnDataGrpcMultiplexer multiplexer =
new BeamFnDataGrpcMultiplexer(
DESCRIPTOR,
OutboundObserverFactory.clientDirect(),
inboundObserver ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link BeamFnDataInboundObserver2}. */
/** Tests for {@link BeamFnDataInboundObserver}. */
@RunWith(JUnit4.class)
public class BeamFnDataInboundObserver2Test {
public class BeamFnDataInboundObserverTest {
private static final Coder<WindowedValue<String>> CODER =
WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
private static final String TRANSFORM_ID = "transformId";
Expand All @@ -58,8 +58,8 @@ public void testConsumptionOfValuesHappensOnAwaitCompletionCallersThread() throw
Thread thread = Thread.currentThread();
Collection<WindowedValue<String>> values = new ArrayList<>();
Collection<WindowedValue<String>> timers = new ArrayList<>();
BeamFnDataInboundObserver2 observer =
BeamFnDataInboundObserver2.forConsumers(
BeamFnDataInboundObserver observer =
BeamFnDataInboundObserver.forConsumers(
Arrays.asList(
DataEndpoint.create(
TRANSFORM_ID,
Expand Down Expand Up @@ -102,8 +102,8 @@ public void testConsumptionOfValuesHappensOnAwaitCompletionCallersThread() throw
@Test
public void testAwaitCompletionFailureVisibleToAwaitCompletionCallerAndProducer()
throws Exception {
BeamFnDataInboundObserver2 observer =
BeamFnDataInboundObserver2.forConsumers(
BeamFnDataInboundObserver observer =
BeamFnDataInboundObserver.forConsumers(
Arrays.asList(
DataEndpoint.create(
TRANSFORM_ID,
Expand Down Expand Up @@ -137,8 +137,8 @@ public void testAwaitCompletionFailureVisibleToAwaitCompletionCallerAndProducer(

@Test
public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws Exception {
BeamFnDataInboundObserver2 observer =
BeamFnDataInboundObserver2.forConsumers(
BeamFnDataInboundObserver observer =
BeamFnDataInboundObserver.forConsumers(
Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, (value) -> {})),
Collections.emptyList());

Expand All @@ -147,7 +147,7 @@ public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws Exceptio
() -> {
observer.accept(dataWith("ABC"));
assertThrows(
BeamFnDataInboundObserver2.CloseException.class,
BeamFnDataInboundObserver.CloseException.class,
() -> {
while (true) {
// keep trying to send messages since the queue buffers messages and the
Expand All @@ -165,16 +165,16 @@ public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws Exceptio
return null;
});

assertThrows(BeamFnDataInboundObserver2.CloseException.class, () -> observer.awaitCompletion());
assertThrows(BeamFnDataInboundObserver.CloseException.class, () -> observer.awaitCompletion());
future.get();
future2.get();
}

@Test
public void testBadProducerDataFailureVisibleToAwaitCompletionCallerAndProducer()
throws Exception {
BeamFnDataInboundObserver2 observer =
BeamFnDataInboundObserver2.forConsumers(
BeamFnDataInboundObserver observer =
BeamFnDataInboundObserver.forConsumers(
Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, (value) -> {})),
Collections.emptyList());
Future<?> future =
Expand Down
Loading

0 comments on commit 299a7b0

Please sign in to comment.