Skip to content

Commit

Permalink
Catch exceptions in scheduled tasks to prevent unintended cancellation (
Browse files Browse the repository at this point in the history
apache#12853)

* Catch exceptions in scheduled tasks to prevent unintended cancellation

- ScheduledExecutorService#scheduleAtFixedRate won't schedule the next execution if running the task
  throws an exception. This can lead to unintended cancellation of the scheduled task in failure scenarios.

* Address review feedback: use private constructor

* Address review feedback: Use private inner class instead of Lambda
  • Loading branch information
lhotari authored Nov 19, 2021
1 parent aa371fb commit afdfe19
Show file tree
Hide file tree
Showing 27 changed files with 218 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.base.Predicates;
import com.google.common.collect.Maps;
import io.netty.util.concurrent.DefaultThreadFactory;
Expand Down Expand Up @@ -187,9 +188,9 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new EntryCacheManager(this);
this.statsTask = scheduledExecutor.scheduleAtFixedRate(this::refreshStats,
this.statsTask = scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::refreshStats),
0, config.getStatsPeriodSeconds(), TimeUnit.SECONDS);
this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(this::flushCursors,
this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors),
config.getCursorPositionFlushSeconds(), config.getCursorPositionFlushSeconds(), TimeUnit.SECONDS);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;

