Commit d2e4914 (1 parent: 4871a8f)
ZhongFuCheng3y committed Feb 16, 2022

1. austin.sql: update the test data
2. Fix the installation guide (INSTALL.md)
3. Write the full-link logs to the austinLog topic
4. Tidy up the austin-stream code
Showing 17 changed files with 153 additions and 111 deletions.
INSTALL.md: 12 changes (6 additions, 6 deletions)
@@ -183,10 +183,10 @@ docker ps
docker exec -it kafka sh
```

-Create a topic (here my **topicName** is austinTopic; you can change it to your own)
+Create a topic (here my **topicName** is austinBusiness; you can change it to your own)

```
-$KAFKA_HOME/bin/kafka-topics.sh --create --topic austinTopic --partitions 4 --zookeeper zookeeper:2181 --replication-factor 1
+$KAFKA_HOME/bin/kafka-topics.sh --create --topic austinBusiness --partitions 4 --zookeeper zookeeper:2181 --replication-factor 1
```

Check the info of the topic we just created:
@@ -344,16 +344,16 @@ global:
scrape_configs:
  - job_name: 'prometheus'
    static_configs:
-     - targets: ['ip:9090'] // TODO fill in your own IP
+     - targets: ['ip:9090']
  - job_name: 'cadvisor'
    static_configs:
-     - targets: ['ip:8899'] // TODO fill in your own IP
+     - targets: ['ip:8899']
  - job_name: 'node'
    static_configs:
-     - targets: ['ip:9100'] // TODO fill in your own IP
+     - targets: ['ip:9100']
```

-**Note the ports here; use the ones you configured**
+**Note the ports here; use the ones you configured, and fill in your own IP as well**

**Copy** this `prometheus.yml` configuration to the path `/etc/prometheus/prometheus.yml`. Then run `docker-compose up -d` in that directory, and we can visit each of the following:

@@ -33,6 +33,9 @@ private void init() {

@Autowired
private RedisUtils redisUtils;
+@Autowired
+private LogUtils logUtils;


@Override
public void deduplication(DeduplicationParam param) {
@@ -62,7 +65,7 @@ public void deduplication(DeduplicationParam param) {
// Filter out the users that match the deduplication rule
if (CollUtil.isNotEmpty(filterReceiver)) {
taskInfo.getReceiver().removeAll(filterReceiver);
-LogUtils.print(AnchorInfo.builder().businessId(taskInfo.getBusinessId()).ids(filterReceiver).state(param.getAnchorState().getCode()).build());
+logUtils.print(AnchorInfo.builder().businessId(taskInfo.getBusinessId()).ids(filterReceiver).state(param.getAnchorState().getCode()).build());
}
}

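In this and the following hunks, the static `LogUtils.print(...)` calls become calls on an injected `logUtils` bean. Given point 3 of the commit message, a plausible reading is that LogUtils now needs injected collaborators so each anchor log can also be shipped to the austinLog Kafka topic. A minimal sketch of such a bean under that assumption (the property name, import paths, and Kafka wiring below are guesses, not the project's actual code):

```java
// Assumed sketch only: the real LogUtils lives in austin-support and is not
// part of this diff. The shape is inferred from the call sites and commit message.
import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.domain.AnchorInfo; // assumed package
import com.java3y.austin.common.domain.LogParam;   // assumed package
import com.java3y.austin.support.utils.KafkaUtils; // assumed collaborator
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class LogUtils {

    @Autowired
    private KafkaUtils kafkaUtils;

    // assumed property name; the commit only tells us the topic is named austinLog
    @Value("${austin.log.topic.name:austinLog}")
    private String logTopicName;

    /**
     * Print an anchor (tracking) log locally and ship it to the austinLog topic.
     */
    public void print(AnchorInfo anchorInfo) {
        String message = JSON.toJSONString(anchorInfo);
        log.info(message);
        kafkaUtils.send(logTopicName, message);
    }

    /**
     * Print a business log, then the anchor log.
     */
    public void print(LogParam logParam, AnchorInfo anchorInfo) {
        log.info(JSON.toJSONString(logParam));
        print(anchorInfo);
    }
}
```

Whatever the exact body, switching from a static call to an injected bean is what lets Spring wire a Kafka sender into the logging path.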
@@ -9,6 +9,7 @@
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.AnchorState;
import com.java3y.austin.support.utils.LogUtils;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
@@ -22,6 +23,10 @@ public class DiscardMessageService {
@ApolloConfig("boss.austin")
private Config config;

+@Autowired
+private LogUtils logUtils;


/**
* Discard the message; the discard list is configured in Apollo
* @param taskInfo
@@ -33,7 +38,7 @@ public boolean isDiscard(TaskInfo taskInfo) {
AustinConstant.APOLLO_DEFAULT_VALUE_JSON_ARRAY));

if (array.contains(String.valueOf(taskInfo.getMessageTemplateId()))) {
-LogUtils.print(AnchorInfo.builder().businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).state(AnchorState.DISCARD.getCode()).build());
+logUtils.print(AnchorInfo.builder().businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).state(AnchorState.DISCARD.getCode()).build());
return true;
}
return false;
@@ -23,6 +23,9 @@ public abstract class BaseHandler implements Handler {

@Autowired
private HandlerHolder handlerHolder;
+@Autowired
+private LogUtils logUtils;


/**
* Initialize the channel-to-Handler mapping
@@ -35,10 +38,10 @@ private void init() {
@Override
public void doHandler(TaskInfo taskInfo) {
if (handler(taskInfo)) {
-LogUtils.print(AnchorInfo.builder().state(AnchorState.SEND_SUCCESS.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build());
+logUtils.print(AnchorInfo.builder().state(AnchorState.SEND_SUCCESS.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build());
return;
}
-LogUtils.print(AnchorInfo.builder().state(AnchorState.SEND_FAIL.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build());
+logUtils.print(AnchorInfo.builder().state(AnchorState.SEND_FAIL.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build());
}

/**
@@ -38,6 +38,9 @@ public class Receiver {
@Autowired
private TaskPendingHolder taskPendingHolder;

+@Autowired
+private LogUtils logUtils;

@KafkaListener(topics = "#{'${austin.business.topic.name}'}")
public void consumer(ConsumerRecord<?, String> consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId) {
Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value());
@@ -51,7 +54,7 @@ public void consumer(ConsumerRecord<?, String> consumerRecord, @Header(KafkaHead
*/
if (topicGroupId.equals(messageGroupId)) {
for (TaskInfo taskInfo : taskInfoLists) {
-LogUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), AnchorInfo.builder().ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build());
+logUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), AnchorInfo.builder().ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build());
Task task = context.getBean(Task.class).setTaskInfo(taskInfo);
taskPendingHolder.route(topicGroupId).execute(task);
}
@@ -9,10 +9,10 @@
import com.java3y.austin.service.api.impl.domain.SendTaskModel;
import com.java3y.austin.support.pipeline.BusinessProcess;
import com.java3y.austin.support.pipeline.ProcessContext;
+import com.java3y.austin.support.utils.KafkaUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
-import org.springframework.kafka.core.KafkaTemplate;

/**
* @author 3y
@@ -22,22 +22,22 @@
public class SendMqAction implements BusinessProcess {

@Autowired
-private KafkaTemplate kafkaTemplate;
+private KafkaUtils kafkaUtils;

@Value("${austin.business.topic.name}")
private String topicName;

@Override
public void process(ProcessContext context) {
SendTaskModel sendTaskModel = (SendTaskModel) context.getProcessModel();
+String message = JSON.toJSONString(sendTaskModel.getTaskInfo(), new SerializerFeature[]{SerializerFeature.WriteClassName});

try {
-kafkaTemplate.send(topicName, JSON.toJSONString(sendTaskModel.getTaskInfo(),
-new SerializerFeature[] {SerializerFeature.WriteClassName}));
+kafkaUtils.send(topicName, message);
} catch (Exception e) {
context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR));
log.error("send kafka fail! e:{},params:{}", Throwables.getStackTraceAsString(e)
, JSON.toJSONString(CollUtil.getFirst(sendTaskModel.getTaskInfo().listIterator())));

}
}
}
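SendMqAction now delegates to `KafkaUtils` from austin-support instead of holding a `KafkaTemplate` itself. That class is not part of this diff; a minimal sketch of what such a wrapper plausibly looks like (an assumption, not the project's actual code):

```java
// Assumed sketch only: the real KafkaUtils in austin-support is not shown in this diff.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaUtils {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Send a JSON message to the given topic.
     */
    public void send(String topicName, String jsonMessage) {
        kafkaTemplate.send(topicName, jsonMessage);
    }
}
```

Centralizing the send call in one utility keeps the pipeline actions free of Kafka-specific types, and the same dependency is what the LogUtils change above would lean on.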
austin-stream/pom.xml: 6 changes (0 additions, 6 deletions)

@@ -38,12 +38,6 @@
<version>${flink.version}</version>
</dependency>

-<dependency>
-<groupId>org.projectlombok</groupId>
-<artifactId>lombok</artifactId>
-</dependency>


<dependency>
<groupId>com.java3y.austin</groupId>
<artifactId>austin-support</artifactId>
@@ -1,5 +1,6 @@
package com.java3y.austin.stream;

+import com.java3y.austin.stream.constants.AustinFlinkConstant;
import com.java3y.austin.stream.utils.FlinkUtils;
import com.java3y.austin.stream.utils.SpringContextUtils;
import lombok.extern.slf4j.Slf4j;
@@ -8,7 +9,6 @@
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.springframework.context.ApplicationContext;

/**
* Flink bootstrap class
@@ -20,29 +20,28 @@ public class AustinBootStrap {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+SpringContextUtils.loadContext(AustinFlinkConstant.SPRING_CONFIG_PATH);

-String topicName = "austinTopicV2";
-String groupId = "austinTopicV23";
-ApplicationContext applicationContext = SpringContextUtils.loadContext("classpath*:austin-spring.xml");
-FlinkUtils flinkUtils = applicationContext.getBean(FlinkUtils.class);
-KafkaSource<String> kafkaConsumer = flinkUtils.getKafkaConsumer(topicName, groupId);
+/**
+ * 1. Get the KafkaConsumer
+ */
+KafkaSource<String> kafkaConsumer = SpringContextUtils.getBean(FlinkUtils.class).getKafkaConsumer(AustinFlinkConstant.TOPIC_NAME, AustinFlinkConstant.GROUP_ID, AustinFlinkConstant.BROKER);
+DataStreamSource<String> kafkaSource = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), AustinFlinkConstant.SOURCE_NAME);

-DataStreamSource<String> kafkaSource = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafkaSource");

+/**
+ * 2. Transform and process the data
+ */

+/**
+ * 3. Write real-time data to Redis across multiple dimensions (implemented); write offline data to Hive (not implemented)
+ */
kafkaSource.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
log.error("kafka value:{}", value);
}
});



// DataStream<AnchorInfo> stream = envBatchPendingThread
// .addSource(new AustinSource())
// .name("transactions");
//
// stream.addSink(new AustinSink());

env.execute("AustinBootStrap");
}

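Step 2 above is deliberately left empty in this commit. Given the `FUNCTION_NAME` constant ("austin_transfer") defined in AustinFlinkConstant below and the AnchorInfo domain object, a later transformation step might take a shape like the following (purely illustrative; the class and method here are hypothetical and not part of the commit):

```java
// Purely illustrative: one possible shape for the empty "2. transform" step,
// parsing each raw JSON record from Kafka into an AnchorInfo.
import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.domain.AnchorInfo; // assumed package
import com.java3y.austin.stream.constants.AustinFlinkConstant;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

public class TransferStepSketch {

    /** Map the raw JSON strings from Kafka into AnchorInfo records. */
    public static SingleOutputStreamOperator<AnchorInfo> transfer(DataStreamSource<String> kafkaSource) {
        return kafkaSource
                .map((MapFunction<String, AnchorInfo>) value -> JSON.parseObject(value, AnchorInfo.class))
                .name(AustinFlinkConstant.FUNCTION_NAME);
    }
}
```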
@@ -0,0 +1,27 @@
+package com.java3y.austin.stream.constants;
+
+public class AustinFlinkConstant {
+
+    /**
+     * Kafka configuration
+     * TODO: configure the broker address before use
+     */
+    public static final String GROUP_ID = "austinLogGroup";
+    public static final String TOPIC_NAME = "austinLog";
+    public static final String BROKER = "ip:port";
+
+
+    /**
+     * Path of the Spring configuration file
+     */
+    public static final String SPRING_CONFIG_PATH = "classpath*:austin-spring.xml";
+
+
+    /**
+     * Flink pipeline constants
+     */
+    public static final String SOURCE_NAME = "austin_kafka_source";
+    public static final String FUNCTION_NAME = "austin_transfer";
+    public static final String SINK_NAME = "austin_sink";
+
+}
@@ -5,6 +5,9 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

+/**
+ * mock
+ */
@Slf4j
public class AustinSink extends RichSinkFunction<AnchorInfo> {


This file was deleted.

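On the AustinSink hunk above: the class body is collapsed by the diff view. A mock `RichSinkFunction<AnchorInfo>` typically just overrides `invoke()`; an assumed sketch follows (the class name is hypothetical, to avoid claiming this is the file's real body):

```java
// Assumed sketch only: the real body of AustinSink is collapsed in the diff view.
import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.domain.AnchorInfo; // assumed package
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

@Slf4j
public class AustinSinkSketch extends RichSinkFunction<AnchorInfo> {

    @Override
    public void invoke(AnchorInfo anchorInfo, Context context) throws Exception {
        // mock behaviour: just log the record; a real sink would write to Redis/Hive
        log.info("sink consume AnchorInfo: {}", JSON.toJSONString(anchorInfo));
    }
}
```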
@@ -1,5 +1,6 @@
package com.java3y.austin.stream.utils;

+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
@@ -9,17 +10,18 @@
*
* @author 3y
*/
+@Slf4j
public class FlinkUtils {

/**
* Get the Kafka consumer
*
* @param topicName
* @param groupId
* @return
*/
-public KafkaSource<String> getKafkaConsumer(String topicName, String groupId) {
+public KafkaSource<String> getKafkaConsumer(String topicName, String groupId, String broker) {
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("ip:port")
.setBootstrapServers(broker)
.setTopics(topicName)
.setGroupId(groupId)
.setStartingOffsets(OffsetsInitializer.earliest())
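The builder is truncated here by the diff view. Given the `SimpleStringSchema` import added at the top of the file, the tail presumably continues along these lines (an assumption based on the standard Flink KafkaSource builder API, not the shown diff):

```java
                // presumed continuation of the truncated builder:
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        return source;
```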
@@ -1,5 +1,6 @@
package com.java3y.austin.stream.utils;

+import cn.hutool.core.collection.CollUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
@@ -13,32 +14,35 @@
* Obtain the SpringContext object
*/
@Slf4j
public class SpringContextUtils {
private static ApplicationContext context;


/**
* XML configuration
*/
private static List<String> xmlPath = new ArrayList<>();


public static ApplicationContext loadContext(String path) {
return loadContext(new String[]{path});
}

/**
* Load the context from the given spring.xml configuration file paths
*
* @param paths
* @return
*/
public static synchronized ApplicationContext loadContext(String[] paths) {
if (null != paths && paths.length > 0) {
// keep only the paths that have not been loaded yet
List<String> newPaths = new ArrayList<>();
for (String path : paths) {
if (!xmlPath.contains(path)) {
log.info("ApplicationContextFactory add new path {}", path);
newPaths.add(path);
} else {
log.info("ApplicationContextFactory already load path {}", path);
}
}
-if (!newPaths.isEmpty()) {
+if (CollUtil.isNotEmpty(newPaths)) {
String[] array = new String[newPaths.size()];
-for (int i=0; i<newPaths.size(); i++) {
+for (int i = 0; i < newPaths.size(); i++) {
array[i] = newPaths.get(i);
xmlPath.add(newPaths.get(i));
}
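The rest of this class is collapsed by the diff view. Since AustinBootStrap calls `SpringContextUtils.getBean(FlinkUtils.class)`, the class presumably also exposes a lookup along these lines (a sketch under that assumption, not the author's exact code):

```java
    /**
     * Look up a bean from the loaded context; loadContext(...) must have run first.
     */
    public static <T> T getBean(Class<T> clazz) {
        if (null == context) {
            throw new IllegalStateException("Spring context not loaded yet, call loadContext(...) first");
        }
        return context.getBean(clazz);
    }
```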
