Skip to content

Commit

Permalink
Merge pull request apache#4224
Browse files Browse the repository at this point in the history
Move BeamFnDataGrpcMultiplexer to java-fn-execution
  • Loading branch information
tgroh authored Jan 17, 2018
2 parents 8751ea5 + d174ce7 commit 4da548e
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 49 deletions.
2 changes: 1 addition & 1 deletion runners/java-fn-execution/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/

apply from: project(":").file("build_rules.gradle")
applyJavaNature()
applyJavaNature(javaVersion: 1.8)

description = "Apache Beam :: Runners :: Java Fn Execution"

Expand Down
14 changes: 14 additions & 0 deletions runners/java-fn-execution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@

<packaging>jar</packaging>

<build>
<plugins>
<plugin>
<!-- Override Beam parent to allow Java8 -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
Expand Down
3 changes: 2 additions & 1 deletion sdks/java/fn-execution/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/

apply from: project(":").file("build_rules.gradle")
applyJavaNature()
applyJavaNature(javaVersion: 1.8)

description = "Apache Beam :: SDKs :: Java :: Fn Execution"

Expand All @@ -37,6 +37,7 @@ dependencies {
testCompile library.java.hamcrest_core
testCompile library.java.hamcrest_library
testCompile library.java.mockito_core
testCompile library.java.slf4j_jdk14
}

