Skip to content

Commit

Permalink
Enable the RequestScope on gRPC service invocations
Browse files Browse the repository at this point in the history
  • Loading branch information
cescoffier committed Jan 4, 2021
1 parent 75510b1 commit 1b8190e
Show file tree
Hide file tree
Showing 15 changed files with 145 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.grpc.testing.integration.Messages;
import io.grpc.testing.integration.MutinyTestServiceGrpc;
import io.grpc.testing.integration.TestServiceGrpc;
import io.quarkus.grpc.server.services.AssertHelper;
import io.quarkus.grpc.server.services.MutinyHelloService;
import io.quarkus.grpc.server.services.MutinyTestService;
import io.quarkus.test.QuarkusUnitTest;
Expand All @@ -28,7 +29,7 @@ public class MutinyGrpcServiceWithPlainTextTest extends GrpcServiceTestBase {
@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer(
() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(MutinyHelloService.class, MutinyTestService.class,
.addClasses(MutinyHelloService.class, MutinyTestService.class, AssertHelper.class,
GreeterGrpc.class, HelloRequest.class, HelloReply.class, MutinyGreeterGrpc.class,
HelloRequestOrBuilder.class, HelloReplyOrBuilder.class,
EmptyProtos.class, Messages.class, MutinyTestServiceGrpc.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.grpc.testing.integration.MutinyTestServiceGrpc;
import io.grpc.testing.integration.TestServiceGrpc;
import io.netty.handler.ssl.SslContext;
import io.quarkus.grpc.server.services.AssertHelper;
import io.quarkus.grpc.server.services.MutinyHelloService;
import io.quarkus.grpc.server.services.MutinyTestService;
import io.quarkus.test.QuarkusUnitTest;
Expand All @@ -38,7 +39,7 @@ public class MutinyGrpcServiceWithSSLTest extends GrpcServiceTestBase {
@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer(
() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(MutinyHelloService.class, MutinyTestService.class,
.addClasses(MutinyHelloService.class, MutinyTestService.class, AssertHelper.class,
GreeterGrpc.class, HelloRequest.class, HelloReply.class, MutinyGreeterGrpc.class,
HelloRequestOrBuilder.class, HelloReplyOrBuilder.class,
EmptyProtos.class, Messages.class, MutinyTestServiceGrpc.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.grpc.testing.integration.Messages;
import io.grpc.testing.integration.MutinyTestServiceGrpc;
import io.grpc.testing.integration.TestServiceGrpc;
import io.quarkus.grpc.server.services.AssertHelper;
import io.quarkus.grpc.server.services.HelloService;
import io.quarkus.grpc.server.services.TestService;
import io.quarkus.test.QuarkusUnitTest;
Expand All @@ -28,7 +29,7 @@ public class RegularGrpcServiceWithPlainTextTest extends GrpcServiceTestBase {
@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer(
() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(HelloService.class, TestService.class,
.addClasses(HelloService.class, TestService.class, AssertHelper.class,
GreeterGrpc.class, HelloRequest.class, HelloReply.class, MutinyGreeterGrpc.class,
HelloRequestOrBuilder.class, HelloReplyOrBuilder.class,
EmptyProtos.class, Messages.class, MutinyTestServiceGrpc.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.grpc.testing.integration.MutinyTestServiceGrpc;
import io.grpc.testing.integration.TestServiceGrpc;
import io.netty.handler.ssl.SslContext;
import io.quarkus.grpc.server.services.AssertHelper;
import io.quarkus.grpc.server.services.HelloService;
import io.quarkus.grpc.server.services.TestService;
import io.quarkus.test.QuarkusUnitTest;
Expand All @@ -39,7 +40,7 @@ public class RegularGrpcServiceWithSSLFromClasspathTest extends GrpcServiceTestB
@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer(
() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(HelloService.class, TestService.class,
.addClasses(HelloService.class, TestService.class, AssertHelper.class,
GreeterGrpc.class, HelloRequest.class, HelloReply.class, MutinyGreeterGrpc.class,
HelloRequestOrBuilder.class, HelloReplyOrBuilder.class,
EmptyProtos.class, Messages.class, MutinyTestServiceGrpc.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.grpc.testing.integration.MutinyTestServiceGrpc;
import io.grpc.testing.integration.TestServiceGrpc;
import io.netty.handler.ssl.SslContext;
import io.quarkus.grpc.server.services.AssertHelper;
import io.quarkus.grpc.server.services.HelloService;
import io.quarkus.grpc.server.services.TestService;
import io.quarkus.test.QuarkusUnitTest;
Expand All @@ -38,7 +39,7 @@ public class RegularGrpcServiceWithSSLTest extends GrpcServiceTestBase {
@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer(
() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(HelloService.class, TestService.class,
.addClasses(HelloService.class, TestService.class, AssertHelper.class,
GreeterGrpc.class, HelloRequest.class, HelloReply.class, MutinyGreeterGrpc.class,
HelloRequestOrBuilder.class, HelloReplyOrBuilder.class,
EmptyProtos.class, Messages.class, MutinyTestServiceGrpc.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.grpc.testing.integration.TestServiceGrpc;
import io.quarkus.grpc.runtime.annotations.GrpcService;
import io.quarkus.grpc.runtime.health.GrpcHealthStorage;
import io.quarkus.grpc.server.services.AssertHelper;
import io.quarkus.grpc.server.services.BlockingMutinyHelloService;
import io.quarkus.grpc.server.services.TestService;
import io.quarkus.test.QuarkusUnitTest;
Expand All @@ -41,7 +42,7 @@ public class BlockingAndNonBlockingTest {
.addPackage(GreeterGrpc.class.getPackage())
.addPackage(TestServiceGrpc.class.getPackage())
.addPackage(EmptyProtos.class.getPackage())
.addClasses(BlockingMutinyHelloService.class, TestService.class))
.addClasses(BlockingMutinyHelloService.class, TestService.class, AssertHelper.class))
.withConfigurationResource("blocking-config.properties");

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.quarkus.grpc.blocking.BlockingTestServiceGrpc;
import io.quarkus.grpc.blocking.MutinyBlockingTestServiceGrpc;
import io.quarkus.grpc.runtime.annotations.GrpcService;
import io.quarkus.grpc.server.services.AssertHelper;
import io.quarkus.grpc.server.services.BlockingTestService;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Multi;
Expand All @@ -37,7 +38,7 @@ public class BlockingMethodsTest {
.addPackage(EmptyProtos.class.getPackage())
.addPackage(Messages.class.getPackage())
.addPackage(BlockingTestServiceGrpc.class.getPackage())
.addClasses(BlockingTestService.class))
.addClasses(BlockingTestService.class, AssertHelper.class))
.withConfigurationResource("blocking-test-config.properties");

protected static final Duration TIMEOUT = Duration.ofSeconds(5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.quarkus.grpc.blocking.BlockingTestServiceGrpc;
import io.quarkus.grpc.blocking.MutinyBlockingTestServiceGrpc;
import io.quarkus.grpc.runtime.annotations.GrpcService;
import io.quarkus.grpc.server.services.AssertHelper;
import io.quarkus.grpc.server.services.BlockingMutinyTestService;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Multi;
Expand All @@ -37,7 +38,7 @@ public class BlockingMethodsWithMutinyImplTest {
.addPackage(EmptyProtos.class.getPackage())
.addPackage(Messages.class.getPackage())
.addPackage(BlockingTestServiceGrpc.class.getPackage())
.addClasses(BlockingMutinyTestService.class))
.addClasses(BlockingMutinyTestService.class, AssertHelper.class))
.withConfigurationResource("blocking-test-config.properties");

protected static final Duration TIMEOUT = Duration.ofSeconds(5);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.quarkus.grpc.server.services;

import static org.assertj.core.api.Assertions.assertThat;

import io.quarkus.arc.Arc;
import io.vertx.core.Vertx;

public class AssertHelper {

public static void assertThatTheRequestScopeIsActive() {
assertThat(Arc.container().requestContext().isActive()).isTrue();
}

public static void assertRunOnEventLoop() {
assertThat(Vertx.currentContext()).isNotNull();
assertThat(Vertx.currentContext().isEventLoopContext());
assertThat(Thread.currentThread().getName()).contains("eventloop");
}

public static void assertRunOnWorker() {
assertThat(Vertx.currentContext()).isNotNull();
assertThat(Thread.currentThread().getName()).contains("worker");
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.quarkus.grpc.server.services;

import static io.quarkus.grpc.server.services.AssertHelper.assertRunOnEventLoop;
import static io.quarkus.grpc.server.services.AssertHelper.assertRunOnWorker;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -14,23 +16,11 @@
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Vertx;

@Singleton
public class BlockingMutinyTestService
extends MutinyBlockingTestServiceGrpc.BlockingTestServiceImplBase {

private void assertRunOnEventLoop() {
assertThat(Vertx.currentContext()).isNotNull();
assertThat(Vertx.currentContext().isEventLoopContext()).isTrue();
assertThat(Thread.currentThread().getName()).contains("eventloop-thread");
}

private void assertRunOnWorker() {
assertThat(Vertx.currentContext()).isNotNull();
assertThat(Thread.currentThread().getName()).contains("worker");
}

@Override
public Uni<EmptyProtos.Empty> emptyCall(EmptyProtos.Empty request) {
assertThat(request).isNotNull();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.quarkus.grpc.server.services;

import static io.quarkus.grpc.server.services.AssertHelper.assertRunOnEventLoop;
import static io.quarkus.grpc.server.services.AssertHelper.assertRunOnWorker;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
Expand All @@ -15,22 +17,10 @@
import io.grpc.testing.integration.Messages;
import io.quarkus.grpc.blocking.BlockingTestServiceGrpc;
import io.smallrye.common.annotation.Blocking;
import io.vertx.core.Vertx;

@Singleton
public class BlockingTestService extends BlockingTestServiceGrpc.BlockingTestServiceImplBase {

private void assertRunOnEventLoop() {
assertThat(Vertx.currentContext()).isNotNull();
assertThat(Vertx.currentContext().isEventLoopContext()).isTrue();
assertThat(Thread.currentThread().getName()).contains("eventloop");
}

private void assertRunOnWorker() {
assertThat(Vertx.currentContext()).isNotNull();
assertThat(Thread.currentThread().getName()).contains("worker");
}

@Override
public void emptyCall(EmptyProtos.Empty request, StreamObserver<EmptyProtos.Empty> responseObserver) {
assertThat(request).isNotNull();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.grpc.server.services;

import static io.quarkus.grpc.server.services.AssertHelper.assertThatTheRequestScopeIsActive;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -20,57 +21,66 @@ public class MutinyTestService extends MutinyTestServiceGrpc.TestServiceImplBase
@Override
public Uni<EmptyProtos.Empty> emptyCall(EmptyProtos.Empty request) {
assertThat(request).isNotNull();
assertThatTheRequestScopeIsActive();
return Uni.createFrom().item(EmptyProtos.Empty.newBuilder().build());
}

@Override
public Uni<Messages.SimpleResponse> unaryCall(Messages.SimpleRequest request) {
assertThat(request).isNotNull();
assertThatTheRequestScopeIsActive();
return Uni.createFrom().item(Messages.SimpleResponse.newBuilder().build());
}

@Override
public Multi<Messages.StreamingOutputCallResponse> streamingOutputCall(
Messages.StreamingOutputCallRequest request) {
assertThat(request).isNotNull();
assertThatTheRequestScopeIsActive();
return Multi.createFrom().range(0, 10)
.map(i -> ByteString.copyFromUtf8(Integer.toString(i)))
.map(s -> Messages.Payload.newBuilder().setBody(s).build())
.map(p -> Messages.StreamingOutputCallResponse.newBuilder().setPayload(p).build());
.map(p -> Messages.StreamingOutputCallResponse.newBuilder().setPayload(p).build())
.onItem().invoke(() -> assertThatTheRequestScopeIsActive());
}

@Override
public Uni<Messages.StreamingInputCallResponse> streamingInputCall(
Multi<Messages.StreamingInputCallRequest> request) {
assertThatTheRequestScopeIsActive();
return request.map(i -> i.getPayload().getBody().toStringUtf8())
.collectItems().asList()
.map(list -> {
assertThat(list).containsExactly("a", "b", "c", "d");
return Messages.StreamingInputCallResponse.newBuilder().build();
});
})
.onItem().invoke(() -> assertThatTheRequestScopeIsActive());
}

@Override
public Multi<Messages.StreamingOutputCallResponse> fullDuplexCall(
Multi<Messages.StreamingOutputCallRequest> request) {
assertThatTheRequestScopeIsActive();
AtomicInteger counter = new AtomicInteger();
return request
.map(r -> r.getPayload().getBody().toStringUtf8())
.map(r -> r + counter.incrementAndGet())
.map(r -> Messages.Payload.newBuilder().setBody(ByteString.copyFromUtf8(r)).build())
.map(r -> Messages.StreamingOutputCallResponse.newBuilder().setPayload(r).build());
.map(r -> Messages.StreamingOutputCallResponse.newBuilder().setPayload(r).build())
.onItem().invoke(() -> assertThatTheRequestScopeIsActive());
}

@Override
public Multi<Messages.StreamingOutputCallResponse> halfDuplexCall(
Multi<Messages.StreamingOutputCallRequest> request) {
assertThatTheRequestScopeIsActive();
return request
.map(r -> r.getPayload().getBody().toStringUtf8())
.map(String::toUpperCase)
.collectItems().asList()
.onItem().transformToMulti(s -> Multi.createFrom().iterable(s))
.map(r -> Messages.Payload.newBuilder().setBody(ByteString.copyFromUtf8(r)).build())
.map(r -> Messages.StreamingOutputCallResponse.newBuilder().setPayload(r).build());

.map(r -> Messages.StreamingOutputCallResponse.newBuilder().setPayload(r).build())
.onItem().invoke(() -> assertThatTheRequestScopeIsActive());
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.grpc.server.services;

import static io.quarkus.grpc.server.services.AssertHelper.assertThatTheRequestScopeIsActive;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
Expand All @@ -20,6 +21,7 @@ public class TestService extends TestServiceGrpc.TestServiceImplBase {

@Override
public void emptyCall(EmptyProtos.Empty request, StreamObserver<EmptyProtos.Empty> responseObserver) {
assertThatTheRequestScopeIsActive();
assertThat(request).isNotNull();
responseObserver.onNext(EmptyProtos.Empty.newBuilder().build());
responseObserver.onCompleted();
Expand All @@ -28,6 +30,7 @@ public void emptyCall(EmptyProtos.Empty request, StreamObserver<EmptyProtos.Empt
@Override
public void unaryCall(Messages.SimpleRequest request,
StreamObserver<Messages.SimpleResponse> responseObserver) {
assertThatTheRequestScopeIsActive();
assertThat(request).isNotNull();
responseObserver.onNext(Messages.SimpleResponse.newBuilder().build());
responseObserver.onCompleted();
Expand All @@ -36,18 +39,27 @@ public void unaryCall(Messages.SimpleRequest request,
@Override
public void streamingOutputCall(Messages.StreamingOutputCallRequest request,
StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) {
assertThatTheRequestScopeIsActive();
assertThat(request).isNotNull();
for (int i = 0; i < 10; i++) {
ByteString value = ByteString.copyFromUtf8(Integer.toString(i));
Messages.Payload payload = Messages.Payload.newBuilder().setBody(value).build();
responseObserver.onNext(Messages.StreamingOutputCallResponse.newBuilder().setPayload(payload).build());
}
responseObserver.onCompleted();
// Send the completion signal on another thread.
new Thread(new Runnable() {
@Override
public void run() {
responseObserver.onCompleted();
}
}).start();

}

@Override
public StreamObserver<Messages.StreamingInputCallRequest> streamingInputCall(
StreamObserver<Messages.StreamingInputCallResponse> responseObserver) {
assertThatTheRequestScopeIsActive();
List<String> list = new CopyOnWriteArrayList<>();
return new StreamObserver<Messages.StreamingInputCallRequest>() {
@Override
Expand All @@ -64,6 +76,7 @@ public void onError(Throwable throwable) {
public void onCompleted() {
assertThat(list).containsExactly("a", "b", "c", "d");
responseObserver.onNext(Messages.StreamingInputCallResponse.newBuilder().build());
assertThatTheRequestScopeIsActive();
responseObserver.onCompleted();
}
};
Expand All @@ -74,6 +87,7 @@ public void onCompleted() {
@Override
public StreamObserver<Messages.StreamingOutputCallRequest> fullDuplexCall(
StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) {
assertThatTheRequestScopeIsActive();
AtomicInteger counter = new AtomicInteger();
return new StreamObserver<Messages.StreamingOutputCallRequest>() {
@Override
Expand All @@ -94,6 +108,7 @@ public void onError(Throwable throwable) {

@Override
public void onCompleted() {
assertThatTheRequestScopeIsActive();
responseObserver.onCompleted();
}
};
Expand All @@ -102,7 +117,7 @@ public void onCompleted() {
@Override
public StreamObserver<Messages.StreamingOutputCallRequest> halfDuplexCall(
StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) {

assertThatTheRequestScopeIsActive();
List<Messages.StreamingOutputCallResponse> list = new CopyOnWriteArrayList<>();
return new StreamObserver<Messages.StreamingOutputCallRequest>() {
@Override
Expand All @@ -122,6 +137,7 @@ public void onError(Throwable throwable) {

@Override
public void onCompleted() {
assertThatTheRequestScopeIsActive();
list.forEach(responseObserver::onNext);
responseObserver.onCompleted();
}
Expand Down
Loading

0 comments on commit 1b8190e

Please sign in to comment.