
Commit

kafka
ppabc committed Jun 7, 2016
1 parent 90b8740 commit d3e8187
Showing 3 changed files with 198 additions and 0 deletions.
106 changes: 106 additions & 0 deletions kafka/Kafka.sh
@@ -0,0 +1,106 @@
#!/bin/bash
## kafka 2016-06-07
## http://www.aqzt.com
##email: [email protected]
##robert yu
##centos 6

yum install -y curl openssh-server openssh-clients postfix cronie git nmap unzip wget lsof xz gcc gcc-c++ make vim libtool

## Note: adjust the hostname/IP mapping below to match your environment
cat >>/etc/hosts<<EOF
192.168.142.136 master.storm.com
EOF


cat >>/etc/profile<<EOF
export JAVA_HOME=/opt/tomcat/jdk1.8.0_77
export CLASSPATH=.:\$JAVA_HOME/lib/*:\$JAVA_HOME/jre/lib/*
export ZOOKEEPER=/opt/zk
export PATH=\$PATH:\$JAVA_HOME/bin:/opt/zk/bin
EOF
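
## The Kafka tarball is assumed to be in the working directory already;
## if not, it can be fetched from the Apache archive:
# wget https://archive.apache.org/dist/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz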

tar zxvf kafka_2.10-0.9.0.1.tgz

cat >/opt/kafka_2.10-0.9.0.1/config/server.properties<<EOF
broker.id=0
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=master.storm.com:2181/kafka
zookeeper.connection.timeout.ms=6000
EOF

cat >>/opt/zk/conf/zoo.cfg<<EOF
maxClientCnxns=50
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/
clientPort=2181
server.1=master.storm.com:2888:3888
EOF

mkdir -p /data/zookeeper/
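
## Assumed addition: zoo.cfg declares server.1, and ZooKeeper expects a
## matching myid file in dataDir when running in quorum mode; creating it
## here is harmless in standalone mode.
echo 1 > /data/zookeeper/myid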

cat >/opt/confluent-2.0.1/etc/schema-registry/schema-registry.properties<<EOF
port=8081
kafkastore.connection.url=127.0.0.1:2181/kafka
kafkastore.topic=_schemas
debug=false
EOF

cat >/opt/confluent-2.0.1/etc/kafka-rest/kafka-rest.properties<<EOF
port=8082
id=kafka-rest-test-server
schema.registry.url=http://localhost:8081
zookeeper.connect=127.0.0.1:2181/kafka
EOF


## The steps below are run manually, not as part of this script
source /etc/profile

# The /kafka chroot path must be created in ZooKeeper by hand. Connect to any ZooKeeper server with:
cd /opt/zk
bin/zkCli.sh

# In the ZooKeeper shell, create the chroot path:
create /kafka ''
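# Verify the chroot exists, then leave the ZooKeeper shell:
ls /
quit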
# Then start the Kafka broker:
cd /opt/kafka_2.10-0.9.0.1
/opt/kafka_2.10-0.9.0.1/bin/kafka-server-start.sh /opt/kafka_2.10-0.9.0.1/config/server.properties >/dev/null 2>&1 &
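
## Optional sanity checks (assumed, not in the original): confirm the broker
## registered itself under the chroot and is listening on its port.
# /opt/zk/bin/zkCli.sh -server master.storm.com:2181 ls /kafka/brokers/ids
# lsof -i :9092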

## To stop the broker:
##ps ax | grep -i 'kafka.Kafka' | grep -v grep | awk '{print $1}' | xargs kill

## Create a test topic:
bin/kafka-topics.sh --create --zookeeper 192.168.142.136:2181/kafka --replication-factor 1 --partitions 1 --topic mykafka
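
## Optionally verify the topic:
bin/kafka-topics.sh --list --zookeeper 192.168.142.136:2181/kafka
bin/kafka-topics.sh --describe --zookeeper 192.168.142.136:2181/kafka --topic mykafka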

## In one terminal, start a console producer:
bin/kafka-console-producer.sh --broker-list 192.168.142.136:9092 --topic mykafka

## In another terminal, start a console consumer:
bin/kafka-console-consumer.sh --zookeeper 192.168.142.136:2181/kafka --topic mykafka --from-beginning


cd /opt/confluent-2.0.1/
bin/schema-registry-start etc/schema-registry/schema-registry.properties >/dev/null 2>&1 &
## To stop the schema registry:
##ps ax | grep -i 'schema-registry' | grep -v grep | awk '{print $1}' | xargs kill
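
## Sanity check (assumed): a running registry answers on its REST port.
# curl http://localhost:8081/subjects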

cd /opt/confluent-2.0.1/
bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties >/dev/null 2>&1 &
## To stop the REST proxy:
##ps ax | grep -i 'kafka-rest' | grep -v grep | awk '{print $1}' | xargs kill
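
## Sanity check (assumed): a running proxy lists topics over HTTP.
# curl http://localhost:8082/topics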

47 changes: 47 additions & 0 deletions kafka/Kafka_REST_Proxy_test.sh
@@ -0,0 +1,47 @@
#!/bin/bash
###http://docs.confluent.io/1.0/kafka-rest/docs/intro.html
## Purpose of this script: testing the Kafka REST (HTTP) proxy


# Get a list of topics
$ curl "http://localhost:8082/topics"
[{"name":"test","num_partitions":3},{"name":"test2","num_partitions":1}]

# Get info about one partition
$ curl "http://localhost:8082/topics/test"
{"name":"test","num_partitions":3}

# Produce a message using binary embedded data with value "Kafka" to the topic test
$ curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \
--data '{"records":[{"value":"S2Fma2E="}]}' "http://localhost:8082/topics/test"
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}

# Produce a message using Avro embedded data, including the schema which will
# be registered with the schema registry and used to validate and serialize
# before storing the data in Kafka
$ curl -X POST -H "Content-Type: application/vnd.kafka.avro.v1+json" \
--data '{"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}", "records": [{"value": {"name": "testUser"}}]}' \
"http://localhost:8082/topics/avrotest"
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":21}

# Create a consumer for binary data, starting at the beginning of the topic's
# log. Then consume some data from a topic.
$ curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \
--data '{"id": "my_instance", "format": "binary", "auto.offset.reset": "smallest"}' \
http://localhost:8082/consumers/my_binary_consumer
{"instance_id":"my_instance","base_uri":"http://localhost:8082/consumers/my_binary_consumer/instances/my_instance"}
$ curl -X GET -H "Accept: application/vnd.kafka.binary.v1+json" \
http://localhost:8082/consumers/my_binary_consumer/instances/my_instance/topics/test
[{"key":null,"value":"S2Fma2E=","partition":0,"offset":0}]

# Create a consumer for Avro data, starting at the beginning of the topic's
# log. Then consume some data from a topic, which is decoded, translated to
# JSON, and included in the response. The schema used for deserialization is
# fetched automatically from the schema registry.
$ curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \
--data '{"id": "my_instance", "format": "avro", "auto.offset.reset": "smallest"}' \
http://localhost:8082/consumers/my_avro_consumer
{"instance_id":"my_instance","base_uri":"http://localhost:8082/consumers/my_avro_consumer/instances/my_instance"}
$ curl -X GET -H "Accept: application/vnd.kafka.avro.v1+json" \
http://localhost:8082/consumers/my_avro_consumer/instances/my_instance/topics/avrotest
[{"key":null,"value":{"name":"testUser"},"partition":0,"offset":0}]
45 changes: 45 additions & 0 deletions kafka/Kafka_init.sh
@@ -0,0 +1,45 @@
#!/bin/sh
#
# Purpose: This script starts and stops the $DAEMON_NAME daemon
#
# License: GPL
#
# chkconfig: 345 80 30
# description: Starts Kafka
# Source function library.
. /etc/rc.d/init.d/functions


USER=root
DAEMON_PATH=/opt/kafka_2.10-0.9.0.1/bin
DAEMON_NAME=kafka
# Check that networking is up.
#[ ${NETWORKING} = "no" ] && exit 0

PATH=$PATH:$DAEMON_PATH

# See how we were called.
case "$1" in
start)
# Start daemon.
echo -n "Starting $DAEMON_NAME: ";echo
/bin/su $USER -c "$DAEMON_PATH/kafka-server-start.sh /opt/kafka_2.10-0.9.0.1/config/server.properties >/dev/null 2>>/var/log/kafka.log" &
echo ok
;;
stop)
# Stop daemons.
echo -n "Shutting down $DAEMON_NAME: ";echo
#$DAEMON_PATH/kafka-server-stop.sh
ps ax | grep -i 'kafka.Kafka' | grep -v grep | awk '{print $1}' | xargs kill
echo ok
;;
restart)
$0 stop
sleep 1
$0 start
;;
*)
echo "Usage: $0 {start|stop|restart}"
exit 1
esac
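
## To install as a service (paths assumed; CentOS 6 conventions):
## cp Kafka_init.sh /etc/init.d/kafka && chmod +x /etc/init.d/kafka
## chkconfig --add kafka
## service kafka start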
