Skip to content

Commit

Permalink
[pulsar-broker] check replicator periodically to avoid issue due to z…
Browse files Browse the repository at this point in the history
…k missing watch (apache#6674)

### Motivation
We have a regular issue when user changes replication-cluster list, many times broker misses the zk watch and broker fails to update replicator which causes either data loss or message backlog based on zk-watch missing at source or destination replication broker.

### Modification
- Broker expires replication policies at every X seconds 
- Starts a background task which checks polices (fetches new policies if it's already expired) and starts/stops replicator if needed.

### Result
Broker can start/stop replicators based on updated policies even if broker misses zk watch.
  • Loading branch information
rdhabalia authored Apr 8, 2020
1 parent 772b789 commit 0f1673b
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 0 deletions.
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,10 @@ replicationProducerQueueSize=1000
# Replicator prefix used for replicator producer name and cursor name
replicatorPrefix=pulsar.repl

# Duration to check replication policy to avoid replicator inconsistency
# due to missing ZooKeeper watch (disable with value 0)
replicatioPolicyCheckDurationSeconds=600

# Default message retention time
defaultRetentionTimeInMinutes=0

Expand Down
4 changes: 4 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,10 @@ replicationConnectionsPerBroker=16
# Replicator producer queue size
replicationProducerQueueSize=1000

# Duration to check replication policy to avoid replicator inconsistency
# due to missing ZooKeeper watch (disable with value 0)
replicatioPolicyCheckDurationSeconds=600

# Default message retention time
defaultRetentionTimeInMinutes=0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Replicator producer queue size"
)
private int replicationProducerQueueSize = 1000;
@FieldContext(
category = CATEGORY_REPLICATION,
doc = "Duration to check replication policy to avoid replicator "
+ "inconsistency due to missing ZooKeeper watch (disable with value 0)"
)
private int replicatioPolicyCheckDurationSeconds = 600;
@Deprecated
@FieldContext(
category = CATEGORY_REPLICATION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ public void start() throws Exception {
this.startMessagePublishBufferMonitor();
this.startBacklogQuotaChecker();
this.updateBrokerPublisherThrottlingMaxRate();
this.startCheckReplicationPolicies();
// register listener to capture zk-latency
ClientCnxnAspect.addListener(zkStatsListener);
ClientCnxnAspect.registerExecutor(pulsar.getExecutor());
Expand Down Expand Up @@ -446,6 +447,14 @@ protected void startMessageExpiryMonitor() {
TimeUnit.MINUTES);
}

protected void startCheckReplicationPolicies() {
int interval = pulsar.getConfig().getReplicatioPolicyCheckDurationSeconds();
if (interval > 0) {
messageExpiryMonitor.scheduleAtFixedRate(safeRun(this::checkReplicationPolicies), interval, interval,
TimeUnit.SECONDS);
}
}

protected void startCompactionMonitor() {
int interval = pulsar().getConfiguration().getBrokerServiceCompactionMonitorIntervalInSeconds();
if (interval > 0) {
Expand Down Expand Up @@ -1143,6 +1152,10 @@ public void checkMessageExpiry() {
forEachTopic(Topic::checkMessageExpiry);
}

public void checkReplicationPolicies() {
forEachTopic(Topic::checkReplication);
}

public void checkCompaction() {
forEachTopic((t) -> {
if (t instanceof PersistentTopic) {
Expand Down

0 comments on commit 0f1673b

Please sign in to comment.