Skip to content

Commit

Permalink
[ISSUE apache#838] Removed redundant code (apache#842)
Browse files Browse the repository at this point in the history
  • Loading branch information
RSTdefg authored Apr 22, 2022
1 parent 00de71a commit 522bbd3
Show file tree
Hide file tree
Showing 11 changed files with 0 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,33 +205,6 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
.withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
.build();

//omsMsg.setTopic(replyTopic);
// topic
//omsMsg.putSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION, replyTopic);
//if (!StringUtils.isBlank(sendMessageRequestBody.getTag())) {
// omsMsg.putUserHeaders("Tag", sendMessageRequestBody.getTag());
//}
//rocketMQMsg = new Message(replyTopic,
// replyMessageRequestBody.getContent().getBytes(EventMeshConstants.DEFAULT_CHARSET));
//omsMsg.putUserProperties("msgType", "persistent");
//rocketMQMsg.putUserProperty(DeFiBusConstant.KEY, DeFiBusConstant.PERSISTENT);
//for (Map.Entry<String, String> entry : extFields.entrySet()) {
// omsMsg.putUserProperties(entry.getKey(), entry.getValue());
//}
//
////for rocketmq support
//MessageAccessor.putProperty(rocketMQMsg, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG);
//MessageAccessor.putProperty(rocketMQMsg, MessageConst.PROPERTY_CORRELATION_ID,
// rocketMQMsg.getProperty(DeFiBusConstant.PROPERTY_RR_REQUEST_ID));
//MessageAccessor.putProperty(rocketMQMsg, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT,
// rocketMQMsg.getProperty(DeFiBusConstant.PROPERTY_MESSAGE_REPLY_TO));
// ttl
//omsMsg.putSystemProperties(Constants.PROPERTY_MESSAGE_TIMEOUT,
// String.valueOf(EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS));
////MessageAccessor.putProperty(rocketMQMsg, DeFiBusConstant.PROPERTY_MESSAGE_TTL,
// String.valueOf(EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS));
//omsMsg.putUserProperties(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP,
// String.valueOf(System.currentTimeMillis()));
if (messageLogger.isDebugEnabled()) {
messageLogger.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", bizNo, replyTopic);
}
Expand Down Expand Up @@ -297,24 +270,6 @@ public void onException(OnExceptionContext context) {
endTime - startTime, replyMQCluster + "-" + EventMeshConstants.RR_REPLY_TOPIC,
origTopic, bizNo, uniqueId, context.getException());
}

//@Override
//public void onException(Throwable e) {
// HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
// replyMessageResponseHeader,
// SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_REPLY_MSG_ERR.getRetCode(),
// EventMeshRetCode.EVENTMESH_REPLY_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2)));
// asyncContext.onComplete(err, handler);
// long endTime = System.currentTimeMillis();
// eventMeshHTTPServer.metrics.summaryMetrics.recordReplyMsgFailed();
// eventMeshHTTPServer.metrics.summaryMetrics.recordReplyMsgCost(endTime - startTime);
// messageLogger.error("message|eventMesh2mq|RSP|SYNC|reply2MQCost={}|topic={}|origTopic={}|bizSeqNo={}|uniqueId={}",
// endTime - startTime,
// replyMQCluster + "-" + EventMeshConstants.RR_REPLY_TOPIC,
// replyMessageRequestBody.getOrigTopic(),
// replyMessageRequestBody.getBizSeqNo(),
// replyMessageRequestBody.getUniqueId(), e);
//}
});
} catch (Exception ex) {
HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtils.getLocalAddress());

SendMessageRequestHeader sendMessageRequestHeader = (SendMessageRequestHeader) asyncContext.getRequest().getHeader();
//SendMessageRequestBody sendMessageRequestBody = (SendMessageRequestBody) asyncContext.getRequest().getBody();

SendMessageResponseHeader sendMessageResponseHeader =
SendMessageResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()),
Expand Down Expand Up @@ -210,31 +209,11 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
}

