Skip to content

Commit

Permalink
[hotfix][Kafka 0.9] Avoid committing offsets to closed consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
rmetzger committed Mar 21, 2016
1 parent 6078923 commit 35892ed
Showing 1 changed file with 4 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,10 @@ public void close() throws Exception {

@Override
protected void commitOffsets(HashMap<KafkaTopicPartition, Long> checkpointOffsets) {
if(!running) {
LOG.warn("Unable to commit offsets on closed consumer");
return;
}
Map<TopicPartition, OffsetAndMetadata> kafkaCheckpointOffsets = convertToCommitMap(checkpointOffsets);
synchronized (this.consumer) {
this.consumer.commitSync(kafkaCheckpointOffsets);
Expand Down

0 comments on commit 35892ed

Please sign in to comment.