[FLINK-9897] Make adaptive reads depend on run loop time instead of fetch interval millis

This closes apache#6408.
lrao100 authored and tzulitai committed Aug 1, 2018
1 parent 7397f31 commit 58ca87a
Showing 1 changed file with 50 additions and 35 deletions.
@@ -207,22 +207,14 @@ public void run() {
 				}
 			}

-			long lastTimeNanos = 0;
+			long processingStartTimeNanos = System.nanoTime();
 			while (isRunning()) {
 				if (nextShardItr == null) {
 					fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());

 					// we can close this consumer thread once we've reached the end of the subscribed shard
 					break;
 				} else {
-					if (fetchIntervalMillis != 0) {
-						long elapsedTimeNanos = System.nanoTime() - lastTimeNanos;
-						long sleepTimeMillis = fetchIntervalMillis - (elapsedTimeNanos / 1_000_000);
-						if (sleepTimeMillis > 0) {
-							Thread.sleep(sleepTimeMillis);
-						}
-						lastTimeNanos = System.nanoTime();
-					}

 					GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);

@@ -233,26 +225,68 @@ public void run() {
 						subscribedShard.getShard().getHashKeyRange().getEndingHashKey());

 					long recordBatchSizeBytes = 0L;
-					long averageRecordSizeBytes = 0L;

 					for (UserRecord record : fetchedRecords) {
 						recordBatchSizeBytes += record.getData().remaining();
 						deserializeRecordForCollectionAndUpdateState(record);
 					}

-					if (useAdaptiveReads && !fetchedRecords.isEmpty()) {
-						averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size();
-						maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-					}
-
 					nextShardItr = getRecordsResult.getNextShardIterator();
+
+					long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, System.nanoTime());
+					long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos;
+					maxNumberOfRecordsPerFetch = adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes, maxNumberOfRecordsPerFetch);
+					processingStartTimeNanos = adjustmentEndTimeNanos; // for next time through the loop
 				}
 			}
 		} catch (Throwable t) {
 			fetcherRef.stopWithError(t);
 		}
 	}

+	/**
+	 * Adjusts loop timing to match target frequency if specified.
+	 * @param processingStartTimeNanos The start time of the run loop "work"
+	 * @param processingEndTimeNanos The end time of the run loop "work"
+	 * @return The System.nanoTime() after the sleep (if any)
+	 * @throws InterruptedException
+	 */
+	protected long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos)
+		throws InterruptedException {
+		long endTimeNanos = processingEndTimeNanos;
+		if (fetchIntervalMillis != 0) {
+			long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos;
+			long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000);
+			if (sleepTimeMillis > 0) {
+				Thread.sleep(sleepTimeMillis);
+				endTimeNanos = System.nanoTime();
+			}
+		}
+		return endTimeNanos;
+	}
+
+	/**
+	 * Calculates how many records to read each time through the loop based on a target throughput
+	 * and the measured frequency of the loop.
+	 * @param runLoopTimeNanos The total time of one pass through the loop
+	 * @param numRecords The number of records of the last read operation
+	 * @param recordBatchSizeBytes The total batch size of the last read operation
+	 * @param maxNumberOfRecordsPerFetch The current maxNumberOfRecordsPerFetch
+	 */
+	private int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes,
+			int maxNumberOfRecordsPerFetch) {
+		if (useAdaptiveReads && numRecords != 0 && runLoopTimeNanos != 0) {
+			long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;
+			// Adjust number of records to fetch from the shard depending on current average record size
+			// to optimize 2 MB/sec read limits
+			double loopFrequencyHz = 1000000000.0d / runLoopTimeNanos;
+			double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
+			maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes);
+			// Ensure the value is not more than 10000L
+			maxNumberOfRecordsPerFetch = Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
+		}
+		return maxNumberOfRecordsPerFetch;
+	}
+
 	/**
 	 * The loop in run() checks this before fetching next batch of records. Since this runnable will be executed
 	 * by the ExecutorService {@link KinesisDataFetcher#shardConsumersExecutor}, the only way to close down this thread
@@ -347,23 +381,4 @@ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) thr
 	protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
 		return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
 	}
-
-	/**
-	 * Adapts the maxNumberOfRecordsPerFetch based on the current average record size
-	 * to optimize 2 MB/sec read limits.
-	 *
-	 * @param averageRecordSizeBytes
-	 * @return adaptedMaxRecordsPerFetch
-	 */
-
-	protected int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
-		int adaptedMaxRecordsPerFetch = maxNumberOfRecordsPerFetch;
-		if (averageRecordSizeBytes != 0 && fetchIntervalMillis != 0) {
-			adaptedMaxRecordsPerFetch = (int) (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / (averageRecordSizeBytes * 1000L / fetchIntervalMillis));
-
-			// Ensure the value is not more than 10000L
-			adaptedMaxRecordsPerFetch = Math.min(adaptedMaxRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
-		}
-		return adaptedMaxRecordsPerFetch;
-	}
 }
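Two worked examples may help make the new logic concrete. First, the pacing half: a minimal, self-contained sketch of how adjustRunLoopFrequency throttles the loop. The wrapper class, the hard-coded fetchIntervalMillis value, and the main method are illustrative assumptions, not Flink source.

// Illustrative sketch of the pacing logic in this commit -- not the Flink source.
public class RunLoopPacingSketch {

	// Assumed example configuration: target at most one loop pass per 200 ms.
	private static final long fetchIntervalMillis = 200L;

	static long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos)
			throws InterruptedException {
		long endTimeNanos = processingEndTimeNanos;
		if (fetchIntervalMillis != 0) {
			long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos;
			long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000);
			if (sleepTimeMillis > 0) {
				// The "work" finished early: sleep off the rest of the interval.
				Thread.sleep(sleepTimeMillis);
				endTimeNanos = System.nanoTime();
			}
		}
		return endTimeNanos;
	}

	public static void main(String[] args) throws InterruptedException {
		long start = System.nanoTime();
		Thread.sleep(50); // stand-in for ~50 ms of fetch-and-deserialize work
		long end = adjustRunLoopFrequency(start, System.nanoTime());
		// ~50 ms of work plus ~150 ms of sleep: roughly 200 ms total.
		System.out.printf("loop time: %d ms%n", (end - start) / 1_000_000);
	}
}

Because the returned timestamp is taken after the sleep, the run-loop time fed into adaptRecordsToRead includes both work and sleep, which is exactly what the throughput math needs.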
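Second, the sizing half: a self-contained sketch of the adaptRecordsToRead arithmetic with concrete numbers. The wrapper class, constant values, and main method are assumptions for illustration; the constants mirror the Kinesis 2 MB/sec per-shard read limit and the 10,000-record GetRecords cap referenced in the patch.

// Illustrative sketch of the sizing math -- not the Flink source.
// (The real method also checks a useAdaptiveReads flag before adapting.)
public class AdaptiveReadSketch {

	private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024 * 1024;
	private static final int DEFAULT_SHARD_GETRECORDS_MAX = 10000;

	static int adaptRecordsToRead(long runLoopTimeNanos, int numRecords,
			long recordBatchSizeBytes, int maxNumberOfRecordsPerFetch) {
		if (numRecords != 0 && runLoopTimeNanos != 0) {
			long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;
			// Measured loop rate: passes per second through the run loop.
			double loopFrequencyHz = 1000000000.0d / runLoopTimeNanos;
			// Byte budget per fetch so that rate * budget stays at the per-shard cap.
			double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
			maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes);
			maxNumberOfRecordsPerFetch = Math.min(maxNumberOfRecordsPerFetch, DEFAULT_SHARD_GETRECORDS_MAX);
		}
		return maxNumberOfRecordsPerFetch;
	}

	public static void main(String[] args) {
		// One loop pass took 250 ms and fetched 500 records totalling 2,500,000 bytes,
		// i.e. 5,000 bytes per record on average.
		int next = adaptRecordsToRead(250_000_000L, 500, 2_500_000L, 10000);
		// 4 Hz loop -> 2,097,152 / 4 = 524,288 bytes per read -> 524,288 / 5,000 = 104 records.
		System.out.println(next); // prints 104
	}
}

Note the contrast with the removed getAdaptiveMaxRecordsPerFetch: the old formula only adapted when fetchIntervalMillis was non-zero, and it measured against the configured interval rather than how long the loop actually took. Basing the calculation on the measured run-loop time covers the fetchIntervalMillis == 0 case and accounts for real processing and sleep time.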
