Skip to content

Commit

Permalink
[improve][proxy] Support zero-copy of NIC to NIC on Proxy (apache#15678)
Browse files Browse the repository at this point in the history
Master Issue: apache#15631 

### Motivation

Currently, when the Proxy server transfers data, each transfer triggers four context switches between user mode and kernel mode: one on the call and one on the return of each `read()` and `write()`. This is inefficient for the Proxy.

There is a more efficient way that reduces both the number of data copies and the CPU load: on Linux, we can use the [`splice`](https://man7.org/linux/man-pages/man2/splice.2.html) system call.
  • Loading branch information
coderzc authored Jun 17, 2022
1 parent e685742 commit 5fd6e5f
Show file tree
Hide file tree
Showing 15 changed files with 257 additions and 24 deletions.
4 changes: 4 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ advertisedAddress=
# Enable or disable the HAProxy protocol.
haProxyProtocolEnabled=false

# Enables zero-copy transport of data across network interfaces using the splice system call.
# Zero copy mode cannot be used when TLS is enabled or when proxyLogLevel is > 0.
proxyZeroCopyModeEnabled=true

# The port to use for server binary Protobuf requests
servicePort=6650

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (msg instanceof ByteBuf) {
ProtocolDetectionResult<HAProxyProtocolVersion> result =
HAProxyMessageDecoder.detectProtocol((ByteBuf) msg);
// should accumulate data if need more data to detect the protocol
if (result.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
return;
}

if (result.state() == ProtocolDetectionState.DETECTED) {
ctx.pipeline().addAfter(NAME, null, new HAProxyMessageDecoder());
ctx.pipeline().remove(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollMode;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.haproxy.HAProxyCommand;
Expand All @@ -40,7 +44,6 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.util.Arrays;
Expand Down Expand Up @@ -72,6 +75,7 @@ public class DirectProxyHandler {
private final ProxyConnection proxyConnection;
@Getter
Channel outboundChannel;
boolean isTlsOutboundChannel = false;
@Getter
private final Rate inboundChannelRequestsRate;
private final String originalPrincipal;
Expand All @@ -85,7 +89,7 @@ public class DirectProxyHandler {
private final Runnable onHandshakeCompleteAction;
private final boolean tlsHostnameVerificationEnabled;
private final boolean tlsEnabledWithKeyStore;
private final boolean tlsEnabledWithBroker;
final boolean tlsEnabledWithBroker;
private final SslContextAutoRefreshBuilder<SslContext> clientSslCtxRefresher;
private final NettySSLContextAutoRefreshBuilder clientSSLContextAutoRefreshBuilder;

Expand Down Expand Up @@ -166,6 +170,10 @@ public void connect(String brokerHostAndPort, InetSocketAddress targetBrokerAddr
b.group(inboundChannel.eventLoop())
.channel(inboundChannel.getClass());

if (service.proxyZeroCopyModeEnabled && EpollSocketChannel.class.isAssignableFrom(inboundChannel.getClass())) {
b.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
}

String remoteHost;
try {
remoteHost = parseHost(brokerHostAndPort);
Expand All @@ -192,12 +200,12 @@ protected void initChannel(SocketChannel ch) {
int brokerProxyReadTimeoutMs = service.getConfiguration().getBrokerProxyReadTimeoutMs();
if (brokerProxyReadTimeoutMs > 0) {
ch.pipeline().addLast("readTimeoutHandler",
new ReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS));
new ProxyReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS));
}
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
ch.pipeline().addLast("proxyOutboundHandler",
new ProxyBackendHandler(config, protocolVersion, remoteHost));
(ChannelHandler) new ProxyBackendHandler(config, protocolVersion, remoteHost));
}
});

Expand All @@ -209,7 +217,6 @@ protected void initChannel(SocketChannel ch) {
log.warn("[{}] Establishing connection to {} ({}) failed. Closing inbound channel.", inboundChannel,
targetBrokerAddress, brokerHostAndPort, future.cause());
inboundChannel.close();
return;
}
});
}
Expand Down Expand Up @@ -315,6 +322,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
null /* target broker */, originalPrincipal, clientAuthData, clientAuthMethod);
outboundChannel.writeAndFlush(command)
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
isTlsOutboundChannel = ProxyConnection.isTlsChannel(inboundChannel);
}

@Override
Expand Down Expand Up @@ -346,6 +354,20 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
}
inboundChannel.writeAndFlush(msg)
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);

