Skip to content

Commit

Permalink
Support HAProxy proxy protocol for broker and proxy (apache#8686)
Browse files Browse the repository at this point in the history
### Motivation

Currently, if enable the proxy in the pulsar cluster and client connect to the cluster through the proxy, when we get topic stats, the consumer address and producer address are not the real client address. This PR fix this problem by leverage HAProxy proxy protocol since this is a more general approach.  more details about the proxy protocol see 
https://www.haproxy.com/blog/haproxy/proxy-protocol/
https://docs.aws.amazon.com/elasticloadbalancing/latest/classic/enable-proxy-protocol.html
http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt

### Modifications

Allow enable proxy protocol on proxy or broker.

### Verifying this change

Tests added
  • Loading branch information
codelipenghui authored Nov 30, 2020
1 parent 39aaaf3 commit 625627c
Show file tree
Hide file tree
Showing 22 changed files with 423 additions and 5 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ advertisedAddress=
# The Default value is absent, the broker uses the first listener as the internal listener.
# internalListenerName=

# Enable or disable the HAProxy protocol.
haProxyProtocolEnabled=false

# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
numIOThreads=

Expand Down
3 changes: 3 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ zooKeeperCacheExpirySeconds=300
# If not set, the value of `InetAddress.getLocalHost().getHostname()` is used.
advertisedAddress=

# Enable or disable the HAProxy protocol.
haProxyProtocolEnabled=false

# The port to use for server binary Protobuf requests
servicePort=6650

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ bindAddress=0.0.0.0
# Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used.
advertisedAddress=

# Enable or disable the HAProxy protocol.
haProxyProtocolEnabled=false

# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
numIOThreads=

Expand Down
1 change: 1 addition & 0 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ The Apache Software License, Version 2.0
- io.netty-netty-codec-http-4.1.51.Final.jar
- io.netty-netty-codec-http2-4.1.51.Final.jar
- io.netty-netty-codec-socks-4.1.51.Final.jar
- io.netty-netty-codec-haproxy-4.1.51.Final.jar
- io.netty-netty-common-4.1.51.Final.jar
- io.netty-netty-handler-4.1.51.Final.jar
- io.netty-netty-handler-proxy-4.1.51.Final.jar
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,12 @@ flexible messaging model and an intuitive client API.</description>
<version>${netty-tc-native.version}</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-haproxy</artifactId>
<version>${netty.version}</version>
</dependency>

<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "The Default value is absent, the broker uses the first listener as the internal listener.")
private String internalListenerName;

@FieldContext(category=CATEGORY_SERVER,
doc = "Enable or disable the proxy protocol.")
private boolean haProxyProtocolEnabled;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Number of threads to use for Netty IO."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,11 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.metadata = metadata != null ? metadata : Collections.emptyMap();

stats = new ConsumerStats();
stats.setAddress(cnx.clientAddress().toString());
if (cnx.hasHAProxyMessage()) {
stats.setAddress(cnx.getHAProxyMessage().sourceAddress() + ":" + cnx.getHAProxyMessage().sourcePort());
} else {
stats.setAddress(cnx.clientAddress().toString());
}
stats.consumerName = consumerName;
stats.setConnectedSince(DateFormatter.now());
stats.setClientVersion(cnx.getClientVersion());
Expand Down Expand Up @@ -764,5 +768,9 @@ public void setReadPositionWhenJoining(PositionImpl readPositionWhenJoining) {
this.readPositionWhenJoining = readPositionWhenJoining;
}

public TransportCnx cnx() {
return cnx;
}

private static final Logger log = LoggerFactory.getLogger(Consumer.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,11 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
this.metadata = metadata != null ? metadata : Collections.emptyMap();

this.stats = isNonPersistentTopic ? new NonPersistentPublisherStats() : new PublisherStats();
stats.setAddress(cnx.clientAddress().toString());
if (cnx.hasHAProxyMessage()) {
stats.setAddress(cnx.getHAProxyMessage().sourceAddress() + ":" + cnx.getHAProxyMessage().sourcePort());
} else {
stats.setAddress(cnx.clientAddress().toString());
}
stats.setConnectedSince(DateFormatter.now());
stats.setClientVersion(cnx.getClientVersion());
stats.setProducerName(producerName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder;
import org.apache.pulsar.common.util.NettyServerSslContextBuilder;
import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
Expand Down Expand Up @@ -118,6 +119,9 @@ protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
}

if (pulsar.getConfiguration().isHaProxyProtocolEnabled()) {
ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder());
}
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
brokerConf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
// https://stackoverflow.com/questions/37535482/netty-disabling-auto-read-doesnt-work-for-bytetomessagedecoder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync;

import io.netty.handler.codec.haproxy.HAProxyMessage;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
Expand Down Expand Up @@ -2105,6 +2107,16 @@ public Promise<Void> newPromise() {
return ctx.newPromise();
}

@Override
public HAProxyMessage getHAProxyMessage() {
return proxyMessage;
}

