Skip to content

Commit

Permalink
[FLINK-29375][rpc] Move getSelfGateway() into RpcService
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol authored Sep 23, 2022
1 parent 24c685a commit 0154de9
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,20 @@ public int getPort() {
return port;
}

public <C extends RpcGateway> C getSelfGateway(Class<C> selfGatewayType, RpcServer rpcServer) {
if (selfGatewayType.isInstance(rpcServer)) {
@SuppressWarnings("unchecked")
C selfGateway = ((C) rpcServer);

return selfGateway;
} else {
throw new ClassCastException(
"RpcEndpoint does not implement the RpcGateway interface of type "
+ selfGatewayType
+ '.');
}
}

// this method does not mutate state and is thus thread-safe
@Override
public <C extends RpcGateway> CompletableFuture<C> connect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,17 +308,7 @@ public final CompletableFuture<Void> closeAsync() {
* @return Self gateway of the specified type which can be used to issue asynchronous rpcs
*/
public <C extends RpcGateway> C getSelfGateway(Class<C> selfGatewayType) {
if (selfGatewayType.isInstance(rpcServer)) {
@SuppressWarnings("unchecked")
C selfGateway = ((C) rpcServer);

return selfGateway;
} else {
throw new RuntimeException(
"RpcEndpoint does not implement the RpcGateway interface of type "
+ selfGatewayType
+ '.');
}
return rpcService.getSelfGateway(selfGatewayType, rpcServer);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,19 @@ public interface RpcService {
*/
int getPort();

/**
* Returns a self gateway of the specified type which can be used to issue asynchronous calls
* against the RpcEndpoint.
*
* <p>IMPORTANT: The self gateway type must be implemented by the RpcEndpoint. Otherwise the
* method will fail.
*
* @param <C> type of the self gateway to create
* @param selfGatewayType class of the self gateway type
* @return Self gateway of the specified type which can be used to issue asynchronous rpcs
*/
<C extends RpcGateway> C getSelfGateway(Class<C> selfGatewayType, RpcServer rpcServer);

/**
* Connect to a remote rpc server under the provided address. Returns a rpc gateway which can be
* used to communicate with the rpc server. If the connection failed, then the returned future
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ public int getPort() {
return backingRpcService.getPort();
}

@Override
public <C extends RpcGateway> C getSelfGateway(Class<C> selfGatewayType, RpcServer rpcServer) {
return backingRpcService.getSelfGateway(selfGatewayType, rpcServer);
}

@Override
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
return backingRpcService.startServer(rpcEndpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,12 @@ public int getPort() {
return rpcService.getPort();
}

@Override
public <C extends RpcGateway> C getSelfGateway(
Class<C> selfGatewayType, RpcServer rpcServer) {
return rpcService.getSelfGateway(selfGatewayType, rpcServer);
}

@Override
public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
final CompletableFuture<C> future = rpcService.connect(address, clazz);
Expand Down

0 comments on commit 0154de9

Please sign in to comment.