Skip to content

Commit

Permalink
[pulsar-broker] add flag to skip broker shutdown on transient OOM (ap…
Browse files Browse the repository at this point in the history
…ache#6634)

### Motivation

Some time due to high dispatch rate on one of the topic can temporarily cause broker to go OOM and it will be transient error and broker can recover within a few seconds as soon as some memory gets released. However, 2.4 release has change apache#4196 which restarts broker on OOM which can cause huge instability in cluster where that topic moves from one broker to another and restarts multiple brokers and cause disruption for other topics as well. we have seen similar kind of issue mentioned at  apache#5896 . This could be transient error and we need a way to ignore this error. So, we need a dynamic flag to skip broker shutdown on OOM to avoid instability in a cluster.
```
01:48:49.549 [pulsar-io-22-37] ERROR org.apache.pulsar.PulsarBrokerStarter - -- Shutting down - Received OOM exception: Direct buffer memory
java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) ~[?:?]
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
        at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:769) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:745) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:226) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:146) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:164) ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
        at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:158) ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:1912) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.allocateOutNetBuf(SslHandler.java:1923) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:826) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:797) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:778) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at org.apache.pulsar.broker.service.Consumer.lambda$sendMessages$51(Consumer.java:265) ~[pulsar-broker-2.4.6-yahoo.jar:2.4.6-yahoo]
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) [netty-all-4.1.32.Final.jar:4.1.32.
:
: 
01:48:49.549 [pulsar-io-22-39] ERROR org.apache.pulsar.PulsarBrokerStarter - -- Shutting down - Received OOM exception: Direct buffer memory
java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) ~[?:?]
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
        at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:769) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:745) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:226) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:146) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:164) [bookkeeper-common-allocator-4.9.4.2-ya
hoo.jar:4.9.4.2-yahoo]
        at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:158) [bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114) [netty-all-4.1.32.Final.jar:4.1.32.Final]
```

### Modification
Add dynamic flag to avoid broker shutdown on OOM.
  • Loading branch information
rdhabalia authored Mar 30, 2020
1 parent 7bd990a commit 2ea2798
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 2 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ zooKeeperOperationTimeoutSeconds=30
# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
brokerShutdownTimeoutMs=60000

# Flag to skip broker shutdown when broker handles Out of memory error
skipBrokerShutdownOnOOM=false

# Enable backlog quota check. Enforces action on topic when the quota is reached
backlogQuotaCheckEnabled=true

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ zooKeeperOperationTimeoutSeconds=30
# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
brokerShutdownTimeoutMs=60000

# Flag to skip broker shutdown when broker handles Out of memory error
skipBrokerShutdownOnOOM=false

# Enable backlog quota check. Enforces action on topic when the quota is reached
backlogQuotaCheckEnabled=true

Expand Down
3 changes: 3 additions & 0 deletions deployment/terraform-ansible/templates/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ zooKeeperSessionTimeoutMillis=30000
# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
brokerShutdownTimeoutMs=60000

# Flag to skip broker shutdown when broker handles Out of memory error
skipBrokerShutdownOnOOM=false

# Enable backlog quota check. Enforces action on topic when the quota is reached
backlogQuotaCheckEnabled=true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Time to wait for broker graceful shutdown. After this time elapses, the process will be killed"
)
private long brokerShutdownTimeoutMs = 60000;
@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "Flag to skip broker shutdown when broker handles Out of memory error"
)
private boolean skipBrokerShutdownOnOOM = false;

@FieldContext(
category = CATEGORY_POLICIES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,12 @@ public static void main(String[] args) throws Exception {
);

PulsarByteBufAllocator.registerOOMListener(oomException -> {
log.error("-- Shutting down - Received OOM exception: {}", oomException.getMessage(), oomException);
starter.shutdown();
if (starter.brokerConfig.isSkipBrokerShutdownOnOOM()) {
log.error("-- Received OOM exception: {}", oomException.getMessage(), oomException);
} else {
log.error("-- Shutting down - Received OOM exception: {}", oomException.getMessage(), oomException);
starter.shutdown();
}
});

try {
Expand Down
2 changes: 2 additions & 0 deletions site2/docs/reference-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|dispatchThrottlingRatePerReplicatorInByte| The default bytes per second dispatch throttling-limit for every replicator in replication. The value of `0` means disabling replication message-byte dispatch-throttling| 0 |
|zooKeeperSessionTimeoutMillis| Zookeeper session timeout in milliseconds |30000|
|brokerShutdownTimeoutMs| Time to wait for broker graceful shutdown. After this time elapses, the process will be killed |60000|
|skipBrokerShutdownOnOOM| Flag to skip broker shutdown when broker handles Out of memory error. |false|
|backlogQuotaCheckEnabled| Enable backlog quota check. Enforces action on topic when the quota is reached |true|
|backlogQuotaCheckIntervalInSeconds| How often to check for topics that have reached the quota |60|
|backlogQuotaDefaultLimitGB| The default per-topic backlog quota limit | -1 |
Expand Down Expand Up @@ -335,6 +336,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used
|clusterName| The name of the cluster that this broker belongs to. |standalone|
|zooKeeperSessionTimeoutMillis| The ZooKeeper session timeout, in milliseconds. |30000|
|brokerShutdownTimeoutMs| The time to wait for graceful broker shutdown. After this time elapses, the process will be killed. |60000|
|skipBrokerShutdownOnOOM| Flag to skip broker shutdown when broker handles Out of memory error. |false|
|backlogQuotaCheckEnabled| Enable the backlog quota check, which enforces a specified action when the quota is reached. |true|
|backlogQuotaCheckIntervalInSeconds| How often to check for topics that have reached the backlog quota. |60|
|backlogQuotaDefaultLimitGB| The default per-topic backlog quota limit. |10|
Expand Down

0 comments on commit 2ea2798

Please sign in to comment.