@Override
public boolean hasHAProxyMessage() {
return proxyMessage != null;
}

boolean hasConsumer(long consumerId) {
return consumers.containsKey(consumerId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.util.concurrent.Promise;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;

Expand Down Expand Up @@ -73,4 +74,8 @@ public interface TransportCnx {

Promise<Void> newPromise();

boolean hasHAProxyMessage();

HAProxyMessage getHAProxyMessage();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import io.netty.buffer.Unpooled;
import lombok.Cleanup;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class EnableProxyProtocolTest extends BrokerTestBase {

@BeforeClass
@Override
protected void setup() throws Exception {
conf.setHaProxyProtocolEnabled(true);
super.baseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testSimpleProduceAndConsume() throws PulsarClientException {
final String namespace = "prop/ns-abc";
final String topicName = "persistent://" + namespace + "/testSimpleProduceAndConsume";
final String subName = "my-subscriber-name";
final int messages = 100;

@Cleanup
org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscribe();

@Cleanup
org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
for (int i = 0; i < messages; i++) {
producer.send(("Message-" + i).getBytes());
}

int received = 0;
for (int i = 0; i < messages; i++) {
consumer.acknowledge(consumer.receive());
received++;
}

Assert.assertEquals(received, messages);
}

@Test
public void testProxyProtocol() throws PulsarClientException, ExecutionException, InterruptedException, PulsarAdminException {
final String namespace = "prop/ns-abc";
final String topicName = "persistent://" + namespace + "/testProxyProtocol";
final String subName = "my-subscriber-name";
PulsarClientImpl client = (PulsarClientImpl) pulsarClient;
CompletableFuture<ClientCnx> cnx = client.getCnxPool().getConnection(InetSocketAddress.createUnresolved("localhost", pulsar.getBrokerListenPort().get()));
// Simulate the proxy protcol message
cnx.get().ctx().channel().writeAndFlush(Unpooled.copiedBuffer("PROXY TCP4 198.51.100.22 203.0.113.7 35646 80\r\n".getBytes()));
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscribe();
org.apache.pulsar.broker.service.Consumer c = pulsar.getBrokerService().getTopicReference(topicName).get().getSubscription(subName).getConsumers().get(0);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertTrue(c.cnx().hasHAProxyMessage()));
TopicStats topicStats = admin.topics().getStats(topicName);
Assert.assertEquals(topicStats.subscriptions.size(), 1);
SubscriptionStats subscriptionStats = topicStats.subscriptions.get(subName);
Assert.assertEquals(subscriptionStats.consumers.size(), 1);
Assert.assertEquals(subscriptionStats.consumers.get(0).getAddress(), "198.51.100.22:35646");

pulsarClient.newProducer().topic(topicName).create();
topicStats = admin.topics().getStats(topicName);
Assert.assertEquals(topicStats.publishers.size(), 1);
Assert.assertEquals(topicStats.publishers.get(0).getAddress(), "198.51.100.22:35646");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1731,7 +1731,7 @@ void connectionClosed(ClientCnx cnx) {
this.connectionHandler.connectionClosed(cnx);
}

ClientCnx getClientCnx() {
public ClientCnx getClientCnx() {
return this.connectionHandler.cnx();
}

Expand Down
5 changes: 5 additions & 0 deletions pulsar-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@
<artifactId>netty-tcnative-boringssl-static</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-haproxy</artifactId>
</dependency>

<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.protocol;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.ProtocolDetectionResult;
import io.netty.handler.codec.ProtocolDetectionState;
import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;

/**
* Decoder that added whether a new connection is prefixed with the ProxyProtocol.
* More about the ProxyProtocol see: http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt.
*/
public class OptionalProxyProtocolDecoder extends ChannelInboundHandlerAdapter {

public static final String NAME = "optional-proxy-protocol-decoder";

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ProtocolDetectionResult<HAProxyProtocolVersion> result =
HAProxyMessageDecoder.detectProtocol((ByteBuf) msg);
if (result.state() == ProtocolDetectionState.DETECTED) {
ctx.pipeline().addAfter(NAME, null, new HAProxyMessageDecoder());
ctx.pipeline().remove(this);
}
}
super.channelRead(ctx, msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import io.netty.handler.codec.haproxy.HAProxyMessage;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
Expand Down Expand Up @@ -85,8 +86,18 @@
*/
public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {

// From the proxy protocol. If present, it means the client is connected via a reverse proxy.
// The broker can get the real client address and proxy address from the proxy message.
protected HAProxyMessage proxyMessage;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HAProxyMessage) {
HAProxyMessage proxyMessage = (HAProxyMessage) msg;
this.proxyMessage = proxyMessage;
proxyMessage.release();
return;
}
// Get a buffer that contains the full frame
ByteBuf buffer = (ByteBuf) msg;
BaseCommand cmd = null;
Expand Down
Loading

0 comments on commit 625627c

Please sign in to comment.