Skip to content

Commit

Permalink
✨ kakfa consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
crossoverJie committed Sep 4, 2017
1 parent 7843ed1 commit 1448842
Show file tree
Hide file tree
Showing 7 changed files with 360 additions and 0 deletions.
31 changes: 31 additions & 0 deletions SSM-WEB/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,37 @@
<scope>test</scope>
</dependency>

<!--kafka消费-->
<!--https://github.com/linzhaoming/easyframe-msg-->
<dependency>
<groupId>com.easyfun.easyframe</groupId>
<artifactId>easyframe-msg</artifactId>
<version>0.0.1</version>
</dependency>

<!--kafka消费所依赖的zk-->
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.8.2.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.2</version>
</dependency>

<dependency>
<groupId>com.crossoverJie</groupId>
<artifactId>SSM-API</artifactId>
Expand Down
131 changes: 131 additions & 0 deletions SSM-WEB/src/main/java/com/crossoverJie/kafka/KafkaMsgConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package com.crossoverJie.kafka;

import com.alibaba.fastjson.JSON;
import com.crossoverJie.util.DateUtil;
import com.easyfun.easyframe.msg.MsgIterator;
import com.easyfun.easyframe.msg.MsgUtil;
import com.easyfun.easyframe.msg.config.EasyMsgConfig;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static com.crossoverJie.util.PreconditionUtil.checkArguments;


/**
* kafka消费
*
* @author crossoverJie
* @date 2017年6月19日 下午3:15:16
*/
public class KafkaMsgConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMsgConsumer.class);

private static final int CORE_POOL_SIZE = 4;
private static final int MAXIMUM_POOL_SIZE = 4;
private static final int BLOCKING_QUEUE_CAPACITY = 4000;
private static final String KAFKA_CONFIG = "kafkaConfig";
private static final ExecutorService fixedThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(BLOCKING_QUEUE_CAPACITY));

//最后更新时间
private static AtomicLong LAST_MESSAGE_TIME = new AtomicLong(DateUtil.getLongTime());

private static MsgIterator iter = null;
private static String topic;//主题名称


static {
Properties properties = new Properties();
String path = System.getProperty(KAFKA_CONFIG);
checkArguments(!StringUtils.isBlank(path), "启动参数中没有配置kafka_easyframe_msg参数来指定kafka启动参数,请使用-DkafkaConfig=/path/fileName/easyframe-msg.properties");
try {
properties.load(new FileInputStream(new File(path)));
} catch (IOException e) {
LOGGER.error("IOException" ,e);
}
EasyMsgConfig.setProperties(properties);

}

private static void iteratorTopic() {
if (iter == null) {
iter = MsgUtil.consume(topic);
}
long i = 0L;
while (iter.hasNext()) {
i++;
if (i % 10000 == 0) {
LOGGER.info("consume i:" + i);
}
try {
String message = iter.next();
if (StringUtils.isEmpty(message)) {
continue;
}
LAST_MESSAGE_TIME = new AtomicLong(DateUtil.getLongTime());

//处理消息
LOGGER.debug("msg = " + JSON.toJSONString(message));
} catch (Exception e) {
LOGGER.error("KafkaMsgConsumer err:", e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
LOGGER.error("Thread InterruptedException", e1);
}
break;
}
}
}

public static void main(String[] args) {
topic = System.getProperty("topic");
checkArguments(!StringUtils.isBlank(topic), "system property topic or log_path is must!");
while (true) {
try {
iteratorTopic();
} catch (Exception e) {
MsgUtil.shutdownConsummer();
iter = null;

LOGGER.error("KafkaMsgConsumer err:", e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
LOGGER.error("Thread InterruptedException", e1);
}
} finally {
//此逻辑主要是解决kafka消费程序无故取不到消息,这是临时的解决办法
//此处关闭之后,由crontab每分钟检查一次,挂掉的话会重新拉起来
if (DateUtil.getLongTime() - LAST_MESSAGE_TIME.get() > 10 * 60) { //10分钟
fixedThreadPool.shutdown();
LOGGER.info("线程池是否关闭:" + fixedThreadPool.isShutdown());
try {
//当前线程阻塞10ms后,去检测线程池是否终止,终止则返回true
while (!fixedThreadPool.awaitTermination(10, TimeUnit.MILLISECONDS)) {
LOGGER.info("检测线程池是否终止:" + fixedThreadPool.isTerminated());
}
} catch (InterruptedException e) {
LOGGER.error("等待线程池关闭错误", e);
}
LOGGER.info("线程池是否终止:" + fixedThreadPool.isTerminated());
LOGGER.info("in 10 min dont have data break");
break;
}
}
}
LOGGER.info("app shutdown");
System.exit(0);
}

}
4 changes: 4 additions & 0 deletions SSM-WEB/src/main/java/com/crossoverJie/util/DateUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,8 @@ public static String getCurrentDateStr()throws Exception{
SimpleDateFormat sdf=new SimpleDateFormat("yyyyMMddhhmmss");
return sdf.format(date);
}

public static long getLongTime(){
return System.currentTimeMillis() / 1000;
}
}
44 changes: 44 additions & 0 deletions SSM-WEB/src/main/java/com/crossoverJie/util/PreconditionUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.crossoverJie.util;

