Skip to content

Commit

Permalink
Configure static PulsarByteBufAllocator to handle OOM errors (apache#…
Browse files Browse the repository at this point in the history
…4196)

* Configure static PulsarByteBufAllocator to handle OOM errors

* Always specify `pulsar.allocator.exit_on_oom` when starting pulsar services

* Reverted metrics back

* Fixed compression tests

* Explicitely set the underlying allocator to netty default

* Fixed shading
  • Loading branch information
merlimat authored May 29, 2019
1 parent 9ba1259 commit 3b33c66
Show file tree
Hide file tree
Showing 37 changed files with 290 additions and 171 deletions.
2 changes: 1 addition & 1 deletion conf/pulsar_env.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ PULSAR_MEM=${PULSAR_MEM:-"-Xms2g -Xmx2g -XX:MaxDirectMemorySize=4g"}
PULSAR_GC=${PULSAR_GC:-"-XX:+UseG1GC -XX:MaxGCPauseMillis=10 -XX:+ParallelRefProcEnabled -XX:+UnlockExperimentalVMOptions -XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1NewSizePercent=50 -XX:+DisableExplicitGC -XX:-ResizePLAB"}

# Extra options to be passed to the jvm
PULSAR_EXTRA_OPTS=${PULSAR_EXTRA_OPTS:-"-Dio.netty.leakDetectionLevel=disabled -Dio.netty.recycler.maxCapacity.default=1000 -Dio.netty.recycler.linkCapacity=1024"}
PULSAR_EXTRA_OPTS=${PULSAR_EXTRA_OPTS:-" -Dpulsar.allocator.exit_on_oom=true -Dio.netty.recycler.maxCapacity.default=1000 -Dio.netty.recycler.linkCapacity=1024"}

# Add extra paths to the bookkeeper classpath
# PULSAR_EXTRA_CLASSPATH=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,10 @@
import com.google.common.collect.Sets;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.charset.Charset;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
Expand Down Expand Up @@ -96,6 +94,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
Expand Down Expand Up @@ -2173,7 +2172,7 @@ public ByteBuf getMessageWithMetadata(byte[] data) throws IOException {

int msgMetadataSize = messageData.getSerializedSize();
int headersSize = 4 + msgMetadataSize;
ByteBuf headers = PooledByteBufAllocator.DEFAULT.buffer(headersSize, headersSize);
ByteBuf headers = PulsarByteBufAllocator.DEFAULT.buffer(headersSize, headersSize);
ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(headers);
headers.writeInt(msgMetadataSize);
messageData.writeTo(outStream);
Expand Down
11 changes: 9 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,12 @@ flexible messaging model and an intuitive client API.</description>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-common-allocator</artifactId>
<version>${bookkeeper.version}</version>
</dependency>

<!-- reflection libs -->
<dependency>
<groupId>org.reflections</groupId>
Expand Down Expand Up @@ -1120,8 +1126,9 @@ flexible messaging model and an intuitive client API.</description>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine> -Xmx2G -XX:MaxDirectMemorySize=8G
-Dio.netty.leakDetectionLevel=advanced
<argLine> -Xmx2G
-Dpulsar.allocator.pooled=false
-Dpulsar.allocator.leak_detection=Advanced
-Dlog4j.configurationFile=log4j2.xml
</argLine>
<reuseForks>false</reuseForks>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.text.DateFormat;
import java.text.SimpleDateFormat;


import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.AutoRecoveryMain;
Expand All @@ -50,8 +49,8 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.slf4j.Logger;
Expand Down Expand Up @@ -315,6 +314,11 @@ public static void main(String[] args) throws Exception {
})
);

PulsarByteBufAllocator.registerOOMListener(oomException -> {
log.error("-- Shutting down - Received OOM exception: {}", oomException.getMessage(), oomException);
starter.shutdown();
});

