Skip to content

Commit

Permalink
JAVA KAFKA
Browse files Browse the repository at this point in the history
  • Loading branch information
OomelodyoO authored and OomelodyoO committed Oct 17, 2018
1 parent 33b0c97 commit 1568875
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 0 deletions.
17 changes: 17 additions & 0 deletions kafka/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
plugins {
id 'java'
}

version '0.0.1'

sourceCompatibility = 1.8

repositories {
mavenCentral()
}

dependencies {
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.0.0'
compile 'org.slf4j:slf4j-log4j12:1.7.25'
testCompile group: 'junit', name: 'junit', version: '4.12'
}
47 changes: 47 additions & 0 deletions kafka/src/main/java/win/zhangzhixing/CKafkaConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package win.zhangzhixing;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.*;

public class CKafkaConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, "192.168.102.47:9092");
props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(GROUP_ID_CONFIG, "zzx-group");
// earliest latest
props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ZZX-TOPIC"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("onPartitionsRevoked");
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("onPartitionsAssigned");
}
});

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
System.out.println();
}
}
}
}
37 changes: 37 additions & 0 deletions kafka/src/main/java/win/zhangzhixing/CKafkaProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package win.zhangzhixing;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;

public class CKafkaProducer {
private final Producer<String,String> producer;
public final static String TOPIC = "ZZX-TOPIC";
public CKafkaProducer() {
Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG,"192.168.102.47:9092");
props.put(KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
props.put(VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
}
public void produce(){
int messageNo = 0;
final int COUNT = 200;
while (messageNo < COUNT){
String key = String.valueOf(messageNo);
String data = "@@@@@hello kafka message"+key;
producer.send(new ProducerRecord<>(TOPIC,key,data));
System.out.println(data);
messageNo++;
}
}
public static void main(String[] args) {
new CKafkaProducer().produce();
}
}
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ include 'http-session'
include 'socket'
include 'nio'
include 'java-synchronized'
include 'kafka'

0 comments on commit 1568875

Please sign in to comment.