Skip to content

Commit

Permalink
Fix PushAckRequest mismatch problem. (alibaba#7179)
Browse files Browse the repository at this point in the history
* fix PushAckRequest mismatch problem.

* add doc.

* add doc.
  • Loading branch information
horizonzy authored Nov 8, 2021
1 parent cc568a6 commit 1778a84
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ public String toString() {
*/
public static final int SERVER_ERROR = 500;

/**
* client error(client异常,返回给服务端).
*/
public static final int CLIENT_ERROR = -500;

/**
* bad gateway(路由异常,如nginx后面的Server挂掉).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,17 @@
*/
public class ErrorResponse extends Response {

/**
* build an error response.
*
* @param errorCode errorCode
* @param msg msg
* @return response
*/
public static Response build(int errorCode, String msg) {
ErrorResponse response = new ErrorResponse();
response.setErrorInfo(errorCode, msg);
return response;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import com.alibaba.nacos.api.grpc.auto.Payload;
import com.alibaba.nacos.api.grpc.auto.RequestGrpc;
import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest;
import com.alibaba.nacos.api.remote.request.PushAckRequest;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.ServerCheckRequest;
import com.alibaba.nacos.api.remote.response.ErrorResponse;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.response.ServerCheckResponse;
import com.alibaba.nacos.common.remote.ConnectionType;
Expand Down Expand Up @@ -181,7 +181,10 @@ public void onNext(Payload payload) {
} catch (Exception e) {
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Handle server request exception: {}",
grpcConn.getConnectionId(), payload.toString(), e.getMessage());
sendResponse(request.getRequestId(), false);
Response errResponse = ErrorResponse
.build(NacosException.CLIENT_ERROR, "Handle server request error");
errResponse.setRequestId(request.getRequestId());
sendResponse(errResponse);
}

}
Expand Down Expand Up @@ -231,15 +234,6 @@ public void onCompleted() {
});
}

private void sendResponse(String ackId, boolean success) {
try {
PushAckRequest request = PushAckRequest.build(ackId, success);
this.currentConnection.request(request, 3000L);
} catch (Exception e) {
LOGGER.error("[{}]Error to send ack response, ackId->{}", this.currentConnection.getConnectionId(), ackId);
}
}

private void sendResponse(Response response) {
try {
((GrpcConnection) this.currentConnection).sendResponse(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void request(Payload grpcRequest, StreamObserver<Payload> responseObserve
//server is on starting.
if (!ApplicationUtils.isStarted()) {
Payload payloadResponse = GrpcUtils.convert(
buildErrorResponse(NacosException.INVALID_SERVER_STATUS, "Server is starting,please try later."));
ErrorResponse.build(NacosException.INVALID_SERVER_STATUS, "Server is starting,please try later."));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);

Expand All @@ -101,7 +101,7 @@ public void request(Payload grpcRequest, StreamObserver<Payload> responseObserve
if (requestHandler == null) {
Loggers.REMOTE_DIGEST.warn(String.format("[%s] No handler for request type : %s :", "grpc", type));
Payload payloadResponse = GrpcUtils
.convert(buildErrorResponse(NacosException.NO_HANDLER, "RequestHandler Not Found"));
.convert(ErrorResponse.build(NacosException.NO_HANDLER, "RequestHandler Not Found"));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
Expand All @@ -115,7 +115,7 @@ public void request(Payload grpcRequest, StreamObserver<Payload> responseObserve
Loggers.REMOTE_DIGEST
.warn("[{}] Invalid connection Id ,connection [{}] is un registered ,", "grpc", connectionId);
Payload payloadResponse = GrpcUtils
.convert(buildErrorResponse(NacosException.UN_REGISTER, "Connection is unregistered."));
.convert(ErrorResponse.build(NacosException.UN_REGISTER, "Connection is unregistered."));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
Expand All @@ -128,7 +128,7 @@ public void request(Payload grpcRequest, StreamObserver<Payload> responseObserve
} catch (Exception e) {
Loggers.REMOTE_DIGEST
.warn("[{}] Invalid request receive from connection [{}] ,error={}", "grpc", connectionId, e);
Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(NacosException.BAD_GATEWAY, e.getMessage()));
Payload payloadResponse = GrpcUtils.convert(ErrorResponse.build(NacosException.BAD_GATEWAY, e.getMessage()));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
Expand All @@ -138,7 +138,7 @@ public void request(Payload grpcRequest, StreamObserver<Payload> responseObserve
if (parseObj == null) {
Loggers.REMOTE_DIGEST.warn("[{}] Invalid request receive ,parse request is null", connectionId);
Payload payloadResponse = GrpcUtils
.convert(buildErrorResponse(NacosException.BAD_GATEWAY, "Invalid request"));
.convert(ErrorResponse.build(NacosException.BAD_GATEWAY, "Invalid request"));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
Expand All @@ -149,7 +149,7 @@ public void request(Payload grpcRequest, StreamObserver<Payload> responseObserve
.warn("[{}] Invalid request receive ,parsed payload is not a request,parseObj={}", connectionId,
parseObj);
Payload payloadResponse = GrpcUtils
.convert(buildErrorResponse(NacosException.BAD_GATEWAY, "Invalid request"));
.convert(ErrorResponse.build(NacosException.BAD_GATEWAY, "Invalid request"));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
Expand All @@ -174,7 +174,7 @@ public void request(Payload grpcRequest, StreamObserver<Payload> responseObserve
Loggers.REMOTE_DIGEST
.error("[{}] Fail to handle request from connection [{}] ,error message :{}", "grpc", connectionId,
e);
Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(
Payload payloadResponse = GrpcUtils.convert(ErrorResponse.build(
(e instanceof NacosException) ? ((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(),
e.getMessage()));
traceIfNecessary(payloadResponse, false);
Expand All @@ -184,9 +184,4 @@ public void request(Payload grpcRequest, StreamObserver<Payload> responseObserve

}

private Response buildErrorResponse(int errorCode, String msg) {
ErrorResponse response = new ErrorResponse();
response.setErrorInfo(errorCode, msg);
return response;
}
}

0 comments on commit 1778a84

Please sign in to comment.