Skip to content

Commit

Permalink
[hotfix][kinesis] Update emit record javadoc and don't count max wate…
Browse files Browse the repository at this point in the history
…rmark as timeout
  • Loading branch information
tweise committed Aug 27, 2019
1 parent 0437ad2 commit b6bd8f6
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ protected KinesisDeserializationSchema<T> getClonedDeserializationSchema() {
// ------------------------------------------------------------------------

/**
* Atomic operation to collect a record and update state to the sequence number of the record.
* Prepare a record and hand it over to the {@link RecordEmitter}, which may collect it asynchronously.
* This method is called by {@link ShardConsumer}s.
*
* @param record the record to collect
Expand Down Expand Up @@ -752,7 +752,8 @@ protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shar
}

/**
* Actual record emission called from the record emitter.
* Atomic operation to collect a record and update state to the sequence number of the record.
* This method is called from the record emitter.
*
* <p>Responsible for tracking per shard watermarks and emit timestamps extracted from
* the record, when a watermark assigner was configured.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ public byte[] getResult(Map<String, WatermarkState> accumulator) {
WatermarkState ws = e.getValue();
if (ws.lastUpdated + updateTimeoutMillis < currentTime) {
// ignore outdated entry
updateTimeoutCount++;
if (ws.watermark < Long.MAX_VALUE) {
updateTimeoutCount++;
}
continue;
}
globalWatermark = Math.min(ws.watermark, globalWatermark);
Expand Down

0 comments on commit b6bd8f6

Please sign in to comment.