diff --git a/conf/broker.conf b/conf/broker.conf
index bc09f3ad95bca..f22fb3c134d11 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -55,6 +55,9 @@ advertisedAddress=
# The Default value is absent, the broker uses the first listener as the internal listener.
# internalListenerName=
+# Enable or disable the HAProxy protocol.
+haProxyProtocolEnabled=false
+
# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
numIOThreads=
diff --git a/conf/proxy.conf b/conf/proxy.conf
index ae8790e809309..820eca3399f8d 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -50,6 +50,9 @@ zooKeeperCacheExpirySeconds=300
# If not set, the value of `InetAddress.getLocalHost().getHostname()` is used.
advertisedAddress=
+# Enable or disable the HAProxy protocol.
+haProxyProtocolEnabled=false
+
# The port to use for server binary Protobuf requests
servicePort=6650
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 7f8b5fa38547b..2f38f0ebc03e6 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -36,6 +36,9 @@ bindAddress=0.0.0.0
# Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used.
advertisedAddress=
+# Enable or disable the HAProxy protocol.
+haProxyProtocolEnabled=false
+
# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
numIOThreads=
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index cc7c915b1fec9..632e8b5065231 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -363,6 +363,7 @@ The Apache Software License, Version 2.0
- io.netty-netty-codec-http-4.1.51.Final.jar
- io.netty-netty-codec-http2-4.1.51.Final.jar
- io.netty-netty-codec-socks-4.1.51.Final.jar
+ - io.netty-netty-codec-haproxy-4.1.51.Final.jar
- io.netty-netty-common-4.1.51.Final.jar
- io.netty-netty-handler-4.1.51.Final.jar
- io.netty-netty-handler-proxy-4.1.51.Final.jar
diff --git a/pom.xml b/pom.xml
index d1bd700d39b9c..c64fb717037aa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -462,6 +462,12 @@ flexible messaging model and an intuitive client API.
${netty-tc-native.version}
+
+ io.netty
+ netty-codec-haproxy
+ ${netty.version}
+
+
com.beust
jcommander
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2fafb556cd297..f6e3d35372443 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -163,6 +163,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "The Default value is absent, the broker uses the first listener as the internal listener.")
private String internalListenerName;
+ @FieldContext(category=CATEGORY_SERVER,
+ doc = "Enable or disable the proxy protocol.")
+ private boolean haProxyProtocolEnabled;
+
@FieldContext(
category = CATEGORY_SERVER,
doc = "Number of threads to use for Netty IO."
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index a98f68b549ab3..49b0201231b9d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -162,7 +162,11 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.metadata = metadata != null ? metadata : Collections.emptyMap();
stats = new ConsumerStats();
- stats.setAddress(cnx.clientAddress().toString());
+ if (cnx.hasHAProxyMessage()) {
+ stats.setAddress(cnx.getHAProxyMessage().sourceAddress() + ":" + cnx.getHAProxyMessage().sourcePort());
+ } else {
+ stats.setAddress(cnx.clientAddress().toString());
+ }
stats.consumerName = consumerName;
stats.setConnectedSince(DateFormatter.now());
stats.setClientVersion(cnx.getClientVersion());
@@ -764,5 +768,9 @@ public void setReadPositionWhenJoining(PositionImpl readPositionWhenJoining) {
this.readPositionWhenJoining = readPositionWhenJoining;
}
+ public TransportCnx cnx() {
+ return cnx;
+ }
+
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 605905b09b5b8..8eebaa36dbaf2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -109,7 +109,11 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
this.metadata = metadata != null ? metadata : Collections.emptyMap();
this.stats = isNonPersistentTopic ? new NonPersistentPublisherStats() : new PublisherStats();
- stats.setAddress(cnx.clientAddress().toString());
+ if (cnx.hasHAProxyMessage()) {
+ stats.setAddress(cnx.getHAProxyMessage().sourceAddress() + ":" + cnx.getHAProxyMessage().sourcePort());
+ } else {
+ stats.setAddress(cnx.clientAddress().toString());
+ }
stats.setConnectedSince(DateFormatter.now());
stats.setClientVersion(cnx.getClientVersion());
stats.setProducerName(producerName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index 7425ffb0eb3fc..ae8fe85cfd067 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -35,6 +35,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder;
import org.apache.pulsar.common.util.NettyServerSslContextBuilder;
import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
@@ -118,6 +119,9 @@ protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
}
+ if (pulsar.getConfiguration().isHaProxyProtocolEnabled()) {
+ ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder());
+ }
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
brokerConf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
// https://stackoverflow.com/questions/37535482/netty-disabling-auto-read-doesnt-work-for-bytetomessagedecoder
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c5ef5fed766f6..a99ee3ea30bde 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -21,6 +21,8 @@
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync;
+
+import io.netty.handler.codec.haproxy.HAProxyMessage;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
@@ -2105,6 +2107,16 @@ public Promise newPromise() {
return ctx.newPromise();
}
+ @Override
+ public HAProxyMessage getHAProxyMessage() {
+ return proxyMessage;
+ }
+
+ @Override
+ public boolean hasHAProxyMessage() {
+ return proxyMessage != null;
+ }
+
boolean hasConsumer(long consumerId) {
return consumers.containsKey(consumerId);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
index 760441afb6d37..f2744ad6a7457 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.util.concurrent.Promise;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -73,4 +74,8 @@ public interface TransportCnx {
Promise newPromise();
+ boolean hasHAProxyMessage();
+
+ HAProxyMessage getHAProxyMessage();
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java
new file mode 100644
index 0000000000000..2143c0722c9f6
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import io.netty.buffer.Unpooled;
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public class EnableProxyProtocolTest extends BrokerTestBase {
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ conf.setHaProxyProtocolEnabled(true);
+ super.baseSetup();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testSimpleProduceAndConsume() throws PulsarClientException {
+ final String namespace = "prop/ns-abc";
+ final String topicName = "persistent://" + namespace + "/testSimpleProduceAndConsume";
+ final String subName = "my-subscriber-name";
+ final int messages = 100;
+
+ @Cleanup
+ org.apache.pulsar.client.api.Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+ .subscribe();
+
+ @Cleanup
+ org.apache.pulsar.client.api.Producer producer = pulsarClient.newProducer().topic(topicName).create();
+ for (int i = 0; i < messages; i++) {
+ producer.send(("Message-" + i).getBytes());
+ }
+
+ int received = 0;
+ for (int i = 0; i < messages; i++) {
+ consumer.acknowledge(consumer.receive());
+ received++;
+ }
+
+ Assert.assertEquals(received, messages);
+ }
+
+ @Test
+ public void testProxyProtocol() throws PulsarClientException, ExecutionException, InterruptedException, PulsarAdminException {
+ final String namespace = "prop/ns-abc";
+ final String topicName = "persistent://" + namespace + "/testProxyProtocol";
+ final String subName = "my-subscriber-name";
+ PulsarClientImpl client = (PulsarClientImpl) pulsarClient;
+ CompletableFuture cnx = client.getCnxPool().getConnection(InetSocketAddress.createUnresolved("localhost", pulsar.getBrokerListenPort().get()));
+ // Simulate the proxy protcol message
+ cnx.get().ctx().channel().writeAndFlush(Unpooled.copiedBuffer("PROXY TCP4 198.51.100.22 203.0.113.7 35646 80\r\n".getBytes()));
+ pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+ .subscribe();
+ org.apache.pulsar.broker.service.Consumer c = pulsar.getBrokerService().getTopicReference(topicName).get().getSubscription(subName).getConsumers().get(0);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertTrue(c.cnx().hasHAProxyMessage()));
+ TopicStats topicStats = admin.topics().getStats(topicName);
+ Assert.assertEquals(topicStats.subscriptions.size(), 1);
+ SubscriptionStats subscriptionStats = topicStats.subscriptions.get(subName);
+ Assert.assertEquals(subscriptionStats.consumers.size(), 1);
+ Assert.assertEquals(subscriptionStats.consumers.get(0).getAddress(), "198.51.100.22:35646");
+
+ pulsarClient.newProducer().topic(topicName).create();
+ topicStats = admin.topics().getStats(topicName);
+ Assert.assertEquals(topicStats.publishers.size(), 1);
+ Assert.assertEquals(topicStats.publishers.get(0).getAddress(), "198.51.100.22:35646");
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 8d09606997dff..1efc0c9612232 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1731,7 +1731,7 @@ void connectionClosed(ClientCnx cnx) {
this.connectionHandler.connectionClosed(cnx);
}
- ClientCnx getClientCnx() {
+ public ClientCnx getClientCnx() {
return this.connectionHandler.cnx();
}
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index d08dfb819bba5..449ddb62c3faa 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -127,6 +127,11 @@
netty-tcnative-boringssl-static
+
+ io.netty
+ netty-codec-haproxy
+
+
org.eclipse.jetty
jetty-util
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java
new file mode 100644
index 0000000000000..aa1c55e9b457a
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.ProtocolDetectionResult;
+import io.netty.handler.codec.ProtocolDetectionState;
+import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+
+/**
+ * Decoder that added whether a new connection is prefixed with the ProxyProtocol.
+ * More about the ProxyProtocol see: http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt.
+ */
+public class OptionalProxyProtocolDecoder extends ChannelInboundHandlerAdapter {
+
+ public static final String NAME = "optional-proxy-protocol-decoder";
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ if (msg instanceof ByteBuf) {
+ ProtocolDetectionResult result =
+ HAProxyMessageDecoder.detectProtocol((ByteBuf) msg);
+ if (result.state() == ProtocolDetectionState.DETECTED) {
+ ctx.pipeline().addAfter(NAME, null, new HAProxyMessageDecoder());
+ ctx.pipeline().remove(this);
+ }
+ }
+ super.channelRead(ctx, msg);
+ }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 123d2dc2dd139..ce4b64c9bd177 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -24,6 +24,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
@@ -85,8 +86,18 @@
*/
public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
+ // From the proxy protocol. If present, it means the client is connected via a reverse proxy.
+ // The broker can get the real client address and proxy address from the proxy message.
+ protected HAProxyMessage proxyMessage;
+
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ if (msg instanceof HAProxyMessage) {
+ HAProxyMessage proxyMessage = (HAProxyMessage) msg;
+ this.proxyMessage = proxyMessage;
+ proxyMessage.release();
+ return;
+ }
// Get a buffer that contains the full frame
ByteBuf buffer = (ByteBuf) msg;
BaseCommand cmd = null;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 23d3ecb2aedca..44d7ad51f06eb 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -24,6 +24,7 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
@@ -33,10 +34,16 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.haproxy.HAProxyCommand;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
import io.netty.handler.ssl.SslHandler;
+import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
+import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
@@ -143,10 +150,56 @@ protected void initChannel(SocketChannel ch) throws Exception {
inboundOutboundChannelMap.put(outboundChannel.id() , inboundChannel.id());
}
-
+ if (config.isHaProxyProtocolEnabled()) {
+ if (proxyConnection.hasHAProxyMessage()) {
+ outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage()));
+ } else {
+ if (inboundChannel.remoteAddress() instanceof InetSocketAddress) {
+ InetSocketAddress clientAddress = (InetSocketAddress) inboundChannel.remoteAddress();
+ String sourceAddress = clientAddress.getAddress().getHostAddress();
+ int sourcePort = clientAddress.getPort();
+ if (outboundChannel.localAddress() instanceof InetSocketAddress) {
+ InetSocketAddress proxyAddress = (InetSocketAddress) inboundChannel.remoteAddress();
+ String destinationAddress = proxyAddress.getAddress().getHostAddress();
+ int destinationPort = proxyAddress.getPort();
+ HAProxyMessage msg = new HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY,
+ HAProxyProxiedProtocol.TCP4, sourceAddress, destinationAddress, sourcePort, destinationPort);
+ outboundChannel.writeAndFlush(encodeProxyProtocolMessage(msg));
+ msg.release();
+ }
+ }
+ }
+ }
});
}
+ private ByteBuf encodeProxyProtocolMessage(HAProxyMessage msg) {
+ // Max length of v1 version proxy protocol message is 108
+ ByteBuf out = Unpooled.buffer(108);
+ out.writeBytes(TEXT_PREFIX);
+ out.writeByte((byte) ' ');
+ out.writeCharSequence(msg.proxiedProtocol().name(), CharsetUtil.US_ASCII);
+ out.writeByte((byte) ' ');
+ out.writeCharSequence(msg.sourceAddress(), CharsetUtil.US_ASCII);
+ out.writeByte((byte) ' ');
+ out.writeCharSequence(msg.destinationAddress(), CharsetUtil.US_ASCII);
+ out.writeByte((byte) ' ');
+ out.writeCharSequence(String.valueOf(msg.sourcePort()), CharsetUtil.US_ASCII);
+ out.writeByte((byte) ' ');
+ out.writeCharSequence(String.valueOf(msg.destinationPort()), CharsetUtil.US_ASCII);
+ out.writeByte((byte) '\r');
+ out.writeByte((byte) '\n');
+ return out;
+ }
+
+ static final byte[] TEXT_PREFIX = {
+ (byte) 'P',
+ (byte) 'R',
+ (byte) 'O',
+ (byte) 'X',
+ (byte) 'Y',
+ };
+
enum BackendState {
Init, HandshakeCompleted
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index ace2c4ec2b985..52a34a0b4e2e0 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -148,6 +148,11 @@ public class ProxyConfiguration implements PulsarConfiguration {
+ " If not set, the value of `InetAddress.getLocalHost().getCanonicalHostName()` is used."
)
private String advertisedAddress;
+
+ @FieldContext(category=CATEGORY_SERVER,
+ doc = "Enable or disable the proxy protocol.")
+ private boolean haProxyProtocolEnabled;
+
@FieldContext(
category = CATEGORY_SERVER,
doc = "The port for serving binary protobuf request"
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index d0bc217829edc..63e652c4b2499 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -27,6 +27,7 @@
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
@@ -88,6 +89,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener consumer = client.newConsumer().topic(topicName).subscriptionName(subName)
+ .subscribe();
+
+ @Cleanup
+ org.apache.pulsar.client.api.Producer producer = client.newProducer().topic(topicName).create();
+ for (int i = 0; i < messages; i++) {
+ producer.send(("Message-" + i).getBytes());
+ }
+
+ int received = 0;
+ for (int i = 0; i < messages; i++) {
+ consumer.acknowledge(consumer.receive());
+ received++;
+ }
+
+ Assert.assertEquals(received, messages);
+
+ TopicStats topicStats = admin.topics().getStats(topicName);
+ Assert.assertEquals(topicStats.subscriptions.size(), 1);
+ SubscriptionStats subscriptionStats = topicStats.subscriptions.get(subName);
+ Assert.assertEquals(subscriptionStats.consumers.size(), 1);
+ Assert.assertEquals(subscriptionStats.consumers.get(0).getAddress(),
+ ((ConsumerImpl) consumer).getClientCnx().ctx().channel().localAddress().toString().replaceFirst("/", ""));
+
+ topicStats = admin.topics().getStats(topicName);
+ Assert.assertEquals(topicStats.publishers.size(), 1);
+ Assert.assertEquals(topicStats.publishers.get(0).getAddress(),
+ ((ProducerImpl) producer).getClientCnx().ctx().channel().localAddress().toString().replaceFirst("/", ""));
+ }
+}
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index e2734528c2fcb..80e5759fb41b2 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -237,6 +237,7 @@ The Apache Software License, Version 2.0
- netty-codec-4.1.51.Final.jar
- netty-codec-dns-4.1.51.Final.jar
- netty-codec-http-4.1.51.Final.jar
+ - netty-codec-haproxy-4.1.51.Final.jar
- netty-common-4.1.51.Final.jar
- netty-handler-4.1.51.Final.jar
- netty-reactive-streams-2.0.4.jar