Skip to content

Commit

Permalink
!27 解决nacos引入出现的问题,完善日志打印
Browse files Browse the repository at this point in the history
Merge pull request !27 from 小宇宙/master
  • Loading branch information
Java3y authored and gitee-org committed Aug 5, 2022
2 parents bb314d7 + 352f581 commit b3415f9
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package com.java3y.austin.handler.receiver.kafka;

import com.alibaba.fastjson.JSON;
import com.java3y.austin.handler.utils.GroupIdMappingUtils;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
Expand All @@ -14,8 +12,6 @@
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
Expand All @@ -40,6 +36,9 @@ public class ReceiverStart {
@Autowired
private ConsumerFactory consumerFactory;

@Value("${austin.nacos.enabled}")
private Boolean nacosEnabled;

/**
* receiver的消费方法常量
*/
Expand All @@ -60,7 +59,13 @@ public class ReceiverStart {
*/
@PostConstruct
public void init() {
for (int i = 0; i < groupIds.size(); i++) {
int total = groupIds.size();
if (nacosEnabled) {
// 当nacos开启时 会导致Receiver提前加载 所以这里getBean次数-1
// nacos issue: https://github.com/nacos-group/nacos-spring-project/issues/249
total -= 1;
}
for (int i = 0; i < total; i++) {
context.getBean(Receiver.class);
}
}
Expand All @@ -74,8 +79,7 @@ public static KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer groupI
if (element instanceof Method) {
String name = ((Method) element).getDeclaringClass().getSimpleName() + "." + ((Method) element).getName();
if (RECEIVER_METHOD_NAME.equals(name)) {
attrs.put("groupId", groupIds.get(index));
index++;
attrs.put("groupId", groupIds.get(index++));
}
}
return attrs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public class SendMqAction implements BusinessProcess<SendTaskModel> {
@Value("${austin.business.tagId.value}")
private String tagId;

@Value("${austin.mq.pipeline}")
private String mqPipeline;


@Override
public void process(ProcessContext<SendTaskModel> context) {
Expand All @@ -50,7 +53,7 @@ public void process(ProcessContext<SendTaskModel> context) {
}
} catch (Exception e) {
context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR));
log.error("send kafka fail! e:{},params:{}", Throwables.getStackTraceAsString(e)
log.error("send {} fail! e:{},params:{}", mqPipeline, Throwables.getStackTraceAsString(e)
, JSON.toJSONString(CollUtil.getFirst(sendTaskModel.getTaskInfo().listIterator())));
}
}
Expand Down

0 comments on commit b3415f9

Please sign in to comment.