Skip to content

Commit

Permalink
Allow to configure busy-wait in broker and client (apache#10661)
Browse files Browse the repository at this point in the history
Co-authored-by: lipenghui <[email protected]>
  • Loading branch information
merlimat and codelipenghui authored May 22, 2021
1 parent 6347b52 commit 1060043
Show file tree
Hide file tree
Showing 30 changed files with 204 additions and 25 deletions.
6 changes: 6 additions & 0 deletions conf/bookkeeper.conf
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ maxPendingReadRequestsPerThread=2500
# avoid the executor queue to grow indefinitely
maxPendingAddRequestsPerThread=10000

# Option to enable busy-wait settings. Default is false.
# WARNING: This option will enable spin-waiting on executors and IO threads in order to reduce latency during
# context switches. The spinning will consume 100% CPU even when bookie is not doing any work. It is recommended to
# reduce the number of threads in the main workers pool and Netty event loop to only have few CPU cores busy.
enableBusyWait=false

# Whether force compaction is allowed when the disk is full or almost full.
# Forcing GC may get some space back, but may also fill up disk space more quickly.
# This is because new log files are created before GC, while old garbage
Expand Down
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ numExecutorThreadPoolSize=
# Default is 10
numCacheExecutorThreadPoolSize=10

# Option to enable busy-wait settings. Default is false.
# WARNING: This option will enable spin-waiting on executors and IO threads in order to reduce latency during
# context switches. The spinning will consume 100% CPU even when the broker is not doing any work. It is recommended to
# reduce the number of IO threads and BK client threads to only have few CPU cores busy.
enableBusyWait=false

# Max concurrent web requests
maxConcurrentHttpRequests=1024

Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,12 @@ flexible messaging model and an intuitive client API.</description>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>cpu-affinity</artifactId>
<version>${bookkeeper.version}</version>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,16 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int numCacheExecutorThreadPoolSize = 10;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Option to enable busy-wait settings. Default is false. "
+ "WARNING: This option will enable spin-waiting on executors and IO threads in order "
+ "to reduce latency during context switches. The spinning will consume 100% CPU even "
+ "when the broker is not doing any work. It is recommended to reduce the number of IO threads "
+ "and BK client threads to only have few CPU cores busy."
)
private boolean enableBusyWait = false;

@FieldContext(category = CATEGORY_SERVER, doc = "Max concurrent web requests")
private int maxConcurrentHttpRequests = 1024;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ ClientConfiguration createBkClientConfiguration(ServiceConfiguration conf) {
bkConf.setTLSTrustStorePasswordPath(conf.getBookkeeperTLSTrustStorePasswordPath());
}

bkConf.setBusyWaitEnabled(conf.isEnableBusyWait());
bkConf.setNumWorkerThreads(conf.getBookkeeperClientNumWorkerThreads());
bkConf.setThrottleValue(conf.getBookkeeperClientThrottleValue());
bkConf.setAddEntryTimeout((int) conf.getBookkeeperClientTimeoutInSeconds());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public PulsarService(ServiceConfiguration config,
this.transactionReplayExecutor = null;
}

this.ioEventLoopGroup = EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(),
this.ioEventLoopGroup = EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(), config.isEnableBusyWait(),
new DefaultThreadFactory("pulsar-io"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
final DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-acceptor");

this.acceptorGroup = EventLoopUtil.newEventLoopGroup(
pulsar.getConfiguration().getNumAcceptorThreads(), acceptorThreadFactory);
pulsar.getConfiguration().getNumAcceptorThreads(), false, acceptorThreadFactory);
this.workerGroup = eventLoopGroup;
this.statsUpdater = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-stats-updater"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public static void main(String[] args) throws Exception {
(int) brokerConfig.getZooKeeperSessionTimeoutMillis()).get();
BookKeeperClientFactory bkClientFactory = new BookKeeperClientFactoryImpl();

EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("compactor-io"));
EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(1, false,
new DefaultThreadFactory("compactor-io"));
BookKeeper bk = bkClientFactory.create(brokerConfig, zk, eventLoopGroup, Optional.empty(), null);
try (PulsarClient pulsar = clientBuilder.build()) {
Compactor compactor = new TwoPhaseCompactor(brokerConfig, pulsar, bk, scheduler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;

import java.util.Optional;
import java.util.Properties;

import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down Expand Up @@ -86,6 +87,10 @@ protected void setup() throws Exception {
config.setAdvertisedAddress("127.0.0.1");
config.setAllowAutoTopicCreationType("non-partitioned");
config.setZooKeeperOperationTimeoutSeconds(1);
config.setNumIOThreads(1);
Properties properties = new Properties();
properties.put("bookkeeper_numWorkerThreads", "1");
config.setProperties(properties);
configurePulsar(config);

pulsar = new PulsarService(config);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

@Test(groups = "broker")
public class BusyWaitServiceTest extends BkEnsemblesTestBase {
public BusyWaitServiceTest() {
super(1);
}

protected void configurePulsar(ServiceConfiguration config) {
config.setEnableBusyWait(true);
config.setManagedLedgerDefaultEnsembleSize(1);
config.setManagedLedgerDefaultWriteQuorum(1);
config.setManagedLedgerDefaultAckQuorum(1);
}

@Test
public void testPublishWithBusyWait() throws Exception {

@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getWebServiceAddress())
.statsInterval(0, TimeUnit.SECONDS)
.enableBusyWait(true)
.build();

String namespace = "prop/busy-wait";
admin.namespaces().createNamespace(namespace);

String topic = namespace + "/my-topic-" + UUID.randomUUID();

@Cleanup
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("test")
.subscribe();

@Cleanup
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topic)
.create();

for (int i = 0; i < 10; i++) {
producer.send("my-message-" + i);
}

for (int i = 0; i < 10; i++) {
Message<String> msg = consumer.receive();
assertNotNull(msg);
assertEquals(msg.getValue(), "my-message-" + i);
consumer.acknowledge(msg);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ public void startMockBrokerService() throws Exception {
final int MaxMessageSize = 5 * 1024 * 1024;

try {
workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, threadFactory);
workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, false, threadFactory);

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(workerGroup, workerGroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected void cleanup() throws Exception {
@Test
public void testSingleIpAddress() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test"));
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
conf.setServiceUrl(serviceUrl);
PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);
Expand All @@ -77,7 +77,7 @@ public void testDoubleIpAddress() throws Exception {
String serviceUrl = "pulsar://non-existing-dns-name:" + pulsar.getBrokerListenPort().get();

ClientConfigurationData conf = new ClientConfigurationData();
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test"));
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
conf.setServiceUrl(serviceUrl);
PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);
Expand All @@ -100,7 +100,7 @@ public void testDoubleIpAddress() throws Exception {
public void testNoConnectionPool() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setConnectionsPerBroker(0);
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, new DefaultThreadFactory("test"));
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, false, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));

InetSocketAddress brokerAddress =
Expand All @@ -122,7 +122,7 @@ public void testNoConnectionPool() throws Exception {
public void testEnableConnectionPool() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setConnectionsPerBroker(5);
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, new DefaultThreadFactory("test"));
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, false, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));

InetSocketAddress brokerAddress =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public static PulsarTestClient create(ClientBuilder clientBuilder) throws Pulsar
// An anonymous subclass of ClientCnx class is used to override the getRemoteEndpointProtocolVersion()
// method.
EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(clientConfigurationData.getNumIoThreads(),
false,
new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon()));

AtomicReference<Supplier<ClientCnx>> clientCnxSupplierReference = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,18 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
*/
ClientBuilder maxBackoffInterval(long duration, TimeUnit unit);

