Skip to content

Commit

Permalink
Fixed getting the position in PulsarKafkaConsumer when we don't have …
Browse files Browse the repository at this point in the history
…an offset yet (apache#900)
  • Loading branch information
merlimat authored Nov 14, 2017
1 parent 6278855 commit 635b846
Showing 1 changed file with 2 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,8 @@ public void seekToEnd(Collection<TopicPartition> partitions) {

@Override
public long position(TopicPartition partition) {
return lastReceivedOffset.get(partition);
Long offset = lastReceivedOffset.get(partition);
return offset != null ? offset : -1l;
}

@Override
Expand Down

0 comments on commit 635b846

Please sign in to comment.