try {
starter.start();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
Expand All @@ -40,6 +39,7 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
private final AtomicReference<ZooKeeperCache> rackawarePolicyZkCache = new AtomicReference<>();
private final AtomicReference<ZooKeeperCache> clientIsolationZkCache = new AtomicReference<>();

@SuppressWarnings("deprecation")
@Override
public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws IOException {
ClientConfiguration bkConf = new ClientConfiguration();
Expand All @@ -61,7 +61,6 @@ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws I
bkConf.setStickyReadsEnabled(conf.isBookkeeperEnableStickyReads());
bkConf.setNettyMaxFrameSizeBytes(conf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING);

bkConf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
if (conf.isBookkeeperClientHealthCheckEnabled()) {
bkConf.enableBookieHealthCheck();
bkConf.setBookieHealthCheckInterval(conf.getBookkeeperHealthCheckIntervalSec(), TimeUnit.SECONDS);
Expand Down Expand Up @@ -109,7 +108,10 @@ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws I
}

try {
return new BookKeeper(bkConf, zkClient);
return BookKeeper.forConfig(bkConf)
.allocator(PulsarByteBufAllocator.DEFAULT)
.zk(zkClient)
.build();
} catch (InterruptedException | BKException e) {
throw new IOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.common.collect.Sets;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

import java.io.IOException;
import java.io.OutputStream;
Expand Down Expand Up @@ -86,6 +85,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
Expand All @@ -101,8 +101,8 @@
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -1087,7 +1087,7 @@ protected Response internalPeekNthMessage(String subName, int messagePosition, b
ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize());

// Copy into a heap buffer for output stream compatibility
ByteBuf data = PooledByteBufAllocator.DEFAULT.heapBuffer(uncompressedPayload.readableBytes(),
ByteBuf data = PulsarByteBufAllocator.DEFAULT.heapBuffer(uncompressedPayload.readableBytes(),
uncompressedPayload.readableBytes());
data.writeBytes(uncompressedPayload);
uncompressedPayload.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
Expand All @@ -42,7 +41,6 @@
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -88,20 +86,19 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventType;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
Expand Down Expand Up @@ -278,7 +275,7 @@ public void start() throws Exception {
pulsar.getConfiguration().getClusterName());

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
bootstrap.group(acceptorGroup, workerGroup);
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static com.google.common.base.Preconditions.checkArgument;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;

import java.io.IOException;
Expand All @@ -33,7 +32,7 @@
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
Expand Down Expand Up @@ -102,7 +101,7 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,

ByteBuf payload = msg.getHeadersAndPayload();
MessageMetadata metadata = Commands.parseMessageMetadata(payload);
ByteBuf batchBuffer = PooledByteBufAllocator.DEFAULT.buffer(payload.capacity());
ByteBuf batchBuffer = PulsarByteBufAllocator.DEFAULT.buffer(payload.capacity());

CompressionType compressionType = metadata.getCompression();
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@
*/
package org.apache.pulsar.client.impl;

import java.io.IOException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.io.IOException;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class RawMessageImpl implements RawMessage {
private static final Logger log = LoggerFactory.getLogger(RawMessageImpl.class);

Expand Down Expand Up @@ -74,7 +74,7 @@ public ByteBuf serialize() {
int headerSize = 4 /* IdSize */ + idSize + 4 /* PayloadAndMetadataSize */;
int totalSize = headerSize + headersAndPayload.readableBytes();

ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(totalSize);
ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(totalSize);
buf.writeInt(idSize);
try {
ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(buf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;

import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -39,15 +42,12 @@
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
import org.testng.annotations.Test;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;

/**
*/
public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
Expand All @@ -64,7 +64,7 @@ public static byte[] createMessageWrittenToLedger(String msg) throws Exception {
int payloadSize = data.readableBytes();
int totalSize = 4 + msgMetadataSize + payloadSize;

ByteBuf headers = PooledByteBufAllocator.DEFAULT.heapBuffer(totalSize, totalSize);
ByteBuf headers = PulsarByteBufAllocator.DEFAULT.heapBuffer(totalSize, totalSize);
ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(headers);
headers.writeInt(msgMetadataSize);
messageMetadata.writeTo(outStream);
Expand Down
5 changes: 5 additions & 0 deletions pulsar-client-admin-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
<include>io.opencensus:*</include>
<include>org.objenesis:*</include>
<include>org.yaml:snakeyaml</include>
<include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
</includes>
</artifactSet>
<filters>
Expand Down Expand Up @@ -219,6 +220,10 @@
<pattern>org.yaml</pattern>
<shadedPattern>org.apache.pulsar.shade.org.yaml</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.bookkeeper</pattern>
<shadedPattern>org.apache.pulsar.shade.org.apache.bookkeeper</shadedPattern>
</relocation>
</relocations>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
Expand Down
5 changes: 5 additions & 0 deletions pulsar-client-all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
<include>org.xerial.snappy:snappy-java</include>
<include>org.apache.commons:commons-compress</include>
<include>org.tukaani:xz</include>
<include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
</includes>
</artifactSet>
<filters>
Expand All @@ -172,6 +173,10 @@
<pattern>org.asynchttpclient</pattern>
<shadedPattern>org.apache.pulsar.shade.org.asynchttpclient</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.bookkeeper</pattern>
<shadedPattern>org.apache.pulsar.shade.org.apache.bookkeeper</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons</pattern>
<shadedPattern>org.apache.pulsar.shade.org.apache.commons</shadedPattern>
Expand Down
5 changes: 5 additions & 0 deletions pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
<include>org.apache.pulsar:pulsar-client-original</include>
<include>org.apache.commons:commons-lang3</include>
<include>commons-codec:commons-codec</include>
<include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
<include>commons-collections:commons-collections</include>
<include>org.asynchttpclient:*</include>
<include>io.netty:netty-codec-http</include>
Expand Down Expand Up @@ -228,6 +229,10 @@
<pattern>org.tukaani</pattern>
<shadedPattern>org.apache.pulsar.shade.org.tukaani</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.bookkeeper</pattern>
<shadedPattern>org.apache.pulsar.shade.org.apache.bookkeeper</shadedPattern>
</relocation>
</relocations>
<filters>
<filter>
Expand Down
5 changes: 5 additions & 0 deletions pulsar-client-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
<artifactSet>
<includes>
<include>org.apache.pulsar:pulsar-client-original</include>
<include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
<include>org.apache.commons:commons-lang3</include>
<include>commons-codec:commons-codec</include>
<include>commons-collections:commons-collections</include>
Expand Down Expand Up @@ -249,6 +250,10 @@
<pattern>org.tukaani</pattern>
<shadedPattern>org.apache.pulsar.shade.org.tukaani</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.bookkeeper</pattern>
<shadedPattern>org.apache.pulsar.shade.org.apache.bookkeeper</shadedPattern>
</relocation>
</relocations>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
Expand Down
Loading

0 comments on commit 3b33c66

Please sign in to comment.