/**
* Option to enable busy-wait settings. Default is false.
*
* <b>WARNING</b>: This option will enable spin-waiting on executors and IO threads in order to reduce latency
* during context switches. The spinning will consume 100% CPU even when the broker is not doing any work. It
* is recommended to reduce the number of IO threads and BK client threads to only have few CPU cores busy.
*
* @param enableBusyWait whether to enable busy wait
* @return the client builder instance
*/
ClientBuilder enableBusyWait(boolean enableBusyWait);

/**
* The clock used by the pulsar client.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,12 @@ public ClientBuilder maxBackoffInterval(long duration, TimeUnit unit) {
return this;
}

@Override
public ClientBuilder enableBusyWait(boolean enableBusyWait) {
conf.setEnableBusyWait(enableBusyWait);
return this;
}

public ClientConfigurationData getClientConfigurationData() {
return conf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,7 @@ public CompletableFuture<List<String>> getPartitionsForTopic(String topic) {

private static EventLoopGroup getEventLoopGroup(ClientConfigurationData conf) {
ThreadFactory threadFactory = getThreadFactory("pulsar-client-io");
return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), conf.isEnableBusyWait(), threadFactory);
}

private static ThreadFactory getThreadFactory(String poolName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class ClientConfigurationData implements Serializable, Cloneable {
private int requestTimeoutMs = 60000;
private long initialBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);
private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(60);
private boolean enableBusyWait = false;
//
private String listenerName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class ClientCnxRequestTimeoutQueueTest {

@BeforeTest
void setupClientCnx() throws Exception {
eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("testClientCnxTimeout"));
eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout"));
ClientConfigurationData conf = new ClientConfigurationData();
conf.setKeepAliveIntervalSeconds(0);
conf.setOperationTimeoutMs(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class ClientCnxTest {

@Test
public void testClientCnxTimeout() throws Exception {
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("testClientCnxTimeout"));
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout"));
ClientConfigurationData conf = new ClientConfigurationData();
conf.setOperationTimeoutMs(10);
conf.setKeepAliveIntervalSeconds(0);
Expand All @@ -75,7 +75,7 @@ public void testClientCnxTimeout() throws Exception {
@Test
public void testReceiveErrorAtSendConnectFrameState() throws Exception {
ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, threadFactory);
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
ClientConfigurationData conf = new ClientConfigurationData();
conf.setOperationTimeoutMs(10);
ClientCnx cnx = new ClientCnx(conf, eventLoop);
Expand Down Expand Up @@ -110,7 +110,7 @@ public void testReceiveErrorAtSendConnectFrameState() throws Exception {
@Test
public void testGetLastMessageIdWithError() throws Exception {
ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, threadFactory);
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
ClientConfigurationData conf = new ClientConfigurationData();
ClientCnx cnx = new ClientCnx(conf, eventLoop);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void testGetStats() throws Exception {
conf.setStatsIntervalSeconds(100);

ThreadFactory threadFactory = new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory);
ExecutorProvider executorProvider = new ExecutorProvider(1, "client-test-stats");

PulsarClientImpl clientImpl = new PulsarClientImpl(conf, eventLoopGroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void testGetStats() throws Exception {
conf.setStatsIntervalSeconds(100);

ThreadFactory threadFactory = new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory);

PulsarClientImpl clientImpl = new PulsarClientImpl(conf, eventLoopGroup);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void setup() throws PulsarClientException {

private void initializeEventLoopGroup(ClientConfigurationData conf) {
ThreadFactory threadFactory = new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory);
}

@AfterMethod(alwaysRun = true)
Expand Down Expand Up @@ -179,7 +179,7 @@ public void testInitializeWithoutTimer() throws Exception {
@Test
public void testInitializeWithTimer() throws PulsarClientException {
ClientConfigurationData conf = new ClientConfigurationData();
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test"));
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
conf.setServiceUrl("pulsar://localhost:6650");

Expand Down
5 changes: 5 additions & 0 deletions pulsar-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>cpu-affinity</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
Expand Down
Loading

0 comments on commit 1060043

Please sign in to comment.