Skip to content

Commit

Permalink
commit offsets manually
Browse files Browse the repository at this point in the history
  • Loading branch information
simplesteph committed Aug 28, 2018
1 parent a52671c commit 8703908
Showing 1 changed file with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public static KafkaConsumer<String, String> createConsumer(String topic){
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // disable auto commit of offsets
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10"); // disable auto commit of offsets

// create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
Expand Down Expand Up @@ -97,6 +99,7 @@ public static void main(String[] args) throws IOException {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100)); // new in Kafka 2.0.0

logger.info("Received " + records.count() + " records");
for (ConsumerRecord<String, String> record : records){

// 2 strategies
Expand All @@ -116,11 +119,19 @@ public static void main(String[] args) throws IOException {
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
logger.info(indexResponse.getId());
try {
Thread.sleep(1000); // introduce a small delay
Thread.sleep(10); // introduce a small delay
} catch (InterruptedException e) {
e.printStackTrace();
}
}
logger.info("Committing offsets...");
consumer.commitSync();
logger.info("Offsets have been committed");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// close the client gracefully
Expand Down

0 comments on commit 8703908

Please sign in to comment.