params are now handled by user's conf, flume-kafka will not keep default value of kafka config
frank.yao committed Nov 7, 2013
1 parent 71cb9b8 commit 94f5ca0
Showing 8 changed files with 23 additions and 211 deletions.
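
For context, a minimal sketch of what a user's Flume agent conf can look like after this change: every key on the sink's or source's context other than type and channel is forwarded to Kafka verbatim, so Kafka settings now live directly in the user's conf instead of hardcoded defaults. The agent/component names and the specific values below are illustrative assumptions, not part of this commit; the Kafka keys (zk.connect, serializer.class, producer.type) are taken from the defaults removed in KafkaUtil.java below.

# hypothetical Flume agent conf; names and values are examples only
agent.sinks.k1.type = com.vipshop.flume.sink.kafka.KafkaSink
agent.sinks.k1.channel = c1
agent.sinks.k1.topic = some_topic
agent.sinks.k1.zk.connect = zkhost:2181
agent.sinks.k1.serializer.class = kafka.serializer.StringEncoder
agent.sinks.k1.producer.type = async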
46 changes: 21 additions & 25 deletions src/main/java/com/vipshop/flume/KafkaUtil.java
@@ -14,45 +14,41 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.vipshop.flume.config.KafkaSinkConfig;
-import com.vipshop.flume.config.KafkaSourceConfig;
 
 
 public class KafkaUtil {
     private static final Logger log = LoggerFactory.getLogger(KafkaUtil.class);
-    /**
-     * @param args
-     */
+    public static String getKafkaConfigParameter(Context context, String key) {
+        return context.getString(key);
+    }
+    public static Properties getKafkaConfigProperties(Context context) {
+        Properties props = new Properties();
+        String contextString = context.toString();
+        for (final String kv : contextString.substring(14, contextString.length() - 3).split(", ")) {
+            log.info("TODO KV:" + kv);
+            String k = kv.trim().split("=")[0];
+            log.info("K:" + k);
+            String v = kv.trim().split("=")[1];
+            log.info("V:" + v);
+            if (!k.equals("type") && !k.equals("channel")) {
+                props.put(k, v);
+            }
+        }
+        log.info("PROPS:" + props);
+        return props;
+    }
     public static Producer<String, String> getProducer(Context context) {
         log.info(context.toString());
         Producer<String, String> producer;
-        Properties props = new Properties();
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        props.put("zk.connect", KafkaSinkConfig.getZkConnect(context));
-        props.put("producer.type", "async");
-        props.put("batch.size", KafkaSinkConfig.getBatchSize(context));
-
-        props.put("queue.size", "1000000");
-
-        producer = new Producer<String, String>(new ProducerConfig(props));
         log.debug("-----------return producer");
+        producer = new Producer<String, String>(new ProducerConfig(getKafkaConfigProperties(context)));
         return producer;
     }
     public static ConsumerConnector getConsumer(Context context) throws IOException, KeeperException, InterruptedException {
         log.info(context.toString());
-        Properties props = new Properties();
-        props.put("zk.connect", KafkaSourceConfig.getZkConnect(context));
-        props.put("zk.sessiontimeout.ms", "60000");
-        // props.put("fetch.size", String.valueOf(Integer.parseInt((getBatchSize(context))) * 300 * 1024));
-        props.put("groupid", KafkaSourceConfig.getGroupId(context));
-        props.put("autocommit.enable", "false");
-        props.put("queuedchunks.max", "1000");
-        props.put("autooffset.reset", "largest");
-        props.put("socket.buffersize", "10240000");
-        props.put("socket.timeout.ms", "60000");
-        ConsumerConfig consumerConfig = new ConsumerConfig(props);
+        ConsumerConfig consumerConfig = new ConsumerConfig(getKafkaConfigProperties(context));
         ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
         log.debug("-----------return consumer");
         return consumer;
     }
 }
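
The new getKafkaConfigProperties builds its Properties by string-parsing Context.toString(). A self-contained sketch of that parsing (not part of the commit), under the assumption implied by the substring(14, length() - 3) offsets that Flume's Context renders as "{ parameters:{k1=v1, k2=v2} }":

import java.util.Properties;

public class ContextParseSketch {
    public static void main(String[] args) {
        // Example string in the shape Context.toString() is assumed to produce.
        String contextString = "{ parameters:{type=kafka, channel=c1, topic=t, zk.connect=zk:2181} }";
        Properties props = new Properties();
        // substring(14, ...) strips the leading "{ parameters:{" (14 chars);
        // length() - 3 drops the trailing "} }", leaving "k1=v1, k2=v2".
        for (String kv : contextString.substring(14, contextString.length() - 3).split(", ")) {
            String k = kv.trim().split("=")[0];
            String v = kv.trim().split("=")[1];
            // "type" and "channel" are Flume-level keys, not Kafka config.
            if (!k.equals("type") && !k.equals("channel")) {
                props.put(k, v);
            }
        }
        System.out.println(props); // {zk.connect=zk:2181, topic=t}
    }
}

Note the split-on-"=" approach assumes no value itself contains "=" or ", "; a value such as a comma-separated zk.connect list with spaces would break this parsing.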
18 changes: 0 additions & 18 deletions src/main/java/com/vipshop/flume/config/FlumeUtil.java

This file was deleted.

56 changes: 0 additions & 56 deletions src/main/java/com/vipshop/flume/config/KafkaSinkConfig.java

This file was deleted.

25 changes: 0 additions & 25 deletions src/main/java/com/vipshop/flume/config/KafkaSinkConstants.java

This file was deleted.

58 changes: 0 additions & 58 deletions src/main/java/com/vipshop/flume/config/KafkaSourceConfig.java

This file was deleted.

25 changes: 0 additions & 25 deletions src/main/java/com/vipshop/flume/config/KafkaSourceConstants.java

This file was deleted.

3 changes: 1 addition & 2 deletions src/main/java/com/vipshop/flume/sink/kafka/KafkaSink.java
@@ -14,7 +14,6 @@
 import org.slf4j.LoggerFactory;
 
 import com.vipshop.flume.KafkaUtil;
-import com.vipshop.flume.config.KafkaSinkConfig;
 
 public class KafkaSink extends AbstractSink implements Configurable{
     private static final Logger log = LoggerFactory.getLogger(KafkaSink.class);
@@ -53,7 +52,7 @@ public Status process() throws EventDeliveryException {
     }
 
     public void configure(Context context) {
-        this.topic = KafkaSinkConfig.getTopic(context);
+        this.topic = KafkaUtil.getKafkaConfigParameter(context, "topic");
         this.producer = KafkaUtil.getProducer(context);
         log.debug("-------Init producer done-----------");
     }
3 changes: 1 addition & 2 deletions src/main/java/com/vipshop/flume/source/kafka/KafkaSource.java
@@ -24,7 +24,6 @@
 import org.slf4j.LoggerFactory;
 
 import com.vipshop.flume.KafkaUtil;
-import com.vipshop.flume.config.KafkaSourceConfig;
 
 public class KafkaSource extends AbstractSource implements Configurable, PollableSource {
 
@@ -77,7 +76,7 @@ public Status process() throws EventDeliveryException {
     }
 
     public void configure(Context context) {
-        this.topic = KafkaSourceConfig.getTopic(context);
+        this.topic = KafkaUtil.getKafkaConfigParameter(context, "topic");
         try {
             this.consumer = KafkaUtil.getConsumer(context);
         } catch (IOException e) {