Skip to content

Commit

Permalink
AdaptiveRecvByteBufAllocator should ramp up while reading
Browse files Browse the repository at this point in the history
Motivation:
AdaptiveRecvByteBufAllocator currently adjusts the ByteBuf allocation size guess when readComplete is called. However the default configuration for number of reads before readComplete is called is 16. This means that there will be 16 reads done before any adjustment is done. If there is a large amount of data pending AdaptiveRecvByteBufAllocator will be slow to adjust the allocation size guess. In addition to being slow the result of only updating the guess in readComplete means that we must go back to the selector and wait to be woken up again when data is ready to read. Going back to the selector is an expensive operations and can add significant latency if there is large amount of data pending to read.

Modifications:
- AdaptiveRecvByteBufAllocator should check on each read if a step up is necessary. The step down process is left unchanged and can be more gradual at the cost of potentially over allocating.

Result:
AdaptiveRecvByteBufAllocator increases the guess size during the read loop to reduce latency when large amounts of data is being read.
  • Loading branch information
Scottmitch committed Dec 4, 2017
1 parent 7cced55 commit ec368fe
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import java.util.ArrayList;
import java.util.List;

import static java.lang.Math.max;
import static java.lang.Math.min;

/**
* The {@link RecvByteBufAllocator} that automatically increases and
* decreases the predicted buffer size on feed back.
Expand Down Expand Up @@ -100,22 +103,34 @@ public HandleImpl(int minIndex, int maxIndex, int initial) {
nextReceiveBufferSize = SIZE_TABLE[index];
}

@Override
public void lastBytesRead(int bytes) {
// If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
// This helps adjust more quickly when large amounts of data is pending and can avoid going back to
// the selector to check for more data. Going back to the selector can add significant latency for large
// data transfers.
if (bytes == attemptedBytesRead()) {
record(bytes);
}
super.lastBytesRead(bytes);
}

@Override
public int guess() {
return nextReceiveBufferSize;
}

private void record(int actualReadBytes) {
if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {
if (decreaseNow) {
index = Math.max(index - INDEX_DECREMENT, minIndex);
index = max(index - INDEX_DECREMENT, minIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
} else {
decreaseNow = true;
}
} else if (actualReadBytes >= nextReceiveBufferSize) {
index = Math.min(index + INDEX_INCREMENT, maxIndex);
index = min(index + INDEX_INCREMENT, maxIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public final void incMessagesRead(int amt) {
}

@Override
public final void lastBytesRead(int bytes) {
public void lastBytesRead(int bytes) {
lastBytesRead = bytes;
if (bytes > 0) {
totalBytesRead += bytes;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class AdaptiveRecvByteBufAllocatorTest {
@Mock
private ChannelConfig config;
private ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
private RecvByteBufAllocator.ExtendedHandle handle;

@Before
public void setup() {
config = mock(ChannelConfig.class);
when(config.isAutoRead()).thenReturn(true);
AdaptiveRecvByteBufAllocator recvByteBufAllocator = new AdaptiveRecvByteBufAllocator(64, 512, 1024 * 1024 * 10);
handle = (RecvByteBufAllocator.ExtendedHandle) recvByteBufAllocator.newHandle();
handle.reset(config);
}

@Test
public void rampUpBeforeReadCompleteWhenLargeDataPending() {
// Simulate that there is always more data when we attempt to read so we should always ramp up.
allocReadExpected(handle, alloc, 512);
allocReadExpected(handle, alloc, 8192);
allocReadExpected(handle, alloc, 131072);
allocReadExpected(handle, alloc, 2097152);
handle.readComplete();

handle.reset(config);
allocReadExpected(handle, alloc, 8388608);
}

@Test
public void lastPartialReadDoesNotRampDown() {
allocReadExpected(handle, alloc, 512);
// Simulate there is just 1 byte remaining which is unread. However the total bytes in the current read cycle
// means that we should stay at the current step for the next ready cycle.
allocRead(handle, alloc, 8192, 1);
handle.readComplete();

handle.reset(config);
allocReadExpected(handle, alloc, 8192);
}

@Test
public void lastPartialReadCanRampUp() {
allocReadExpected(handle, alloc, 512);
// We simulate there is just 1 less byte than we try to read, but because of the adaptive steps the total amount
// of bytes read for this read cycle steps up to prepare for the next read cycle.
allocRead(handle, alloc, 8192, 8191);
handle.readComplete();

handle.reset(config);
allocReadExpected(handle, alloc, 131072);
}

private static void allocReadExpected(RecvByteBufAllocator.ExtendedHandle handle,
ByteBufAllocator alloc,
int expectedSize) {
allocRead(handle, alloc, expectedSize, expectedSize);
}

private static void allocRead(RecvByteBufAllocator.ExtendedHandle handle,
ByteBufAllocator alloc,
int expectedBufferSize,
int lastRead) {
ByteBuf buf = handle.allocate(alloc);
assertEquals(expectedBufferSize, buf.capacity());
handle.attemptedBytesRead(expectedBufferSize);
handle.lastBytesRead(lastRead);
handle.incMessagesRead(1);
buf.release();
}
}

0 comments on commit ec368fe

Please sign in to comment.