Skip to content

Commit

Permalink
1. flink sink 不再做batch(pipeline 原本就会多维度写入)
Browse files Browse the repository at this point in the history
2. RedisUtils 增加hGetAll 和lRange方法
3. 接口调试
  • Loading branch information
ZhongFuCheng3y committed Feb 23, 2022
1 parent ba14aee commit 91c1e21
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

/**
* 调度类型
*
* @author 3y
*/
public enum ScheduleTypeEnum {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

/**
* 简单的埋点信息
*
* @author 3y
*/
@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,8 @@
@Slf4j
public class AustinSink implements SinkFunction<AnchorInfo> {

/**
* batch的超时时间和大小
*/
private static final Integer BATCH_SIZE = 10;
private static final Long TIME_OUT = 2000L;

/**
* 用ThreadLocal来暂存数据做批量处理
*/
private static ThreadLocal<List<AnchorInfo>> baseAnchors = ThreadLocal.withInitial(() -> new ArrayList<>(BATCH_SIZE));
private static ThreadLocal<Long> lastClearTime = ThreadLocal.withInitial(() -> new Long(System.currentTimeMillis()));

@Override
public void invoke(AnchorInfo anchorInfo) throws Exception {
public void invoke(AnchorInfo anchorInfo, Context context) throws Exception {
realTimeData(anchorInfo);
offlineDate(anchorInfo);
}
Expand All @@ -46,43 +34,34 @@ public void invoke(AnchorInfo anchorInfo) throws Exception {
* 1.用户维度(查看用户当天收到消息的链路详情),数量级大,只保留当天
* 2.消息模板维度(查看消息模板整体下发情况),数量级小,保留30天
*
* @param anchorInfo
* @param info
*/
private void realTimeData(AnchorInfo anchorInfo) {
baseAnchors.get().add(anchorInfo);
if (baseAnchors.get().size() >= BATCH_SIZE || System.currentTimeMillis() - lastClearTime.get() >= TIME_OUT) {

try {
LettuceRedisUtils.pipeline(redisAsyncCommands -> {
List<RedisFuture<?>> redisFutures = new ArrayList<>();
for (AnchorInfo info : baseAnchors.get()) {
/**
* 1.构建userId维度的链路信息 数据结构list:{key,list}
* key:userId,listValue:[{timestamp,state,businessId},{timestamp,state,businessId}]
*/
SimpleAnchorInfo simpleAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getTimestamp()).build();
for (String id : info.getIds()) {
redisFutures.add(redisAsyncCommands.lpush(id.getBytes(), JSON.toJSONString(simpleAnchorInfo).getBytes()));
redisFutures.add(redisAsyncCommands.expire(id.getBytes(), (DateUtil.endOfDay(new Date()).getTime() - DateUtil.current()) / 1000));
}
private void realTimeData(AnchorInfo info) {
try {
LettuceRedisUtils.pipeline(redisAsyncCommands -> {
List<RedisFuture<?>> redisFutures = new ArrayList<>();
/**
* 1.构建userId维度的链路信息 数据结构list:{key,list}
* key:userId,listValue:[{timestamp,state,businessId},{timestamp,state,businessId}]
*/
SimpleAnchorInfo simpleAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getTimestamp()).build();
for (String id : info.getIds()) {
redisFutures.add(redisAsyncCommands.lpush(id.getBytes(), JSON.toJSONString(simpleAnchorInfo).getBytes()));
redisFutures.add(redisAsyncCommands.expire(id.getBytes(), (DateUtil.endOfDay(new Date()).getTime() - DateUtil.current()) / 1000));
}

/**
* 2.构建消息模板维度的链路信息 数据结构hash:{key,hash}
* key:businessId,hashValue:{state,stateCount}
*/
redisFutures.add(redisAsyncCommands.hincrby(String.valueOf(info.getBusinessId()).getBytes(),
String.valueOf(info.getState()).getBytes(), info.getIds().size()));
redisFutures.add(redisAsyncCommands.expire(String.valueOf(info.getBusinessId()).getBytes(), DateUtil.offsetDay(new Date(), 30).getTime()));
}
return redisFutures;
});
/**
* 2.构建消息模板维度的链路信息 数据结构hash:{key,hash}
* key:businessId,hashValue:{state,stateCount}
*/
redisFutures.add(redisAsyncCommands.hincrby(String.valueOf(info.getBusinessId()).getBytes(),
String.valueOf(info.getState()).getBytes(), info.getIds().size()));
redisFutures.add(redisAsyncCommands.expire(String.valueOf(info.getBusinessId()).getBytes(), DateUtil.offsetDay(new Date(), 30).getTime()));
return redisFutures;
});

} catch (Exception e) {
log.error("AustinSink#invoke error: {}", Throwables.getStackTraceAsString(e));
} finally {
lastClearTime.set(System.currentTimeMillis());
baseAnchors.get().clear();
}
} catch (Exception e) {
log.error("AustinSink#invoke error: {}", Throwables.getStackTraceAsString(e));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,40 @@ public Map<String, String> mGet(List<String> keys) {
}
}
} catch (Exception e) {
log.error("redis mGet fail! e:{}", Throwables.getStackTraceAsString(e));
log.error("RedisUtils#mGet fail! e:{}", Throwables.getStackTraceAsString(e));
}
return result;
}

/**
* hGetAll
*
* @param key
*/
public Map<Object, Object> hGetAll(String key) {
try {
Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
return entries;
} catch (Exception e) {
log.error("RedisUtils#hGetAll fail! e:{}", Throwables.getStackTraceAsString(e));
}
return null;
}

/**
* lRange
*
* @param key
*/
public List<String> lRange(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
log.error("RedisUtils#lRange fail! e:{}", Throwables.getStackTraceAsString(e));
}
return null;
}

/**
* pipeline 设置 key-value 并设置过期时间
*/
Expand All @@ -57,7 +86,7 @@ public void pipelineSetEx(Map<String, String> keyValues, Long seconds) {
return null;
});
} catch (Exception e) {
log.error("redis pipelineSetEX fail! e:{}", Throwables.getStackTraceAsString(e));
log.error("RedisUtils#pipelineSetEx fail! e:{}", Throwables.getStackTraceAsString(e));
}
}