try {
// body
//omsMsg.setBody(sendMessageRequestBody.getContent().getBytes(EventMeshConstants.DEFAULT_CHARSET));
//// topic
//omsMsg.setTopic(sendMessageRequestBody.getTopic());
//omsMsg.putSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION, sendMessageRequestBody.getTopic());
//
//if (!StringUtils.isBlank(sendMessageRequestBody.getTag())) {
// omsMsg.putUserProperties(EventMeshConstants.TAG, sendMessageRequestBody.getTag());
//}
// ttl
//omsMsg.putUserProperties(Constants.PROPERTY_MESSAGE_TIMEOUT, ttl);
//// bizNo
//omsMsg.putSystemProperties(Constants.PROPERTY_MESSAGE_SEARCH_KEYS, sendMessageRequestBody.getBizSeqNo());
event = CloudEventBuilder.from(event)
.withExtension("msgtype", "persistent")
.withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
.withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
.build();
//omsMsg.putUserProperties("msgType", "persistent");
//omsMsg.putUserProperties(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
//omsMsg.putUserProperties(Constants.RMB_UNIQ_ID, sendMessageRequestBody.getUniqueId());
//omsMsg.putUserProperties(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()));

// new rocketmq client can't support put DeFiBusConstant.PROPERTY_MESSAGE_TTL
//rocketMQMsg.putUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_TTL, ttl);

if (messageLogger.isDebugEnabled()) {
messageLogger.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", bizNo, topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,11 +524,6 @@ public synchronized void initClientGroupBroadcastConsumer() throws Exception {
.withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
eventMeshTCPConfiguration.eventMeshServerIp).build();
String topic = event.getSubject();
// message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
//message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
// String.valueOf(System.currentTimeMillis()));
//message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
// eventMeshTCPConfiguration.eventMeshServerIp);

EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
(EventMeshAsyncConsumeContext) context;
Expand Down Expand Up @@ -692,7 +687,6 @@ private String pushMsgToEventMesh(CloudEvent msg, String ip, int port) throws Ex
3000);
} catch (Exception e) {
logger.error("httpPost " + targetUrl + " is fail,", e);
//throw new RuntimeException("httpPost " + targetUrl + " is fail," , e);
throw e;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,4 @@ public boolean isAvailable(String topic) {
return true;
}

//@Override
//public int hashCode() {
// int code = 37 + (client != null ? client.hashCode() : 0) + (context != null ? context.hashCode() : 0)
// + (sessionState != null ? sessionState.hashCode() : 0);
// return code;
//}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,6 @@ public MQConsumerWrapper getConsumer() {
public void ackMsg() {
if (consumer != null && context != null && events != null) {
consumer.updateOffset(events, context);
//ConsumeMessageService consumeMessageService = consumer..getDefaultMQPushConsumerImpl().getConsumeMessageService();
//((ConsumeMessageConcurrentlyService)consumeMessageService).updateOffset(msgs, context);
logger.info("ackMsg topic:{}, bizSeq:{}", events.get(0).getSubject(), EventMeshUtil.getMessageBizSeq(events.get(0)));
} else {
logger.warn("ackMsg failed,consumer is null:{}, context is null:{} , msgs is null:{}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public DownStreamMsgContext(CloudEvent event, Session session, MQConsumerWrapper
this.createTime = System.currentTimeMillis();
this.subscriptionItem = subscriptionItem;
String ttlStr = (String) event.getExtension("TTL");
//String ttlStr = msgExt.getUserProperties("TTL");
long ttl = StringUtils.isNumeric(ttlStr) ? Long.parseLong(ttlStr) :
EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS;
this.expireTime = System.currentTimeMillis() + ttl;
Expand All @@ -86,9 +85,6 @@ public void ackMsg() {
List<CloudEvent> events = new ArrayList<CloudEvent>();
events.add(event);
consumer.updateOffset(events, consumeConcurrentlyContext);
//ConsumeMessageService consumeMessageService =
// consumer.getDefaultMQPushConsumer().getDefaultMQPushConsumerImpl().getConsumeMessageService();
//((ConsumeMessageConcurrentlyService)consumeMessageService).updateOffset(msgs, consumeConcurrentlyContext);
logger.info("ackMsg seq:{}, topic:{}, bizSeq:{}", seq, events.get(0).getSubject(),
events.get(0).getExtension(EventMeshConstants.PROPERTY_MESSAGE_KEYS));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,31 +93,18 @@ public EventMeshTcpSendResult send(Header header, CloudEvent event, SendCallback
ttl = Long.parseLong((String) Objects.requireNonNull(
event.getExtension(EventMeshConstants.PROPERTY_MESSAGE_TTL)));
}
//long ttl = msg.getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL) != null ?
// Long.parseLong(msg.getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL))
// : EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS;
upStreamMsgContext = new UpStreamMsgContext(session, event, header, startTime, taskExecuteTime);
session.getClientGroupWrapper().get().request(upStreamMsgContext, initSyncRRCallback(header,
startTime, taskExecuteTime), ttl);
upstreamBuff.release();
} else if (Command.RESPONSE_TO_SERVER == cmd) {
String cluster = (String) event.getExtension(EventMeshConstants.PROPERTY_MESSAGE_CLUSTER);
//String cluster = msg.getUserProperties(EventMeshConstants.PROPERTY_MESSAGE_CLUSTER);
if (!StringUtils.isEmpty(cluster)) {
String replyTopic = EventMeshConstants.RR_REPLY_TOPIC;
replyTopic = cluster + "-" + replyTopic;
event = CloudEventBuilder.from(event).withSubject(replyTopic).build();
//msg.getSystemProperties().put(Constants.PROPERTY_MESSAGE_DESTINATION, replyTopic);
//event(replyTopic);
}

////for rocketmq support
//MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG);
//MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID,
// msg.getProperty(DeFiBusConstant.PROPERTY_RR_REQUEST_ID));
//MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT,
// msg.getProperty(DeFiBusConstant.PROPERTY_MESSAGE_REPLY_TO));

upStreamMsgContext = new UpStreamMsgContext(session, event, header, startTime, taskExecuteTime);
session.getClientGroupWrapper().get().reply(upStreamMsgContext);
upstreamBuff.release();
Expand Down Expand Up @@ -148,12 +135,6 @@ private RequestReplyCallback initSyncRRCallback(Header header, long startTime, l
public void onSuccess(CloudEvent event) {
String seq = header.getSeq();
// TODO: How to assign values here
//if (msg instanceof MessageExt) {
// msg.putUserProperty(EventMeshConstants.BORN_TIMESTAMP, String.valueOf(((MessageExt) msg)
// .getBornTimestamp()));
// msg.putUserProperty(EventMeshConstants.STORE_TIMESTAMP, String.valueOf(((MessageExt) msg)
// .getStoreTimestamp()));
//}
event = CloudEventBuilder.from(event)
.withExtension(EventMeshConstants.RSP_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
.withExtension(EventMeshConstants.RSP_RECEIVE_EVENTMESH_IP, session.getEventMeshTCPConfiguration().eventMeshServerIp)
Expand All @@ -176,17 +157,14 @@ public void onSuccess(CloudEvent event) {

Package pkg = new Package();

//msg.getSystemProperties().put(EventMeshConstants.RSP_EVENTMESH2C_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
try {
//pkg.setBody(EventMeshUtil.encodeMessage(msg));
pkg = (Package) protocolAdaptor.fromCloudEvent(event);
pkg.setHeader(new Header(cmd, OPStatus.SUCCESS.getCode(), null, seq));
pkg.getHeader().putProperty(Constants.PROTOCOL_TYPE, protocolType);
} catch (Exception e) {
pkg.setHeader(new Header(cmd, OPStatus.FAIL.getCode(), null, seq));
} finally {
Utils.writeAndFlush(pkg, startTime, taskExecuteTime, session.getContext(), session);
//session.write2Client(pkg);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,6 @@ private void validateUserAgent(UserAgent user) throws Exception {
throw new Exception("client version cannot be null");
}

//if (user.getUsername() == null) {
// throw new Exception("client EventMeshUser cannot be null");
//}
//
//if (user.getPassword() == null) {
// throw new Exception("client EventMeshPasswd cannot be null");
//}

if (!(StringUtils.equals(EventMeshConstants.PURPOSE_PUB, user.getPurpose()) || StringUtils.equals(
EventMeshConstants.PURPOSE_SUB, user.getPurpose()))) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ public void run() {
Integer status = OPStatus.FAIL.getCode();
header = new Header(LISTEN_RESPONSE, status, e.toString(), pkg.getHeader().getSeq());
} finally {
//res.setHeader(header);
//writeAndFlush(res, startTime, session.getContext(), session);
//session.write2Client(res);

//check to avoid send repeatedly
session.trySendListenResponse(header, startTime, taskExecuteTime);
}
Expand Down

This file was deleted.

This file was deleted.

0 comments on commit 522bbd3

Please sign in to comment.