
Commit

more kafka code on streams
dharabose committed May 7, 2018
1 parent c0a6560 commit 24d9ac6
Showing 7 changed files with 492 additions and 52 deletions.
78 changes: 46 additions & 32 deletions conf/kafka-commands.txt
@@ -1,3 +1,14 @@


---------------------------------------------------------------------------------------------------------------------------------------------------------


KAFKA COMMANDS


---------------------------------------------------------------------------------------------------------------------------------------------------------


[1] echo $PATH

[2] export PATH=$PATH:/path/to/jdk/bin
@@ -19,12 +30,16 @@
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 30 --topic MONGODB-TOPIC
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 30 --topic ETMS-TOPIC

-[9] ./kafka-console-producer.sh --broker-list localhost:9092 --topic CAPSULE-TOPIC
+[9] ./kafka-console-producer.sh --broker-list localhost:9092 --topic HDFS-TOPIC
+[OR]
+./kafka-console-producer.sh --broker-list localhost:9092 --topic HDFS-TOPIC --property parse.key=true --property key.separator=:


-[10] ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic DATA-CLEANSED-TOPIC --from-beginning
+[10] ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic MAPPED_HDFS_DATA_TOPIC --from-beginning
+[OR]
+./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic MAPPED_HDFS_DATA_TOPIC --property print.key=true --property key.separator=" --- " --from-beginning
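
The parse.key / print.key variants above correspond one-to-one to a programmatic producer. A minimal Java sketch, assuming the plain kafka-clients library (the class name and the sample key/value are illustrative, not part of this commit):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Equivalent of typing "deviceId42:some payload" into the console producer
            // started with parse.key=true and key.separator=:
            producer.send(new ProducerRecord<>("HDFS-TOPIC", "deviceId42", "some payload"));
        }
    }
}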



--------------------------------- ADMINISTRATION ---------------------------------
[11] Kafka data is stored by default under /tmp/kafka-logs
@@ -40,44 +55,44 @@
Topic: KANDY-TOPIC Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: KANDY-TOPIC Partition: 2 Leader: 0 Replicas: 0 Isr: 0

-./kafka-topics.sh --zookeeper localhost:2181 --delete --topic CHOLA
-But this will have impact only when delete.topic.enable is set to true
-
-Altering the partitions count for a topic
-/kafka-topics.sh --zookeeper localhost:2181 --alter --topic ZETA-ENGINE-TOPIC --partitions 20
+[14] Deleting a topic
+./kafka-topics.sh --zookeeper localhost:2181 --delete --topic CHOLA
+This takes effect only when delete.topic.enable is set to true

-You cannot decrease the number of partitions
-./kafka-topics.sh --zookeeper localhost:2181 --alter --topic ETMS-TOPIC --partitions 5
+[15] Altering a topic
+Altering the partition count for a topic
+./kafka-topics.sh --zookeeper localhost:2181 --alter --topic ZETA-ENGINE-TOPIC --partitions 20

-WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
-Error while executing topic command : The number of partitions for a topic can only be increased. Topic ETMS-TOPIC currently has 31 partitions, 5 would not be an increase.
-[2018-04-27 20:16:37,610] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic ETMS-TOPIC currently has 31 partitions, 5 would not be an increase.
+You cannot decrease the number of partitions:
+./kafka-topics.sh --zookeeper localhost:2181 --alter --topic ETMS-TOPIC --partitions 5

-Show all the consumer groups
-./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
+WARNING: If partitions are increased for a topic that has keyed messages, the partitioning logic and the ordering of the messages will be affected
+Error while executing topic command : The number of partitions for a topic can only be increased. Topic ETMS-TOPIC currently has 31 partitions, 5 would not be an increase.
+[2018-04-27 20:16:37,610] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic ETMS-TOPIC currently has 31 partitions, 5 would not be an increase.
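
The alter command in [15] can also be issued programmatically. A hedged sketch against the AdminClient API available since Kafka 1.0 (broker address and topic reused from above; the class name is illustrative). As with the CLI, the count can only go up:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

public class IncreasePartitionsSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Equivalent of: ./kafka-topics.sh --alter --topic ZETA-ENGINE-TOPIC --partitions 20
            admin.createPartitions(
                    Collections.singletonMap("ZETA-ENGINE-TOPIC", NewPartitions.increaseTo(20))
            ).all().get();
        }
    }
}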


+[16] List all the consumer groups
+Show all the consumer groups
+./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

-[14] Reading a segment file
+[17] Reading a segment file
./kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /tmp/dharshekth-kafka/MESH_TOPIC-1/00000000000000000000.log
./kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /tmp/kafka-logs/KANDY-TOPIC-23/00000000000000000000.timeindex
./kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /tmp/kafka-logs/KANDY-TOPIC-23/00000000000000000000.index

+[18] Describe a consumer group
+./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group AWS-CONSUMER4 --describe

-Describe about the consumer
-./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group AWS-CONSUMER4 --describe

-[16] kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1" --generate
+[19] kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1" --generate

-[17]
-echo "exclude.internal.topics=false" > consumer.config
+[20] echo "exclude.internal.topics=false" > consumer.config
./kafka-console-consumer.sh --consumer.config consumer.config --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --bootstrap-server localhost:9092 --topic __consumer_offsets


--------------------------------- ADMINISTRATION ---------------------------------

-[18]
+[21]
---Servers---
listeners=PLAINTEXT://host.name:port,SSL://host.name:port

@@ -101,19 +116,18 @@



-[20]
+[22]
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --operation ALL --topic BASHAS
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --operation WRITE --topic HULK
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --producer --topic HULKER
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --authorizer kafka.security.auth.SimpleAclAuthorizer --add --allow-principal User:* --producer --topic BASHAS
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --authorizer kafka.security.auth.SimpleAclAuthorizer --add --allow-principal User:* --consumer --topic BASHAS --group *
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --deny-principal User:* --operation ALL --topic DATAS
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic FARM-TOPIC

-[21] ./bin/kafka-run-class.sh org.apache.kafka.clients.producer.internals.ProducerMetrics > metrics.html
+[23] ./bin/kafka-run-class.sh org.apache.kafka.clients.producer.internals.ProducerMetrics > metrics.html

10:02 min



6 changes: 6 additions & 0 deletions pom.xml
@@ -81,6 +81,12 @@
<version>3.2.1</version>
</dependency>

+<dependency>
+    <groupId>com.lightbend</groupId>
+    <artifactId>kafka-streams-scala_2.12</artifactId>
+    <version>0.1.0</version>
+</dependency>


</dependencies>
</project>
6 changes: 3 additions & 3 deletions src/com/dmac/KafkaUserCustomPartitioner.scala
@@ -21,10 +21,10 @@ class KafkaUserCustomPartitioner extends Partitioner {

val key_ = key.asInstanceOf[String]
    if (key_.startsWith("M"))
-     0
+     10
    else if (key_.startsWith("C"))
-     6
+     16
    else
-     9
+     19
}
}
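
A producer does not pick this partitioner up automatically; it has to be registered via the standard partitioner.class producer config. A minimal Java sketch (all values other than the config key and the partitioner class are illustrative). Note that the hardcoded partitions 10, 16 and 19 require the target topic to have at least 20 partitions:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CustomPartitionerWiring {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Route every record through the custom partitioner defined above.
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.dmac.KafkaUserCustomPartitioner");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Keys starting with "M" land on partition 10, "C" on 16, everything else on 19.
            producer.send(new ProducerRecord<>("ETMS-TOPIC", "M-sensor-1", "reading"));
        }
    }
}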
180 changes: 180 additions & 0 deletions src/com/dmac/streams/HDFSDataProcessSupplier.java
@@ -0,0 +1,180 @@
package com.dmac.streams;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;

/**
* Created by dharshekthvel on 4/28/18.
*/
public class HDFSDataProcessSupplier implements ProcessorSupplier<String, String> {

@Override
public Processor<String, String> get() {
return new HDFSDataProcessor();
}

}


class HDFSDataProcessor implements Processor<String, String> {


ProcessorContext processorContext = null;
private KeyValueStore<String, String> kvStore = null;

@Override
public void init(ProcessorContext _processorContext) {

this.processorContext = _processorContext;
//kvStore = (KeyValueStore) _processorContext.getStateStore("DATA_STORE");

        // schedule punctuate() to be called every 1000 ms of stream time (old-style Processor API)
        this.processorContext.schedule(1000);

}

@Override
public void process(String key, String value) {

        // Do complex processing here, then tag the key/value and forward the record downstream
        processorContext.forward(key.concat("__HDFS_DATA__"), value.concat("__HDFS_DATA__"));
//kvStore.put(key.toUpperCase(),value.toUpperCase());
}

@Override
public void punctuate(long l) {
processorContext.commit();
}

@Override
public void close() {

//kvStore.close();
}


}



class RuleEngineSupplier implements ProcessorSupplier<String, String> {
@Override
public Processor<String, String> get() {
return new RuleEngineProcessor();
}
}

class RuleEngineProcessor implements Processor<String, String> {


ProcessorContext processorContext = null;
private KeyValueStore<String, String> kvStore = null;

@Override
public void init(ProcessorContext _processorContext) {

this.processorContext = _processorContext;
//kvStore = (KeyValueStore) _processorContext.getStateStore("LENGTH_STORE");

        // schedule punctuate() to be called every 1000 ms of stream time (old-style Processor API)
        this.processorContext.schedule(1000);

}

@Override
public void process(String key, String value) {

        // Do complex processing here, then tag the key/value and forward the record downstream
        processorContext.forward(key.concat("__RULE_ENGINE_DATA__"), value.concat("__RULE_ENGINE_DATA__"));

//kvStore.put(Integer.toString(key.length()), Integer.toString(value.length()));
}

@Override
public void punctuate(long l) {
processorContext.commit();
}

@Override
public void close() {

//kvStore.close();
}


}


class ElasticSearchSupplier implements ProcessorSupplier<String, String> {
@Override
public Processor<String, String> get() {
return new ElasticSearchProcessor();
}
}

class ElasticSearchProcessor implements Processor<String, String> {


ProcessorContext processorContext = null;
private KeyValueStore<String, String> kvStore = null;

@Override
public void init(ProcessorContext _processorContext) {

this.processorContext = _processorContext;
//kvStore = (KeyValueStore) _processorContext.getStateStore("LENGTH_STORE");

        // schedule punctuate() to be called every 1000 ms of stream time (old-style Processor API)
        this.processorContext.schedule(1000);

}

@Override
public void process(String key, String value) {

        // Do complex processing here, then tag the key/value and forward the record downstream
        processorContext.forward(key.concat("__ELASTIC_SEARCH__"), value.concat("__ELASTIC_SEARCH__"));

//kvStore.put(Integer.toString(key.length()), Integer.toString(value.length()));
}

@Override
public void punctuate(long l) {
processorContext.commit();
}

@Override
public void close() {

//kvStore.close();
}


}


// Placeholder Transformer for a future Redis integration; the methods below are unimplemented stubs.
class RedisTransformer implements Transformer<String, String, KeyValue<String, String>> {
@Override
public void init(ProcessorContext context) {

}

@Override
public KeyValue<String, String> transform(String key, String value) {
return null;
}

@Override
public KeyValue<String, String> punctuate(long timestamp) {
return null;
}

@Override
public void close() {

}
}
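
Nothing in this commit wires these suppliers into a running topology. One plausible wiring, sketched against the Kafka Streams 1.x Topology API (the application id and node names are illustrative; the topics come from conf/kafka-commands.txt):

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class HDFSDataTopologySketch {
    public static void main(String[] args) {
        // Source topic -> custom processor -> sink topic, mirroring the console
        // producer/consumer commands in conf/kafka-commands.txt.
        Topology topology = new Topology();
        topology.addSource("HDFS-SOURCE", "HDFS-TOPIC");
        topology.addProcessor("HDFS-PROCESSOR", new HDFSDataProcessSupplier(), "HDFS-SOURCE");
        topology.addSink("MAPPED-SINK", "MAPPED_HDFS_DATA_TOPIC", "HDFS-PROCESSOR");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hdfs-data-processor-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        new KafkaStreams(topology, props).start();
    }
}

With this wiring, the __HDFS_DATA__-tagged records forwarded by HDFSDataProcessor end up on MAPPED_HDFS_DATA_TOPIC, which is exactly what command [10] in kafka-commands.txt consumes.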