Expand All @@ -67,7 +96,7 @@ public void pipelineSetEx(Map<String, String> keyValues, Long seconds) {
* @param seconds 过期时间
* @param delta 自增的步长
*/
public void pipelineHashIncrByEX(Map<String, String> keyValues, Long seconds, Long delta) {
public void pipelineHashIncrByEx(Map<String, String> keyValues, Long seconds, Long delta) {
try {
redisTemplate.executePipelined((RedisCallback<String>) connection -> {
for (Map.Entry<String, String> entry : keyValues.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,40 @@
package com.java3y.austin.web.controller;

import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.vo.BasicResultVO;
import com.java3y.austin.support.domain.MessageTemplate;
import com.java3y.austin.support.utils.RedisUtils;
import com.java3y.austin.web.vo.DataParam;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.Map;

/**
* 获取数据接口(全链路追踪)
*
* @author 3y
*/
@Slf4j
@RestController
@RequestMapping("/messageTemplate")
@RequestMapping("/trace")
@Api("获取数据接口(全链路追踪)")
@CrossOrigin(origins = "http://localhost:3000", allowCredentials = "true", allowedHeaders = "*")
public class DataController {

/**
* 如果Id存在,则修改
* 如果Id不存在,则保存
*/
@Autowired
private RedisUtils redisUtils;

@PostMapping("/data")
@ApiOperation("/获取数据")
public BasicResultVO getData(@RequestBody MessageTemplate messageTemplate) {
public BasicResultVO getData(@RequestBody DataParam dataParam) {


Long businessId = dataParam.getBusinessId();
Map<Object, Object> objectObjectMap = redisUtils.hGetAll(String.valueOf(businessId));
log.info("data:{}", JSON.toJSONString(objectObjectMap));
return BasicResultVO.success();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.java3y.austin.web.service;

/**
* 数据链路追踪获取接口
* @author 3y
*/
public interface DataService {


}

0 comments on commit 91c1e21

Please sign in to comment.