Skip to content

Commit

Permalink
Support max-connection and max-connection-per-IP (apache#10754)
Browse files Browse the repository at this point in the history
### Motivation

Pulsar supports multi-tenant, and there may be one single user occupying all connections.
Maybe novice user incorrectly used SDK, or we are being attacked.
In order to avoid a single user to affect entire cluster, I think it is important to limit the number of connections.
For simplicity, I did not use radix tree to save IPs.
  • Loading branch information
315157973 authored Jul 7, 2021
1 parent 4cbae56 commit 45caffa
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 0 deletions.
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ maxNamespacesPerTenant=0
# Using a value of 0, is disabling maxTopicsPerNamespace-limit check.
maxTopicsPerNamespace=0

# The maximum number of connections in the broker. If it exceeds, new connections are rejected.
brokerMaxConnections=0

# The maximum number of connections per IP. If it exceeds, new connections are rejected.
brokerMaxConnectionsPerIp=0

# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,18 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int maxTopicsPerNamespace = 0;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "The maximum number of connections in the broker. If it exceeds, new connections are rejected."
)
private int brokerMaxConnections = 0;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "The maximum number of connections per IP. If it exceeds, new connections are rejected."
)
private int brokerMaxConnectionsPerIp = 0;

@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.tls.InetAddressUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public interface ConnectionController {

/**
* Increase the number of connections counter.
* @param remoteAddress
* @return
*/
Sate increaseConnection(SocketAddress remoteAddress);

/**
* Decrease the number of connections counter.
* @param remoteAddress
*/
void decreaseConnection(SocketAddress remoteAddress);

enum Sate {
OK, REACH_MAX_CONNECTION_PER_IP, REACH_MAX_CONNECTION;
}


class DefaultConnectionController implements ConnectionController {
private static final Logger log = LoggerFactory.getLogger(DefaultConnectionController.class);
private static final Map<String, MutableInt> CONNECTIONS = new HashMap<>();
private static final ReentrantLock lock = new ReentrantLock();
private static int totalConnectionNum = 0;

private final int maxConnections;
private final int maxConnectionPerIp;
private final boolean maxConnectionsLimitEnabled;
private final boolean maxConnectionsLimitPerIpEnabled;

public DefaultConnectionController(ServiceConfiguration configuration) {
this.maxConnections = configuration.getBrokerMaxConnections();
this.maxConnectionPerIp = configuration.getBrokerMaxConnectionsPerIp();
this.maxConnectionsLimitEnabled = configuration.getBrokerMaxConnections() > 0;
this.maxConnectionsLimitPerIpEnabled = configuration.getBrokerMaxConnectionsPerIp() > 0;
}

@Override
public Sate increaseConnection(SocketAddress remoteAddress) {
if (!maxConnectionsLimitEnabled && !maxConnectionsLimitPerIpEnabled) {
return Sate.OK;
}
if (!(remoteAddress instanceof InetSocketAddress)
|| !isLegalIpAddress(((InetSocketAddress) remoteAddress).getHostString())) {
return Sate.OK;
}
lock.lock();
try {
String ip = ((InetSocketAddress) remoteAddress).getHostString();
if (maxConnectionsLimitPerIpEnabled) {
CONNECTIONS.computeIfAbsent(ip, (x) -> new MutableInt(0)).increment();
}
if (maxConnectionsLimitEnabled) {
totalConnectionNum++;
}
if (maxConnectionsLimitEnabled && totalConnectionNum > maxConnections) {
log.info("Reject connect request from {}, because reached the maximum number of connections {}",
remoteAddress, totalConnectionNum);
return Sate.REACH_MAX_CONNECTION;
}
if (maxConnectionsLimitPerIpEnabled && CONNECTIONS.get(ip).getValue() > maxConnectionPerIp) {
log.info("Reject connect request from {}, because reached the maximum number "
+ "of connections per Ip {}",
remoteAddress, CONNECTIONS.get(ip).getValue());
return Sate.REACH_MAX_CONNECTION_PER_IP;
}
} catch (Exception e) {
log.error("increase connection failed", e);
} finally {
lock.unlock();
}
return Sate.OK;
}

@Override
public void decreaseConnection(SocketAddress remoteAddress) {
if (!maxConnectionsLimitEnabled && !maxConnectionsLimitPerIpEnabled) {
return;
}
if (!(remoteAddress instanceof InetSocketAddress)
|| !isLegalIpAddress(((InetSocketAddress) remoteAddress).getHostString())) {
return;
}
lock.lock();
try {
String ip = ((InetSocketAddress) remoteAddress).getHostString();
MutableInt mutableInt = CONNECTIONS.get(ip);
if (maxConnectionsLimitPerIpEnabled && mutableInt != null && mutableInt.decrementAndGet() <= 0) {
CONNECTIONS.remove(ip);
}
if (maxConnectionsLimitEnabled) {
totalConnectionNum--;
}
} finally {
lock.unlock();
}
}

private boolean isLegalIpAddress(String address) {
return InetAddressUtils.isIPv4Address(address) || InetAddressUtils.isIPv6Address(address);
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private FeatureFlags features;

private PulsarCommandSender commandSender;
private final ConnectionController connectionController;

private static final KeySharedMeta emptyKeySharedMeta = new KeySharedMeta()
.setKeySharedMode(KeySharedMode.AUTO_SPLIT);
Expand Down Expand Up @@ -243,11 +244,21 @@ public ServerCnx(PulsarService pulsar) {
this.maxPendingBytesPerThread = conf.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L
/ conf.getNumIOThreads();
this.resumeThresholdPendingBytesPerThread = this.maxPendingBytesPerThread / 2;
this.connectionController = new ConnectionController.DefaultConnectionController(conf);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
ConnectionController.Sate sate = connectionController.increaseConnection(remoteAddress);
if (!sate.equals(ConnectionController.Sate.OK)) {
ctx.channel().writeAndFlush(Commands.newError(-1, ServerError.NotAllowedError,
sate.equals(ConnectionController.Sate.REACH_MAX_CONNECTION)
? "Reached the maximum number of connections"
: "Reached the maximum number of connections on address" + remoteAddress));
ctx.channel().close();
return;
}
log.info("New connection from {}", remoteAddress);
this.ctx = ctx;
this.commandSender = new PulsarCommandSenderImpl(getBrokerService().getInterceptor(), this);
Expand All @@ -258,6 +269,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
connectionController.decreaseConnection(ctx.channel().remoteAddress());
isActive = false;
log.info("Closed connection from {}", remoteAddress);
BrokerInterceptor brokerInterceptor = getBrokerService().getInterceptor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,8 @@ protected static ServiceConfiguration getDefaultConf() {
configuration.setWebServicePortTls(Optional.of(0));
configuration.setBookkeeperClientExposeStatsToPrometheus(true);
configuration.setNumExecutorThreadPoolSize(5);
configuration.setBrokerMaxConnections(0);
configuration.setBrokerMaxConnectionsPerIp(0);
return configuration;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
Expand All @@ -69,11 +71,13 @@
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand Down Expand Up @@ -106,6 +110,7 @@ protected void setup() throws Exception {
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
resetConfig();
}

// method for resetting state explicitly
Expand Down Expand Up @@ -234,6 +239,121 @@ public void testBrokerServicePersistentTopicStats() throws Exception {
assertEquals(subStats.getMsgBacklog(), 0);
}

@Test
public void testConnectionController() throws Exception {
cleanup();
conf.setBrokerMaxConnections(3);
conf.setBrokerMaxConnectionsPerIp(2);
setup();
final String topicName = "persistent://prop/ns-abc/connection" + UUID.randomUUID();
List<PulsarClient> clients = new ArrayList<>();
ClientBuilder clientBuilder =
PulsarClient.builder().operationTimeout(1, TimeUnit.DAYS)
.connectionTimeout(1, TimeUnit.DAYS)
.serviceUrl(brokerUrl.toString());
long startTime = System.currentTimeMillis();
clients.add(createNewConnection(topicName, clientBuilder));
clients.add(createNewConnection(topicName, clientBuilder));
createNewConnectionAndCheckFail(topicName, clientBuilder);
assertTrue(System.currentTimeMillis() - startTime < 20 * 1000);
cleanClient(clients);
clients.clear();

cleanup();
conf.setBrokerMaxConnections(2);
conf.setBrokerMaxConnectionsPerIp(3);
setup();
startTime = System.currentTimeMillis();
clientBuilder.serviceUrl(brokerUrl.toString());
clients.add(createNewConnection(topicName, clientBuilder));
clients.add(createNewConnection(topicName, clientBuilder));
createNewConnectionAndCheckFail(topicName, clientBuilder);
assertTrue(System.currentTimeMillis() - startTime < 20 * 1000);
cleanClient(clients);
clients.clear();
}

@Test
public void testConnectionController2() throws Exception {
cleanup();
conf.setBrokerMaxConnections(0);
conf.setBrokerMaxConnectionsPerIp(1);
setup();
final String topicName = "persistent://prop/ns-abc/connection" + UUID.randomUUID();
List<PulsarClient> clients = new ArrayList<>();
ClientBuilder clientBuilder =
PulsarClient.builder().operationTimeout(1, TimeUnit.DAYS)
.connectionTimeout(1, TimeUnit.DAYS)
.serviceUrl(brokerUrl.toString());
long startTime = System.currentTimeMillis();
clients.add(createNewConnection(topicName, clientBuilder));
createNewConnectionAndCheckFail(topicName, clientBuilder);
assertTrue(System.currentTimeMillis() - startTime < 20 * 1000);
cleanClient(clients);
clients.clear();

cleanup();
conf.setBrokerMaxConnections(1);
conf.setBrokerMaxConnectionsPerIp(0);
setup();
startTime = System.currentTimeMillis();
clientBuilder.serviceUrl(brokerUrl.toString());
clients.add(createNewConnection(topicName, clientBuilder));
createNewConnectionAndCheckFail(topicName, clientBuilder);
assertTrue(System.currentTimeMillis() - startTime < 20 * 1000);
cleanClient(clients);
clients.clear();

cleanup();
conf.setBrokerMaxConnections(1);
conf.setBrokerMaxConnectionsPerIp(1);
setup();
startTime = System.currentTimeMillis();
clientBuilder.serviceUrl(brokerUrl.toString());
clients.add(createNewConnection(topicName, clientBuilder));
createNewConnectionAndCheckFail(topicName, clientBuilder);
assertTrue(System.currentTimeMillis() - startTime < 20 * 1000);
cleanClient(clients);
clients.clear();

cleanup();
conf.setBrokerMaxConnections(0);
conf.setBrokerMaxConnectionsPerIp(0);
setup();
clientBuilder.serviceUrl(brokerUrl.toString());
startTime = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
clients.add(createNewConnection(topicName, clientBuilder));
}
assertTrue(System.currentTimeMillis() - startTime < 20 * 1000);
cleanClient(clients);
clients.clear();

}

private void createNewConnectionAndCheckFail(String topicName, ClientBuilder builder) throws Exception {
try {
createNewConnection(topicName, builder);
fail("should fail");
} catch (Exception e) {
assertTrue(e.getMessage().contains("Reached the maximum number of connections"));
}
}

private PulsarClient createNewConnection(String topicName, ClientBuilder clientBuilder) throws PulsarClientException {
PulsarClient client1 = clientBuilder.build();
client1.newProducer().topic(topicName).create().close();
return client1;
}

private void cleanClient(List<PulsarClient> clients) throws Exception {
for (PulsarClient client : clients) {
if (client != null) {
client.close();
}
}
}

@Test
public void testStatsOfStorageSizeWithSubscription() throws Exception {
final String topicName = "persistent://prop/ns-abc/no-subscription";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,10 @@ protected void handleError(CommandError error) {
connectionFuture.completeExceptionally(new PulsarClientException.AuthenticationException(error.getMessage()));
log.error("{} Failed to authenticate the client", ctx.channel());
}
if (error.getError() == ServerError.NotAllowedError) {
log.error("Get not allowed error, {}", error.getMessage());
connectionFuture.completeExceptionally(new PulsarClientException.NotAllowedException(error.getMessage()));
}
CompletableFuture<?> requestFuture = pendingRequests.remove(requestId);
if (requestFuture != null) {
requestFuture.completeExceptionally(getPulsarClientException(error.getError(), error.getMessage()));
Expand Down

0 comments on commit 45caffa

Please sign in to comment.