if (service.proxyZeroCopyModeEnabled && service.proxyLogLevel == 0) {
if (!isTlsOutboundChannel && !DirectProxyHandler.this.proxyConnection.isTlsInboundChannel) {
ProxyConnection.spliceNIC2NIC((EpollSocketChannel) ctx.channel(),
(EpollSocketChannel) inboundChannel, ProxyConnection.SPLICE_BYTES)
.addListener(future -> {
if (future.isSuccess()) {
ProxyService.OPS_COUNTER.inc();
ProxyService.BYTES_COUNTER.inc(ProxyConnection.SPLICE_BYTES);
}
});
}
}

break;

default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ public class ProxyConfiguration implements PulsarConfiguration {
doc = "Enable or disable the proxy protocol.")
private boolean haProxyProtocolEnabled;

// Zero-copy (splice) toggle; enabled by default. The description mirrors the wording used
// for the same setting in conf/proxy.conf.
@FieldContext(category = CATEGORY_SERVER,
        doc = "Enables zero-copy transport of data across network interfaces using the splice system call. "
                + "Zero copy mode cannot be used when TLS is enabled or when proxyLogLevel is > 0.")
private boolean proxyZeroCopyModeEnabled = true;

@FieldContext(
category = CATEGORY_SERVER,
doc = "The port for serving binary protobuf request"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.ssl.SslHandler;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -101,8 +105,11 @@ public class ProxyConnection extends PulsarHandler {
private String proxyToBrokerUrl;
private HAProxyMessage haProxyMessage;

// Length handed to each spliceNIC2NIC call (1024 * 1024 * 1024 bytes). Declared as a primitive
// int: every use passes it to a primitive parameter, so a boxed Integer only added unboxing.
protected static final int SPLICE_BYTES = 1024 * 1024 * 1024;
private static final byte[] EMPTY_CREDENTIALS = new byte[0];

boolean isTlsInboundChannel = false;

enum State {
Init,

Expand Down Expand Up @@ -161,6 +168,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
ProxyService.NEW_CONNECTIONS.inc();
service.getClientCnxs().add(this);
isTlsInboundChannel = ProxyConnection.isTlsChannel(ctx.channel());
LOG.info("[{}] New connection opened", remoteAddress);
}

Expand Down Expand Up @@ -243,6 +251,18 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
}
directProxyHandler.outboundChannel.writeAndFlush(msg)
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);

if (service.proxyZeroCopyModeEnabled && service.proxyLogLevel == 0) {
if (!directProxyHandler.isTlsOutboundChannel && !isTlsInboundChannel) {
spliceNIC2NIC((EpollSocketChannel) ctx.channel(),
(EpollSocketChannel) directProxyHandler.outboundChannel, SPLICE_BYTES)
.addListener(future -> {
ProxyService.OPS_COUNTER.inc();
ProxyService.BYTES_COUNTER.inc(SPLICE_BYTES);
directProxyHandler.getInboundChannelRequestsRate().recordEvent(SPLICE_BYTES);
});
}
}
} else {
LOG.warn("Received message of type {} while connection to broker is missing in state {}. "
+ "Dropping the input message (readable bytes={}).", msg.getClass(), state,
Expand All @@ -259,6 +279,27 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
}
}

/**
 * Starts a splice (zero-copy) transfer from one epoll socket channel to another.
 *
 * <p>A fresh promise is created on {@code inboundChannel} and handed to {@code spliceTo}.
 * When that promise fails with anything other than a {@link ClosedChannelException}, the
 * failure cause is fired as an exception through the pipeline of the promise's channel.
 *
 * @param inboundChannel channel the data is spliced from
 * @param outboundChannel channel the data is spliced to
 * @param spliceLength length argument forwarded to {@code spliceTo}
 * @return the promise that was passed to {@code spliceTo}, so callers can attach listeners
 */
protected static ChannelPromise spliceNIC2NIC(EpollSocketChannel inboundChannel,
        EpollSocketChannel outboundChannel, int spliceLength) {
    final ChannelPromise splicePromise = inboundChannel.newPromise();
    inboundChannel.spliceTo(outboundChannel, spliceLength, splicePromise);
    splicePromise.addListener((ChannelFutureListener) done -> {
        if (done.isSuccess()) {
            return;
        }
        final Throwable failure = done.cause();
        // A closed channel is not reported through the pipeline.
        if (failure instanceof ClosedChannelException) {
            return;
        }
        done.channel().pipeline().fireExceptionCaught(failure);
    });
    return splicePromise;
}

/**
 * Reports whether the channel's pipeline contains a handler registered under the name
 * {@code ServiceChannelInitializer.TLS_HANDLER}.
 *
 * @param channel channel whose pipeline is inspected
 * @return {@code true} if a handler with that name is present, {@code false} otherwise
 */
