Skip to content

Commit

Permalink
Ensure ServerChannel implementations accept multiple connections per …
Browse files Browse the repository at this point in the history
…read loop (netty#11729)


Motivation:

Due how the DefaultMaxMessagesRecvByteBufAllocator is implemented we did only ever accept one connection per read loop even thought there might be more pending.

Modifications:

- Introduce ServerChannelRecvByteBufAllocator and use it for all our ServerChannel implementations

Result:

Fixes netty#11708
  • Loading branch information
normanmaurer authored Oct 6, 2021
1 parent 9802afd commit c3e33d3
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 7 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,7 @@
<!-- Added to fix false-positive -->
<exclude>io.netty.handler.codec.dns.TcpDnsQueryDecoder</exclude>
<exclude>io.netty.handler.codec.dns.TcpDnsResponseEncoder</exclude>
<exclude>io.netty.channel.ServerChannelRecvByteBufAllocator</exclude>
</excludes>
</parameter>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ public class EpollChannelConfig extends DefaultChannelConfig {
super(channel);
}

EpollChannelConfig(AbstractEpollChannel channel, RecvByteBufAllocator recvByteBufAllocator) {
super(channel, recvByteBufAllocator);
}

@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(), EpollChannelOption.EPOLL_MODE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.ServerChannelRecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.ServerSocketChannelConfig;
import io.netty.util.NetUtil;
Expand All @@ -38,7 +39,7 @@ public class EpollServerChannelConfig extends EpollChannelConfig implements Serv
private volatile int pendingFastOpenRequestsThreshold;

EpollServerChannelConfig(AbstractEpollChannel channel) {
super(channel);
super(channel, new ServerChannelRecvByteBufAllocator());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public class KQueueChannelConfig extends DefaultChannelConfig {
super(channel);
}

KQueueChannelConfig(AbstractKQueueChannel channel, RecvByteBufAllocator recvByteBufAllocator) {
super(channel, recvByteBufAllocator);
}

@Override
@SuppressWarnings("deprecation")
public Map<ChannelOption<?>, Object> getOptions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.ServerChannelRecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.ServerSocketChannelConfig;
import io.netty.util.NetUtil;
Expand All @@ -38,7 +39,7 @@ public class KQueueServerChannelConfig extends KQueueChannelConfig implements Se
private volatile int backlog = NetUtil.SOMAXCONN;

KQueueServerChannelConfig(AbstractKQueueChannel channel) {
super(channel);
super(channel, new ServerChannelRecvByteBufAllocator());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.ServerChannelRecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.util.NetUtil;
import io.netty.util.internal.ObjectUtil;
Expand All @@ -45,7 +46,7 @@ public class DefaultSctpServerChannelConfig extends DefaultChannelConfig impleme
*/
public DefaultSctpServerChannelConfig(
io.netty.channel.sctp.SctpServerChannel channel, SctpServerChannel javaChannel) {
super(channel);
super(channel, new ServerChannelRecvByteBufAllocator());
this.javaChannel = ObjectUtil.checkNotNull(javaChannel, "javaChannel");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* and also prevents overflow.
*/
public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator {
private final boolean ignoreBytesRead;
private volatile int maxMessagesPerRead;
private volatile boolean respectMaybeMoreData = true;

Expand All @@ -34,6 +35,11 @@ public DefaultMaxMessagesRecvByteBufAllocator() {
}

public DefaultMaxMessagesRecvByteBufAllocator(int maxMessagesPerRead) {
this(maxMessagesPerRead, false);
}

DefaultMaxMessagesRecvByteBufAllocator(int maxMessagesPerRead, boolean ignoreBytesRead) {
this.ignoreBytesRead = ignoreBytesRead;
maxMessagesPerRead(maxMessagesPerRead);
}

Expand Down Expand Up @@ -141,8 +147,7 @@ public boolean continueReading() {
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return config.isAutoRead() &&
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
totalMessages < maxMessagePerRead &&
totalBytesRead > 0;
totalMessages < maxMessagePerRead && (ignoreBytesRead || totalBytesRead > 0);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;

/**
* {@link MaxMessagesRecvByteBufAllocator} implementation which should be used for {@link ServerChannel}s.
*/
public final class ServerChannelRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
public ServerChannelRecvByteBufAllocator() {
super(1, true);
}

@Override
public Handle newHandle() {
return new MaxMessageHandle() {
@Override
public int guess() {
return 128;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.channel.PreferHeapByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.ServerChannel;
import io.netty.channel.ServerChannelRecvByteBufAllocator;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.concurrent.SingleThreadEventExecutor;

Expand All @@ -35,7 +36,8 @@
*/
public class LocalServerChannel extends AbstractServerChannel {

private final ChannelConfig config = new DefaultChannelConfig(this);
private final ChannelConfig config =
new DefaultChannelConfig(this, new ServerChannelRecvByteBufAllocator()) { };
private final Queue<Object> inboundBuffer = new ArrayDeque<Object>();
private final Runnable shutdownHook = new Runnable() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.ServerChannelRecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.util.NetUtil;
import io.netty.util.internal.ObjectUtil;
Expand All @@ -47,7 +48,7 @@ public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
* Creates a new instance.
*/
public DefaultServerSocketChannelConfig(ServerSocketChannel channel, ServerSocket javaSocket) {
super(channel);
super(channel, new ServerChannelRecvByteBufAllocator());
this.javaSocket = ObjectUtil.checkNotNull(javaSocket, "javaSocket");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;

import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class DefaultMaxMessagesRecvByteBufAllocatorTest {

private DefaultMaxMessagesRecvByteBufAllocator newAllocator(boolean ignoreReadBytes) {
return new DefaultMaxMessagesRecvByteBufAllocator(2, ignoreReadBytes) {
@Override
public Handle newHandle() {
return new MaxMessageHandle() {
@Override
public int guess() {
return 0;
}
};
}
};
}

@Test
public void testRespectReadBytes() {
DefaultMaxMessagesRecvByteBufAllocator allocator = newAllocator(false);
RecvByteBufAllocator.Handle handle = allocator.newHandle();

EmbeddedChannel channel = new EmbeddedChannel();
handle.reset(channel.config());
handle.incMessagesRead(1);
assertFalse(handle.continueReading());

handle.reset(channel.config());
handle.incMessagesRead(1);
handle.attemptedBytesRead(1);
handle.lastBytesRead(1);
assertTrue(handle.continueReading());
channel.finish();
}

@Test
public void testIgnoreReadBytes() {
DefaultMaxMessagesRecvByteBufAllocator allocator = newAllocator(true);
RecvByteBufAllocator.Handle handle = allocator.newHandle();

EmbeddedChannel channel = new EmbeddedChannel();
handle.reset(channel.config());
handle.incMessagesRead(1);
assertTrue(handle.continueReading());
handle.incMessagesRead(1);
assertFalse(handle.continueReading());

handle.reset(channel.config());
handle.attemptedBytesRead(0);
handle.lastBytesRead(0);
assertTrue(handle.continueReading());
channel.finish();
}
}

0 comments on commit c3e33d3

Please sign in to comment.