Skip to content

Commit

Permalink
fix can't get latest offset in KafkaEightSimpleConsumerFirehoseFactory (
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijianding authored and fjy committed Aug 12, 2016
1 parent 4545878 commit df89f25
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public FirehoseV2 connect(final ByteBufferInputRowParser firehoseParser, Object
);
Long startOffset = lastOffsets.get(partition);
PartitionConsumerWorker worker = new PartitionConsumerWorker(
feed, kafkaSimpleConsumer, partition, startOffset == null ? 0 : startOffset
feed, kafkaSimpleConsumer, partition, startOffset == null ? -1 : startOffset
);
consumerWorkers.add(worker);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public Iterable<BytesMessageWithOffset> fetch(long offset, int timeoutMs) throws
}
catch (Exception e) {
ensureNotInterrupted(e);
log.warn(e, "caughte exception in fetch {} - {}", topic, partitionId);
log.warn(e, "caught exception in fetch {} - {}", topic, partitionId);
response = null;
}

Expand Down

0 comments on commit df89f25

Please sign in to comment.