task packageTests(type: Jar) {
Expand Down
14 changes: 14 additions & 0 deletions sdks/java/fn-execution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@

<packaging>jar</packaging>

<build>
<plugins>
<plugin>
<!-- Override Beam parent to allow Java8 -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.fn.harness.data;
package org.apache.beam.sdk.fn.data;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
Expand All @@ -25,16 +25,16 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements.Data;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.stream.StreamObserverFactory.StreamObserverClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A gRPC multiplexer for a specific {@link
* Endpoints.ApiServiceDescriptor}.
* A gRPC multiplexer for a specific {@link Endpoints.ApiServiceDescriptor}.
*
* <p>Multiplexes data for inbound consumers based upon their individual {@link
* org.apache.beam.model.fnexecution.v1.BeamFnApi.Target}s.
Expand All @@ -48,27 +48,26 @@
*/
public class BeamFnDataGrpcMultiplexer {
private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataGrpcMultiplexer.class);
private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
@Nullable private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
private final StreamObserver<BeamFnApi.Elements> inboundObserver;
private final StreamObserver<BeamFnApi.Elements> outboundObserver;
@VisibleForTesting
final ConcurrentMap<
LogicalEndpoint, CompletableFuture<Consumer<BeamFnApi.Elements.Data>>>
private final ConcurrentMap<
LogicalEndpoint, CompletableFuture<Consumer<BeamFnApi.Elements.Data>>>
consumers;

public BeamFnDataGrpcMultiplexer(
Endpoints.ApiServiceDescriptor apiServiceDescriptor,
Function<StreamObserver<BeamFnApi.Elements>,
StreamObserver<BeamFnApi.Elements>> outboundObserverFactory) {
@Nullable Endpoints.ApiServiceDescriptor apiServiceDescriptor,
StreamObserverClientFactory<BeamFnApi.Elements, BeamFnApi.Elements> outboundObserverFactory) {
this.apiServiceDescriptor = apiServiceDescriptor;
this.consumers = new ConcurrentHashMap<>();
this.inboundObserver = new InboundObserver();
this.outboundObserver = outboundObserverFactory.apply(inboundObserver);
this.outboundObserver = outboundObserverFactory.outboundObserverFor(inboundObserver);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.omitNullValues()
.add("apiServiceDescriptor", apiServiceDescriptor)
.add("consumers", consumers)
.toString();
Expand All @@ -82,14 +81,24 @@ public StreamObserver<BeamFnApi.Elements> getOutboundObserver() {
return outboundObserver;
}

public CompletableFuture<Consumer<BeamFnApi.Elements.Data>> futureForKey(
LogicalEndpoint key) {
return consumers.computeIfAbsent(key, (LogicalEndpoint unused) -> new CompletableFuture<>());
private CompletableFuture<Consumer<Data>> receiverFuture(LogicalEndpoint endpoint) {
return consumers.computeIfAbsent(
endpoint, (LogicalEndpoint unused) -> new CompletableFuture<>());
}

public void registerConsumer(
LogicalEndpoint inputLocation, Consumer<BeamFnApi.Elements.Data> dataBytesReceiver) {
receiverFuture(inputLocation).complete(dataBytesReceiver);
}

@VisibleForTesting
boolean hasConsumer(LogicalEndpoint outputLocation) {
return consumers.containsKey(outputLocation);
}

/**
* A multiplexing {@link StreamObserver} that selects the inbound {@link Consumer} to pass
* the elements to.
* A multiplexing {@link StreamObserver} that selects the inbound {@link Consumer} to
* pass the elements to.
*
* <p>The inbound observer blocks until the {@link Consumer} is bound allowing for the
* sending harness to initiate transmitting data without needing for the receiving harness to
Expand All @@ -102,7 +111,7 @@ public void onNext(BeamFnApi.Elements value) {
try {
LogicalEndpoint key =
LogicalEndpoint.of(data.getInstructionReference(), data.getTarget());
CompletableFuture<Consumer<BeamFnApi.Elements.Data>> consumer = futureForKey(key);
CompletableFuture<Consumer<BeamFnApi.Elements.Data>> consumer = receiverFuture(key);
if (!consumer.isDone()) {
LOG.debug("Received data for key {} without consumer ready. "
+ "Waiting for consumer to be registered.", key);
Expand Down Expand Up @@ -135,12 +144,17 @@ public void onNext(BeamFnApi.Elements value) {

@Override
public void onError(Throwable t) {
LOG.error("Failed to handle for {}", apiServiceDescriptor, t);
LOG.error(
"Failed to handle for {}",
apiServiceDescriptor == null ? "unknown endpoint" : apiServiceDescriptor,
t);
}

@Override
public void onCompleted() {
LOG.warn("Hanged up for {}.", apiServiceDescriptor);
LOG.warn(
"Hanged up for {}.",
apiServiceDescriptor == null ? "unknown endpoint" : apiServiceDescriptor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.fn.harness.data;
package org.apache.beam.sdk.fn.data;

import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertFalse;
Expand All @@ -24,15 +24,13 @@

import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.junit.Test;

Expand Down Expand Up @@ -61,37 +59,33 @@ public class BeamFnDataGrpcMultiplexerTest {

@Test
public void testOutboundObserver() {
Collection<BeamFnApi.Elements> values = new ArrayList<>();
BeamFnDataGrpcMultiplexer multiplexer = new BeamFnDataGrpcMultiplexer(
DESCRIPTOR,
(StreamObserver<BeamFnApi.Elements> inboundObserver)
-> TestStreams.withOnNext(values::add).build());
final Collection<BeamFnApi.Elements> values = new ArrayList<>();
BeamFnDataGrpcMultiplexer multiplexer =
new BeamFnDataGrpcMultiplexer(
DESCRIPTOR, inboundObserver -> TestStreams.withOnNext(values::add).build());
multiplexer.getOutboundObserver().onNext(ELEMENTS);
assertThat(values, contains(ELEMENTS));
}

@Test
public void testInboundObserverBlocksTillConsumerConnects() throws Exception {
Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
Collection<BeamFnApi.Elements.Data> inboundValues = new ArrayList<>();
BeamFnDataGrpcMultiplexer multiplexer = new BeamFnDataGrpcMultiplexer(
DESCRIPTOR,
(StreamObserver<BeamFnApi.Elements> inboundObserver)
-> TestStreams.withOnNext(outboundValues::add).build());
final Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
final Collection<BeamFnApi.Elements.Data> inboundValues = new ArrayList<>();
final BeamFnDataGrpcMultiplexer multiplexer =
new BeamFnDataGrpcMultiplexer(
DESCRIPTOR, inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(new Runnable() {
@Override
public void run() {
// Purposefully sleep to simulate a delay in a consumer connecting.
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
multiplexer.futureForKey(OUTPUT_LOCATION).complete(inboundValues::add);
}
});
executorService.submit(
() -> {
// Purposefully sleep to simulate a delay in a consumer connecting.
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
multiplexer.registerConsumer(OUTPUT_LOCATION, inboundValues::add);
});
multiplexer.getInboundObserver().onNext(ELEMENTS);
assertTrue(multiplexer.consumers.containsKey(OUTPUT_LOCATION));
assertTrue(multiplexer.hasConsumer(OUTPUT_LOCATION));
// Ensure that when we see a terminal Elements object, we remove the consumer
multiplexer.getInboundObserver().onNext(TERMINAL_ELEMENTS);
assertFalse(multiplexer.consumers.containsKey(OUTPUT_LOCATION));
assertFalse(multiplexer.hasConsumer(OUTPUT_LOCATION));

// Assert that normal and terminal Elements are passed to the consumer
assertThat(inboundValues, contains(ELEMENTS.getData(0), TERMINAL_ELEMENTS.getData(0)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
Expand Down Expand Up @@ -87,8 +88,8 @@ public <T> CompletableFuture<Void> receive(

CompletableFuture<Void> readFuture = new CompletableFuture<>();
BeamFnDataGrpcMultiplexer client = getClientFor(apiServiceDescriptor);
client.futureForKey(inputLocation).complete(
new BeamFnDataInboundObserver<>(coder, consumer, readFuture));
client.registerConsumer(
inputLocation, new BeamFnDataInboundObserver<>(coder, consumer, readFuture));
return readFuture;
}

Expand Down

0 comments on commit 4da548e

Please sign in to comment.