Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
sunney committed May 2, 2016
0 parents commit 72f9492
Show file tree
Hide file tree
Showing 11 changed files with 529 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
.idea
.settings
.classpath
.project
target

88 changes: 88 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.3.3.RELEASE</version>
</parent>
<groupId>com.sunney</groupId>
<artifactId>kafka-demo</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>kafka-demo</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>1.3.0.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId> org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.7</version>
</dependency>
</dependencies>
<build>
<finalName>kafak-demo</finalName>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>1.3.3.RELEASE</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
<configuration>
<!-- do not enable it, this will creats a non standard jar and cause
autoconfig to fail -->
<executable>false</executable>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>2.6</version>
<configuration>
<delimiters>
<delimiter>@</delimiter>
</delimiters>
</configuration>
</plugin>
</plugins>
</build>
</project>
63 changes: 63 additions & 0 deletions src/main/java/com/sunney/Application.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.sunney;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;

import com.alibaba.fastjson.JSON;
import com.sunney.service.KafkaService;
import com.sunney.service.UserDto;


/**
* Hello world!
*/
@SpringBootApplication
@EnableScheduling
@ComponentScan
@EnableAutoConfiguration
@ServletComponentScan
public class Application {

public static ApplicationContext applicationContext;

public static void main(String[] args) throws Exception {

SpringApplication app = new SpringApplication(Application.class);
app.setWebEnvironment(false);
Set<Object> set = new HashSet<Object>();
set.add("classpath:applicationContext.xml");
app.setSources(set);
applicationContext = app.run(args);
send();

}
public static void send(){
KafkaService kafkaService = Application.applicationContext.getBean("kafkaService", KafkaService.class);
UserDto user=null;
for (int i = 1; i < 100; i++) {
user=new UserDto();
user.setGmtCeate(new Date());
user.setUserId(i);
user.setUserName("sunney");
List<Long> list=new ArrayList<Long>();
list.add(10000l);
list.add(10001l);
list.add(10002l);
user.setUserList(list);
String msg= JSON.toJSONString(user);
kafkaService.sendUserInfo("sunneytopic",msg);
}
}

}
16 changes: 16 additions & 0 deletions src/main/java/com/sunney/service/KafkaService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright 1999-2024 Colotnet.com All right reserved. This software is the
* confidential and proprietary information of Colotnet.com ("Confidential
* Information"). You shall not disclose such Confidential Information and shall
* use it only in accordance with the terms of the license agreement you entered
* into with Colotnet.com.
*/
package com.sunney.service;

/**
* 类KafkaService.java的实现描述:TODO 类实现描述
* @author Sunney 2016年4月30日 上午11:30:53
*/
public interface KafkaService {
public void sendUserInfo(String topic, Object obj);
}
80 changes: 80 additions & 0 deletions src/main/java/com/sunney/service/UserDto.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 1999-2024 Colotnet.com All right reserved. This software is the
* confidential and proprietary information of Colotnet.com ("Confidential
* Information"). You shall not disclose such Confidential Information and shall
* use it only in accordance with the terms of the license agreement you entered
* into with Colotnet.com.
*/
package com.sunney.service;

import java.util.Date;
import java.util.List;

/**
* 类UserDto.java的实现描述:TODO 类实现描述
* @author Sunney 2016年4月30日 上午10:53:54
*/
public class UserDto {
private Integer userId;
private String userName;
private List<Long> userList;
private Date gmtCeate;

/**
* @return the userId
*/
public Integer getUserId() {
return userId;
}

/**
* @param userId the userId to set
*/
public void setUserId(Integer userId) {
this.userId = userId;
}

/**
* @return the userName
*/
public String getUserName() {
return userName;
}

/**
* @param userName the userName to set
*/
public void setUserName(String userName) {
this.userName = userName;
}

/**
* @return the userList
*/
public List<Long> getUserList() {
return userList;
}

/**
* @param userList the userList to set
*/
public void setUserList(List<Long> userList) {
this.userList = userList;
}

/**
* @return the gmtCeate
*/
public Date getGmtCeate() {
return gmtCeate;
}

/**
* @param gmtCeate the gmtCeate to set
*/
public void setGmtCeate(Date gmtCeate) {
this.gmtCeate = gmtCeate;
}


}
50 changes: 50 additions & 0 deletions src/main/java/com/sunney/service/impl/KafkaConsumerService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 1999-2024 Colotnet.com All right reserved. This software is the confidential and proprietary information of
* Colotnet.com ("Confidential Information"). You shall not disclose such Confidential Information and shall use it only
* in accordance with the terms of the license agreement you entered into with Colotnet.com.
*/
package com.sunney.service.impl;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.sunney.service.UserDto;

/**
* 类KafkaConsumerService.java的实现描述:TODO 类实现描述
*
* @author Sunney 2016年4月30日 上午11:46:14
*/
public class KafkaConsumerService {

static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);

public void processMessage(Map<String, Map<Integer, String>> msgs) {
logger.info("===============================================processMessage===============");
for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) {
logger.info("============Topic:" + entry.getKey());
LinkedHashMap<Integer, String> messages = (LinkedHashMap<Integer, String>) entry.getValue();
Set<Integer> keys = messages.keySet();
for (Integer i : keys)
logger.info("======Partition:" + i);
Collection<String> values = messages.values();
for (Iterator<String> iterator = values.iterator(); iterator.hasNext();) {
String message = "["+iterator.next()+"]";
logger.info("=====message:" + message);
List<UserDto> userList = JSON.parseArray(message, UserDto.class);
logger.info("=====userList.size:" + userList.size());

}

}
}

}
36 changes: 36 additions & 0 deletions src/main/java/com/sunney/service/impl/KafkaServiceImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 1999-2024 Colotnet.com All right reserved. This software is the
* confidential and proprietary information of Colotnet.com ("Confidential
* Information"). You shall not disclose such Confidential Information and shall
* use it only in accordance with the terms of the license agreement you entered
* into with Colotnet.com.
*/
package com.sunney.service.impl;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.kafka.support.KafkaHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;

import com.sunney.service.KafkaService;

/**
* 类KafkaServiceImpl.java的实现描述:TODO 类实现描述
* @author Sunney 2016年4月30日 上午11:31:13
*/
@Service("kafkaService")
public class KafkaServiceImpl implements KafkaService{

@Autowired
@Qualifier("kafkaTopicTest")
MessageChannel channel;

public void sendUserInfo(String topic, Object obj) {
channel.send(MessageBuilder.withPayload(obj)
.setHeader(KafkaHeaders.TOPIC,topic)
.build());
}

}
24 changes: 24 additions & 0 deletions src/main/resources/applicationContext.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.2.xsd"
default-autowire="byName">

<!-- 导入Spring配置文件 -->
<import resource="spring-kafka-consumer.xml" />
<import resource="spring-kafka-producer.xml" />

</beans>
Loading

0 comments on commit 72f9492

Please sign in to comment.