import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.sun.management.OperatingSystemMXBean;
import java.lang.management.ManagementFactory;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -53,9 +54,10 @@ public GenericBrokerHostUsageImpl(int hostUsageCheckIntervalMin,
this.totalCpuLimit = getTotalCpuLimit();
// Call now to initialize values before the constructor returns
calculateBrokerHostUsage();
executorService.scheduleAtFixedRate(this::checkCpuLoad, CPU_CHECK_MILLIS,
executorService.scheduleAtFixedRate(catchingAndLoggingThrowables(this::checkCpuLoad), CPU_CHECK_MILLIS,
CPU_CHECK_MILLIS, TimeUnit.MILLISECONDS);
executorService.scheduleAtFixedRate(this::doCalculateBrokerHostUsage, hostUsageCheckIntervalMin,
executorService.scheduleAtFixedRate(catchingAndLoggingThrowables(this::doCalculateBrokerHostUsage),
hostUsageCheckIntervalMin,
hostUsageCheckIntervalMin, TimeUnit.MINUTES);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;

import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.base.Charsets;
import com.sun.management.OperatingSystemMXBean;
import java.io.IOException;
Expand Down Expand Up @@ -86,7 +87,8 @@ public LinuxBrokerHostUsageImpl(int hostUsageCheckIntervalMin,

// Call now to initialize values before the constructor returns
calculateBrokerHostUsage();
executorService.scheduleAtFixedRate(this::calculateBrokerHostUsage, hostUsageCheckIntervalMin,
executorService.scheduleAtFixedRate(catchingAndLoggingThrowables(this::calculateBrokerHostUsage),
hostUsageCheckIntervalMin,
hostUsageCheckIntervalMin, TimeUnit.MINUTES);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.resourcegroup;

import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import io.prometheus.client.Counter;
import io.prometheus.client.Summary;
import java.util.Map;
Expand Down Expand Up @@ -574,7 +575,7 @@ protected void aggregateResourceGroupLocalUsages() {
cancelStatus, this.aggregateLocalUsagePeriodInSeconds, newPeriodInSeconds, timeUnitScale);
}
this.aggreagteLocalUsagePeriodicTask = pulsar.getExecutor().scheduleAtFixedRate(
this::aggregateResourceGroupLocalUsages,
catchingAndLoggingThrowables(this::aggregateResourceGroupLocalUsages),
newPeriodInSeconds,
newPeriodInSeconds,
timeUnitScale);
Expand Down Expand Up @@ -664,7 +665,7 @@ protected void calculateQuotaForAllResourceGroups() {
cancelStatus, this.resourceUsagePublishPeriodInSeconds, newPeriodInSeconds, timeUnitScale);
}
this.calculateQuotaPeriodicTask = pulsar.getExecutor().scheduleAtFixedRate(
this::calculateQuotaForAllResourceGroups,
catchingAndLoggingThrowables(this::calculateQuotaForAllResourceGroups),
newPeriodInSeconds,
newPeriodInSeconds,
timeUnitScale);
Expand All @@ -679,12 +680,12 @@ private void initialize() {
long periodInSecs = config.getResourceUsageTransportPublishIntervalInSecs();
this.aggregateLocalUsagePeriodInSeconds = this.resourceUsagePublishPeriodInSeconds = periodInSecs;
this.aggreagteLocalUsagePeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate(
this::aggregateResourceGroupLocalUsages,
catchingAndLoggingThrowables(this::aggregateResourceGroupLocalUsages),
periodInSecs,
periodInSecs,
this.timeUnitScale);
this.calculateQuotaPeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate(
this::calculateQuotaForAllResourceGroups,
catchingAndLoggingThrowables(this::calculateQuotaForAllResourceGroups),
periodInSecs,
periodInSecs,
this.timeUnitScale);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.resourcegroup;

import static org.apache.pulsar.client.api.CompressionType.LZ4;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Expand Down Expand Up @@ -79,7 +80,7 @@ private Producer<ByteBuffer> createProducer() throws PulsarClientException {
public ResourceUsageWriterTask() throws PulsarClientException {
producer = createProducer();
resourceUsagePublishTask = pulsarService.getExecutor().scheduleAtFixedRate(
this,
catchingAndLoggingThrowables(this),
pulsarService.getConfig().getResourceUsageTransportPublishIntervalInSecs(),
pulsarService.getConfig().getResourceUsageTransportPublishIntervalInSecs(),
TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
log.info("Enabling per-broker unack-message limit {} and dispatcher-limit {} on blocked-broker",
maxUnackedMessages, maxUnackedMsgsPerDispatcher);
// block misbehaving dispatcher by checking periodically
pulsar.getExecutor().scheduleAtFixedRate(() -> checkUnAckMessageDispatching(),
pulsar.getExecutor().scheduleAtFixedRate(safeRun(this::checkUnAckMessageDispatching),
600, 30, TimeUnit.SECONDS);
} else {
this.maxUnackedMessages = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;

import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import io.netty.buffer.ByteBuf;
import io.prometheus.client.Gauge;
import java.io.IOException;
Expand Down Expand Up @@ -78,7 +79,7 @@ public ReplicatedSubscriptionsController(PersistentTopic topic, String localClus
this.topic = topic;
this.localCluster = localCluster;
timer = topic.getBrokerService().pulsar().getExecutor()
.scheduleAtFixedRate(this::startNewSnapshot, 0,
.scheduleAtFixedRate(catchingAndLoggingThrowables(this::startNewSnapshot), 0,
topic.getBrokerService().pulsar().getConfiguration()
.getReplicatedSubscriptionsSnapshotFrequencyMillis(),
TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.persistent;


import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.base.MoreObjects;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -271,7 +272,7 @@ public void close() {
}

private ScheduledFuture<?> createTask() {
return executorService.scheduleAtFixedRate(this::closeAndClearRateLimiters,
return executorService.scheduleAtFixedRate(catchingAndLoggingThrowables(this::closeAndClearRateLimiters),
this.subscribeRate.ratePeriodInSecond,
this.subscribeRate.ratePeriodInSecond,
TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats.prometheus.metrics;

import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.prometheus.client.Collector;
Expand Down Expand Up @@ -95,9 +96,8 @@ public void start(Configuration conf) {
DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);

executor.scheduleAtFixedRate(() -> {
rotateLatencyCollection();
}, 1, latencyRolloverSeconds, TimeUnit.SECONDS);
executor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::rotateLatencyCollection),
1, latencyRolloverSeconds, TimeUnit.SECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.pulsar.client.impl.TransactionMetaStoreHandler.getExceptionByServerError;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;

import com.google.common.collect.Queues;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -215,8 +216,9 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.localAddress = ctx.channel().localAddress();
this.remoteAddress = ctx.channel().remoteAddress();

this.timeoutTask = this.eventLoopGroup.scheduleAtFixedRate(() -> checkRequestTimeout(), operationTimeoutMs,
operationTimeoutMs, TimeUnit.MILLISECONDS);
this.timeoutTask = this.eventLoopGroup
.scheduleAtFixedRate(catchingAndLoggingThrowables(this::checkRequestTimeout), operationTimeoutMs,
operationTimeoutMs, TimeUnit.MILLISECONDS);

if (proxyToTargetBrokerAddress == null) {
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Iterables;
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
Expand Down Expand Up @@ -1252,10 +1253,10 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m

// Lazy task scheduling to expire incomplete chunk message
if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) {
internalPinnedExecutor.scheduleAtFixedRate(() -> {
removeExpireIncompleteChunkedMessages();
}, expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
TimeUnit.MILLISECONDS);
internalPinnedExecutor
.scheduleAtFixedRate(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages),
expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
TimeUnit.MILLISECONDS);
expireChunkMessageTaskScheduled = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import java.util.ArrayList;
Expand Down Expand Up @@ -102,7 +103,7 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, Consum
this.currentCumulativeAckFuture = new TimedCompletableFuture<>();

if (acknowledgementGroupTimeMicros > 0) {
scheduledTask = eventLoopGroup.next().scheduleWithFixedDelay(this::flush, acknowledgementGroupTimeMicros,
scheduledTask = eventLoopGroup.next().scheduleWithFixedDelay(catchingAndLoggingThrowables(this::flush), acknowledgementGroupTimeMicros,
acknowledgementGroupTimeMicros, TimeUnit.MICROSECONDS);
} else {
scheduledTask = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.apache.pulsar.client.impl.ProducerBase.MultiSchemaMode.Enabled;
import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
import static org.apache.pulsar.common.protocol.Commands.readChecksum;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
Expand Down Expand Up @@ -205,7 +206,7 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration

if (this.msgCrypto != null) {
// Regenerate data key cipher at fixed interval
keyGeneratorTask = client.eventLoopGroup().scheduleWithFixedDelay(() -> {
keyGeneratorTask = client.eventLoopGroup().scheduleWithFixedDelay(catchingAndLoggingThrowables(() -> {
try {
msgCrypto.addPublicKeyCipher(conf.getEncryptionKeys(), conf.getCryptoKeyReader());
} catch (CryptoException e) {
Expand All @@ -218,7 +219,7 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
producerName, topic)));
}
}
}, 0L, 4L, TimeUnit.HOURS);
}), 0L, 4L, TimeUnit.HOURS);
}

if (conf.getSendTimeoutMs() > 0) {
Expand Down Expand Up @@ -1443,24 +1444,26 @@ public void connectionOpened(final ClientCnx cnx) {

if (!producerCreatedFuture.isDone() && isBatchMessagingEnabled()) {
// schedule the first batch message task
batchTimerTask = cnx.ctx().executor().scheduleAtFixedRate(() -> {
if (log.isTraceEnabled()) {
log.trace(
"[{}] [{}] Batching the messages from the batch container from timer thread",
topic,
producerName);
}
// semaphore acquired when message was enqueued to container
synchronized (ProducerImpl.this) {
// If it's closing/closed we need to ignore the send batch timer and not
// schedule next timeout.
if (getState() == State.Closing || getState() == State.Closed) {
return;
}

batchMessageAndSend();
}
}, 0, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
batchTimerTask = cnx.ctx().executor()
.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> {
if (log.isTraceEnabled()) {
log.trace(
"[{}] [{}] Batching the messages from the batch container from "
+ "timer thread",
topic,
producerName);
}
// semaphore acquired when message was enqueued to container
synchronized (ProducerImpl.this) {
// If it's closing/closed we need to ignore the send batch timer and not
// schedule next timeout.
if (getState() == State.Closing || getState() == State.Closed) {
return;
}

batchMessageAndSend();
}
}), 0, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
}
resendMessages(cnx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.common.protocol;

import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.ScheduledFuture;
import java.net.SocketAddress;
Expand Down Expand Up @@ -65,8 +66,9 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("[{}] Scheduling keep-alive task every {} s", ctx.channel(), keepAliveIntervalSeconds);
}
if (keepAliveIntervalSeconds > 0) {
this.keepAliveTask = ctx.executor().scheduleAtFixedRate(this::handleKeepAliveTimeout,
keepAliveIntervalSeconds, keepAliveIntervalSeconds, TimeUnit.SECONDS);
this.keepAliveTask = ctx.executor()
.scheduleAtFixedRate(catchingAndLoggingThrowables(this::handleKeepAliveTimeout),
keepAliveIntervalSeconds, keepAliveIntervalSeconds, TimeUnit.SECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.common.stats;

import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.buffer.PoolArenaMetric;
Expand Down Expand Up @@ -96,7 +97,7 @@ private static String detectGCType() {
public JvmMetrics(ScheduledExecutorService executor, String componentName, JvmGCMetricsLogger gcLogger) {
this.gcLogger = gcLogger;
if (executor != null) {
executor.scheduleAtFixedRate(gcLogger::refresh, 0, 1, TimeUnit.MINUTES);
executor.scheduleAtFixedRate(catchingAndLoggingThrowables(gcLogger::refresh), 0, 1, TimeUnit.MINUTES);
}
this.componentName = componentName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.common.util;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.base.MoreObjects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -256,7 +257,8 @@ public synchronized TimeUnit getRateTimeUnit() {
}

protected ScheduledFuture<?> createTask() {
return executorService.scheduleAtFixedRate(this::renew, this.rateTime, this.rateTime, this.timeUnit);
return executorService.scheduleAtFixedRate(catchingAndLoggingThrowables(this::renew), this.rateTime,
this.rateTime, this.timeUnit);
}

synchronized void renew() {
Expand Down
Loading

0 comments on commit afdfe19

Please sign in to comment.