protected static boolean isTlsChannel(Channel channel) {
    final ChannelHandler tlsHandler = channel.pipeline().get(ServiceChannelInitializer.TLS_HANDLER);
    return tlsHandler != null;
}

private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
Supplier<ClientCnx> clientCnxSupplier;
if (service.getConfiguration().isAuthenticationEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;

/**
 * A {@link ReadTimeoutHandler} that forces the {@code reading} flag declared on
 * {@link IdleStateHandler} to {@code true} on every {@code channelReadComplete} before
 * delegating to the superclass.
 *
 * <p>NOTE(review): presumably this keeps the read timeout from firing when data moves through
 * the channel without {@code channelRead} being invoked (e.g. spliced traffic) - confirm
 * against the IdleStateHandler implementation in use.
 */
public class ProxyReadTimeoutHandler extends ReadTimeoutHandler {

    /** Reflective handle on the {@code reading} field declared by {@link IdleStateHandler}. */
    private final Field readingField;

    /**
     * @param timeout read timeout value, passed through to {@link ReadTimeoutHandler}
     * @param unit unit of {@code timeout}
     * @throws IllegalArgumentException if {@link IdleStateHandler} declares no {@code reading} field
     */
    public ProxyReadTimeoutHandler(long timeout, TimeUnit unit) {
        super(timeout, unit);
        this.readingField = lookupReadingField();
    }

    /** Resolves the {@code reading} field of {@link IdleStateHandler} and makes it accessible. */
    private static Field lookupReadingField() {
        final Field field;
        try {
            field = IdleStateHandler.class.getDeclaredField("reading");
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("Exception caused while get 'reading' field", e);
        }
        field.setAccessible(true);
        return field;
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // Mark the handler as "reading" first, then let the superclass run its own logic.
        readingField.setBoolean(this, true);
        super.channelReadComplete(ctx);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.resolver.dns.DnsNameResolverBuilder;
Expand Down Expand Up @@ -111,6 +113,9 @@ public class ProxyService implements Closeable {
@Setter
protected int proxyLogLevel;

@Getter
protected boolean proxyZeroCopyModeEnabled;

private final ScheduledExecutorService statsExecutor;

static final Gauge ACTIVE_CONNECTIONS = Gauge
Expand Down Expand Up @@ -222,9 +227,16 @@ public void start() throws Exception {
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));

bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup));
Class<? extends ServerSocketChannel> serverSocketChannelClass =
EventLoopUtil.getServerSocketChannelClass(workerGroup);
bootstrap.channel(serverSocketChannelClass);
EventLoopUtil.enableTriggeredMode(bootstrap);

if (proxyConfig.isProxyZeroCopyModeEnabled()
&& EpollServerSocketChannel.class.isAssignableFrom(serverSocketChannelClass)) {
proxyZeroCopyModeEnabled = true;
}

bootstrap.childHandler(new ServiceChannelInitializer(this, proxyConfig, false));
// Bind and start to accept incoming connections.
if (proxyConfig.getServicePort().isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.timeout.ReadTimeoutHandler;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder;
Expand Down Expand Up @@ -102,7 +101,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
}
if (brokerProxyReadTimeoutMs > 0) {
ch.pipeline().addLast("readTimeoutHandler",
new ReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS));
new ProxyReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS));
}
if (proxyService.getConfiguration().isHaProxyProtocolEnabled()) {
ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server;

import static org.mockito.Mockito.doReturn;
import java.util.Optional;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.mockito.Mockito;
import org.testng.annotations.BeforeClass;

/**
 * Variant of {@link ProxyTest} whose setup starts the proxy service with
 * {@code proxyZeroCopyModeEnabled} set to {@code false}.
 */
public class ProxyDisableZeroCopyTest extends ProxyTest {

    @Override
    @BeforeClass
    protected void setup() throws Exception {
        internalSetup();

        // Proxy configuration; the last setter is what distinguishes this test class.
        proxyConfig.setServicePort(Optional.of(0));
        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
        proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
        proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
        proxyConfig.setProxyZeroCopyModeEnabled(false);

        final AuthenticationService authenticationService =
                new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig));
        proxyService = Mockito.spy(new ProxyService(proxyConfig, authenticationService));

        // Back both metadata stores with the mocked ZooKeeper instances.
        final ZKMetadataStore localStore = new ZKMetadataStore(mockZooKeeper);
        doReturn(localStore).when(proxyService).createLocalMetadataStore();
        final ZKMetadataStore configurationStore = new ZKMetadataStore(mockZooKeeperGlobal);
        doReturn(configurationStore).when(proxyService).createConfigurationMetadataStore();

        proxyService.start();
    }
}
Loading

0 comments on commit 5fd6e5f

Please sign in to comment.