import java.util.Collection;
import java.util.Map;

/**
* 参数校验工具类
*/
public class PreconditionUtil {
private static final String PRI_KEY_ERR = "主键错误";

private PreconditionUtil() {
}

public static void intPriKeyChk(Integer pk) {
checkArguments(!isNull(pk) && pk > 0, PRI_KEY_ERR);
}

public static void checkArguments(boolean expression, String errMsg) {
if (!expression) {
throw new IllegalArgumentException(errMsg);
}
}

public static boolean isNull(Object o) {
return null == o;
}

public static <T> boolean colIsEmpty(Collection<T> col) {
return isNull(col) || col.isEmpty();
}

public static <T> boolean arrayIsEmpty(T[] ts) {
return ts == null || ts.length == 0;
}

public static boolean mapIsEmpty(Map map) {
return map == null || map.size() == 0;
}

public static void main(String[] args) {
int[] a = {};
}
}
33 changes: 33 additions & 0 deletions SSM-WEB/src/main/java/com/crossoverJie/util/PropertyUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.crossoverJie.util;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.PropertyPlaceholderConfigurer;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
* 加载和读取配置文件的工具类
*/
public class PropertyUtil extends PropertyPlaceholderConfigurer {

public static Map<String, String> ctxPropertiesMap;

@Override
protected void processProperties(ConfigurableListableBeanFactory beanFactoryToProcess, Properties props) throws BeansException {
super.processProperties(beanFactoryToProcess, props);
ctxPropertiesMap = new HashMap<String, String>();
for (Object key : props.keySet()) {
String keyStr = key.toString();
String value = props.getProperty(keyStr);
ctxPropertiesMap.put(keyStr, value);
}
}

public String getProperty(String name) {
return ctxPropertiesMap.get(name);
}

}
95 changes: 95 additions & 0 deletions SSM-WEB/src/main/java/com/crossoverJie/util/PropertyUtilExt.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.crossoverJie.util;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URI;
import java.util.Properties;

/**
* 读取配置文件工具类
*/
public class PropertyUtilExt {

private final static Logger logger = LoggerFactory.getLogger(PropertyUtilExt.class);

/**
* 读取配置文件中的value值
* @param fileName 配置文件名
* @param key 需要获取的key
* @return
*/
public static String getProperty(String fileName, String key){
String value = null;
String rootPath = null;
Properties properties = new Properties();
try{
//获取根路径
rootPath = PropertyUtil.class.getClassLoader().getResource("").toURI().getPath();
// rootPath = ClassUtils.getDefaultClassLoader().getResource("").getPath();
//拼成配置文件目录
String path = rootPath+"/"+fileName;
//读取配置文件
FileInputStream fileInputStream = new FileInputStream(new File(path));
properties.load(fileInputStream);
value = properties.getProperty(key);
}catch(Exception e){
logger.error("读取配置文件失败!", e);
}
return value;
}

/**
* 获取web.peoperties中的值
* @param key
* @return
*/
public static String getProperty(String key){
String filename = "web.properties";
return getProperty(filename, key);
}

/**
* 获取项目根目录
* @return
* @throws Exception
*/
public static String getRootPath() throws Exception {
return PropertyUtil.class.getClassLoader().getResource("").toURI().getPath();
}

/**
* 获取项目外配置文件内容
* @param propertyName 配置文件启动参数名
* @param keyName 文件内key
* @return
*/
public static String getExtProperty(String propertyName, String keyName){
String value = "";
String propPath = System.getProperty(propertyName);
if(StringUtils.isEmpty(propPath)){
throw new NullPointerException("配置文件的地址不能为空,请在启动参数中指定");
}

try {
InputStream inputestream = null;
if(propPath.startsWith("file:")){
URI uri = new URI(propPath);
inputestream = new FileInputStream(new File(uri));
}else{
inputestream = new FileInputStream(new File(propPath));
}
Properties properties = new Properties();
properties.load(inputestream);
value = properties.getProperty(keyName);
} catch (Exception e) {
logger.error(String.format("加载配置文件【%s】失败。", propPath), e);
System.exit(0);
}
return value;
}
}
22 changes: 22 additions & 0 deletions SSM-WEB/src/main/resources/easyframe-msg.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#全局配置
global.topic.default.name=ai-topic

#### producer的配置 ########
producer.serializer.class=kafka.serializer.StringEncoder
#生产配置集群
producer.metadata.broker.list=10.20.14.51:9094
producer.producer.type=sync
producer.serializer.encoding=UTF-8

#Consumer的配置, chroot为kafka, [注意]需要与Broker的ZooKeeper配置一致
#生产配置集群
consumer.zookeeper.connect=10.20.14.51:2181
consumer.group.id=group1
consumer.zookeeper.session.timeout.ms=1000000
consumer.zookeeper.sync.time.ms=2000
consumer.auto.commit.interval.ms=3000
consumer.auto.offset.reset=smallest
#consumer.auto.offset.reset=largest
consumer.auto.commit.enable=true
consumer.consumer.timeout.ms=1000000
consumer.serializer.encoding=UTF-8

0 comments on commit 1448842

Please sign in to comment.