Skip to content

Commit

Permalink
adjust package structure
Browse files Browse the repository at this point in the history
  • Loading branch information
frank.yao committed Nov 7, 2013
1 parent c028c24 commit e59cd32
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 132 deletions.
49 changes: 0 additions & 49 deletions src/main/java/com/vipshop/flume/KafkaConsumerConfig.java

This file was deleted.

53 changes: 0 additions & 53 deletions src/main/java/com/vipshop/flume/KafkaProducerConfig.java

This file was deleted.

30 changes: 7 additions & 23 deletions src/main/java/com/vipshop/flume/KafkaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,22 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.vipshop.flume.config.KafkaSinkConfig;
import com.vipshop.flume.config.KafkaSourceConfig;


public class KafkaUtil {
private static final Logger log = LoggerFactory.getLogger(KafkaUtil.class);
/**
* @param args
*/
public static String getZkConnect(Context context) {
return context.getString(KafkaProducerConstants.CONFIG_ZK_CONNECT);
}
public static String getTopic(Context context) {
return context.getString(KafkaProducerConstants.CONFIG_TOPIC);
}
public static String getBatchSize(Context context) {
return context.getString(KafkaProducerConstants.CONFIG_BATCH_SIZE, "200");
}
public static String getGroup(Context context) {
return context.getString(KafkaProducerConstants.CONFIG_GROUP);
}
public static String getResetOffset(Context context) {
return context.getString(KafkaProducerConstants.CONFIG_RESET_OFFSET, "no");
}
public static String getZKTimeout(Context context) {
return context.getString(KafkaProducerConstants.CONFIG_ZK_TIMEOUT, "15000");
}
public static Producer<String, String> getProducer(Context context) {
Producer<String, String> producer;
Properties props = new Properties();
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("zk.connect", getZkConnect(context));
props.put("zk.connect", KafkaSinkConfig.getZkConnect(context));
props.put("producer.type", "async");
props.put("batch.size", getBatchSize(context));
props.put("batch.size", KafkaSinkConfig.getBatchSize(context));
props.put("zk.sessiontimeout.ms", "15000");
props.put("queue.size", "1000000");

Expand All @@ -54,13 +39,12 @@ public static Producer<String, String> getProducer(Context context) {
}
public static ConsumerConnector getConsumer(Context context) throws IOException, KeeperException, InterruptedException {
Properties props = new Properties();
props.put("zk.connect", getZkConnect(context));
props.put("zk.connect", KafkaSourceConfig.getZkConnect(context));
props.put("zk.sessiontimeout.ms", "60000");
// props.put("fetch.size", String.valueOf(Integer.parseInt((getBatchSize(context))) * 300 * 1024));
props.put("groupid", getGroup(context));
props.put("groupid", KafkaSourceConfig.getGroupId(context));
props.put("autocommit.enable", "false");
props.put("queuedchunks.max", "1000");
props.put("batch.size", getBatchSize(context));
props.put("autooffset.reset", "largest");
props.put("socket.buffersize", "10240000");
props.put("socket.timeout.ms", "60000");
Expand Down
56 changes: 56 additions & 0 deletions src/main/java/com/vipshop/flume/config/KafkaSinkConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.vipshop.flume.config;

import org.apache.flume.Context;

/**
 * Static accessors for the Kafka producer (sink) settings carried in a Flume
 * {@link org.apache.flume.Context}. Each getter looks up one well-known key
 * (declared in {@code KafkaSinkConstants}) and falls back to the documented
 * default when the key is absent. Values are deliberately kept as raw strings
 * because they are copied verbatim into a Kafka {@code Properties} object.
 */
public final class KafkaSinkConfig {

    /** Utility class — static methods only; not instantiable. */
    private KafkaSinkConfig() {
    }

    /** Producer mode; defaults to {@code "sync"}. */
    public static String getProducerType(Context context) {
        return context.getString(KafkaSinkConstants.CONFIG_PRODUCER_TYPE, "sync");
    }

    /** Broker list; {@code null} when not configured. */
    public static String getBrokerList(Context context) {
        return context.getString(KafkaSinkConstants.CONFIG_BROKER_LIST, null);
    }

    /** ZooKeeper connect string; {@code null} when not configured. */
    public static String getZkConnect(Context context) {
        return context.getString(KafkaSinkConstants.CONFIG_ZK_CONNECT, null);
    }

    /** Socket send-buffer size in bytes; defaults to 102400. */
    public static String getBufferSize(Context context) {
        return context.getString(KafkaSinkConstants.CONFIG_BUFFER_SIZE, "102400");
    }

    /** Connect timeout in milliseconds; defaults to 5000. */
    public static String getConnectTimeoutMs(Context context) {
        return context.getString(KafkaSinkConstants.CONFIG_CONNECT_TIMEOUT_MS, "5000");
    }

    /** Socket timeout in milliseconds; defaults to 30000. */
    public static String getSocketTimeoutMs(Context context) {
        return context.getString(KafkaSinkConstants.CONFIG_SOCKET_TIMEOUT_MS, "30000");
    }

    /** Reconnect interval; defaults to 30000. */
    public static String getReconnectInterval(Context context) {
        return context.getString(KafkaSinkConstants.CONFIG_RECONNECT_INTERVAL, "30000");
    }

    /** Reconnect time interval in milliseconds; defaults to 10*1000*1000. */
    public static String getReconnectTimeIntervalMs(Context context) {
        return context.getString(KafkaSinkConstants.CONFIG_RECONNECT_TIME_INTERVAL_MS, "10000000"); // 10*1000*1000
    }

    /** Maximum message size in bytes; defaults to 1000000. */
    public static String getMaxMessageSize(Context context) {
        return context.getString(KafkaSinkConstants.CONFIG_MAX_MESSAGE_SIZE, "1000000");
    }

    /** Compression codec id; defaults to {@code "0"} (no compression). */
    public static String getCompressionCodec(Context context) {
        return context.getString(KafkaSinkConstants.CONFIG_COMPRESSION_CODEC, "0");
    }

    /** Topics to compress; {@code null} when not configured. */
    public static String getCompressedTopics(Context context) {
        return context.getString(KafkaSinkConstants.CONFIG_COMPRESSED_TOPICS, null);
    }

    /** ZooKeeper read retry count; defaults to 3. */
    public static String getZkReadNumRetries(Context context) {
        return context.getString(KafkaSinkConstants.CONFIG_ZK_READ_NUM_RETRIES, "3");
    }

    // Async-producer settings below.

    /** Async queue flush time; defaults to 5000. */
    public static String getQueueTime(Context context) {
        return context.getString(KafkaSinkConstants.CONFIG_QUEUE_TIME, "5000");
    }

    /** Async queue capacity; defaults to 10000. */
    public static String getQueueSize(Context context) {
        return context.getString(KafkaSinkConstants.CONFIG_QUEUE_SIZE, "10000");
    }

    /** Async batch size; defaults to 200. */
    public static String getBatchSize(Context context) {
        return context.getString(KafkaSinkConstants.CONFIG_BATCH_SIZE, "200");
    }

    /**
     * Destination topic; no default — returns {@code null} when the key is
     * absent (callers are expected to require it).
     */
    public static String getTopic(Context context) {
        return context.getString(KafkaSinkConstants.CONFIG_TOPIC);
    }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.vipshop.flume;
package com.vipshop.flume.config;

public class KafkaProducerConstants {
public class KafkaSinkConstants {

/**
* @param args
Expand All @@ -21,4 +21,5 @@ public class KafkaProducerConstants {
public final static String CONFIG_QUEUE_TIME = "queue.time";
public final static String CONFIG_QUEUE_SIZE = "queue.size";
public final static String CONFIG_BATCH_SIZE = "batch.size";
public final static String CONFIG_TOPIC = "topic";
}
58 changes: 58 additions & 0 deletions src/main/java/com/vipshop/flume/config/KafkaSourceConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.vipshop.flume.config;

import org.apache.flume.Context;

public class KafkaSourceConfig {
public static String getGroupId(Context context) {
return context.getString(KafkaSourceConstants.CONFIG_GROUPID, "groupid");
}
public static String getSocketTimeoutMs(Context context) {
return context.getString(KafkaSourceConstants.CONFIG_SOCKET_TIMEOUT_MS, "30000");
}
public static String getSocketBufferSize(Context context) {
return context.getString(KafkaSourceConstants.CONFIG_SOCKET_BUFFER_SIZE, "65536"); // 64*1024
}
public static String getFetchSize(Context context) {
return context.getString(KafkaSourceConstants.CONFIG_FETCH_SIZE, "307200"); // 300*1024
}
public static String getBackOffIncrementMs(Context context) {
return context.getString(KafkaSourceConstants.CONFIG_BACKOFF_INCREMENT_MS, "1000");
}
public static String getQueuedChunksMax(Context context) {
return context.getString(KafkaSourceConstants.CONFIG_QUEUEDCHUNKS_MAX, "100");
}
public static String getAutoCommitEnable(Context context) {
return context.getString(KafkaSourceConstants.CONFIG_AUTOCOMMIT_ENABLE, "true");
}
public static String getAutoCommitIntervalMs(Context context) {
return context.getString(KafkaSourceConstants.CONFIG_AUTOCOMMIT_INTERVAL_MS, "10000");
}
public static String getAutoOffsetReset(Context context) {
return context.getString(KafkaSourceConstants.CONFIG_AUTOOFFSET_RESET, "smallest");
}
public static String getConsumerTimeoutMs(Context context) {
return context.getString(KafkaSourceConstants.CONFIG_CONSUMER_TIMEOUT_MS, "-1");
}
public static String getRebalanceRetriesMax(Context context) {
return context.getString(KafkaSourceConstants.CONFIG_REBALANCE_RETRIES_MAX, "4");
}
public static String getMirrorTopicsWhiteList(Context context) {
return context.getString(KafkaSourceConstants.CONFIG_MIRROR_TOPICS_WHITELIST, "");
}
public static String getMirrorTopicsBlackList(Context context) {
return context.getString(KafkaSourceConstants.CONFIG_MIRROR_TOPICS_BLACKLIST, "");
}
public static String getMirrorConsumerNumThreads(Context context) {
return context.getString(KafkaSourceConstants.CONFIG_MIRROR_CONSUMER_NUMTHREADS, "4");
}
public static String getBrokerList(Context context) {
return context.getString(KafkaSourceConstants.CONFIG_BROKER_LIST, null);
}
public static String getZkConnect(Context context) {
return context.getString(KafkaSourceConstants.CONFIG_ZK_CONNECT, null);
}
public static String getTopic(Context context) {
return context.getString(KafkaSourceConstants.CONFIG_TOPIC);
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.vipshop.flume;
package com.vipshop.flume.config;

public class KafkaConsumerConstants {
public class KafkaSourceConstants {

/**
* @param args
Expand All @@ -19,4 +19,7 @@ public class KafkaConsumerConstants {
public final static String CONFIG_MIRROR_TOPICS_WHITELIST = "mirror.topics.whitelist";
public final static String CONFIG_MIRROR_TOPICS_BLACKLIST = "mirror.topics.blacklist";
public final static String CONFIG_MIRROR_CONSUMER_NUMTHREADS = "mirror.consumer.numthreads";
public final static String CONFIG_BROKER_LIST = "broker.list";
public final static String CONFIG_ZK_CONNECT = "zk.connect";
public final static String CONFIG_TOPIC = "topic";
}
3 changes: 2 additions & 1 deletion src/main/java/com/vipshop/flume/sink/kafka/KafkaSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.slf4j.LoggerFactory;

import com.vipshop.flume.KafkaUtil;
import com.vipshop.flume.config.KafkaSinkConfig;

public class KafkaSink extends AbstractSink implements Configurable{
private static final Logger log = LoggerFactory.getLogger(KafkaSink.class);
Expand Down Expand Up @@ -52,7 +53,7 @@ public Status process() throws EventDeliveryException {
}

public void configure(Context context) {
this.topic = KafkaUtil.getTopic(context);
this.topic = KafkaSinkConfig.getTopic(context);
this.producer = KafkaUtil.getProducer(context);
log.debug("-------Init producer done-----------");
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/vipshop/flume/source/kafka/KafkaSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.slf4j.LoggerFactory;

import com.vipshop.flume.KafkaUtil;
import com.vipshop.flume.config.KafkaSourceConfig;

public class KafkaSource extends AbstractSource implements Configurable, PollableSource {

Expand Down Expand Up @@ -76,7 +77,7 @@ public Status process() throws EventDeliveryException {
}

public void configure(Context context) {
this.topic = KafkaUtil.getTopic(context);
this.topic = KafkaSourceConfig.getTopic(context);
try {
this.consumer = KafkaUtil.getConsumer(context);
} catch (IOException e) {
Expand All @@ -86,7 +87,6 @@ public void configure(Context context) {
} catch (InterruptedException e) {
e.printStackTrace();
}
this.batchSize = Integer.parseInt(KafkaUtil.getBatchSize(context));
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<Message>>> consumerMap = consumer.createMessageStreams(topicCountMap);
Expand Down

0 comments on commit e59cd32

Please sign in to comment.