Skip to content

Commit

Permalink
Replicated subscriptions - Create snapshots (apache#4354)
Browse files Browse the repository at this point in the history
* Replicated subscriptions - Create snapshots

* Added clock and unit tests for snapshot builder
  • Loading branch information
merlimat authored May 28, 2019
1 parent 46cffb9 commit 834ca6a
Show file tree
Hide file tree
Showing 10 changed files with 703 additions and 7 deletions.
9 changes: 9 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,15 @@ maxMessageSize=5242880
# Interval between checks to see if topics with compaction policies need to be compacted
brokerServiceCompactionMonitorIntervalInSeconds=60

# Enable tracking of replicated subscriptions state across clusters.
enableReplicatedSubscriptions=true

# Interval, in milliseconds, between snapshots taken for replicated subscriptions tracking.
replicatedSubscriptionsSnapshotFrequencyMillis=1000

# Timeout, in seconds, for building a consistent snapshot for tracking replicated subscriptions state.
replicatedSubscriptionsSnapshotTimeoutSeconds=30

### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,21 @@ public class ServiceConfiguration implements PulsarConfiguration {
maxValue = Integer.MAX_VALUE - Commands.MESSAGE_SIZE_FRAME_PADDING)
private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;

// Flag for the replicated-subscriptions feature; enabled by default.
// NOTE(review): none of the code visible in this change reads this flag — confirm where it is enforced.
@FieldContext(
category = CATEGORY_SERVER,
doc = "Enable tracking of replicated subscriptions state across clusters.")
private boolean enableReplicatedSubscriptions = true;

// Default is 1000. NOTE(review): the "Millis" suffix suggests this is the interval between
// snapshots, in milliseconds, rather than a frequency — confirm against the snapshot scheduler.
@FieldContext(
category = CATEGORY_SERVER,
doc = "Frequency of snapshots for replicated subscriptions tracking.")
private int replicatedSubscriptionsSnapshotFrequencyMillis = 1_000;

// Default is 30; the field name indicates the unit is seconds.
@FieldContext(
category = CATEGORY_SERVER,
doc = "Timeout for building a consistent snapshot for tracking replicated subscriptions state. ")
private int replicatedSubscriptionsSnapshotTimeoutSeconds = 30;

/***** --- TLS --- ****/
@FieldContext(
category = CATEGORY_TLS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,6 @@ default void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies)
/**
 * Returns the dispatch rate limiter, if any.
 *
 * @return an empty {@link Optional} in this default implementation
 */
default Optional<DispatchRateLimiter> getRateLimiter() {
return Optional.empty();
}

/**
 * Reports whether this instance is currently connected.
 *
 * @return {@code true} if connected, {@code false} otherwise
 */
boolean isConnected();
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,4 +250,10 @@ protected long getNumberOfEntriesInBacklog() {
/**
 * Does nothing in this implementation.
 */
protected void disableReplicatorRead() {
// No-op
}

/**
 * Tells whether the replication producer is up.
 *
 * @return {@code true} only when a producer exists and it reports being connected
 */
@Override
public boolean isConnected() {
    // Read the field once so the null check and the call operate on the same instance.
    final ProducerImpl<?> current = this.producer;
    if (current == null) {
        return false;
    }
    return current.isConnected();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.pulsar.broker.service.persistent;

import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;

import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -50,15 +53,14 @@
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.api.Markers;
import org.apache.pulsar.common.api.proto.PulsarMarkers.MarkerType;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotRequest;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;

public class PersistentReplicator extends AbstractReplicator implements Replicator, ReadEntriesCallback, DeleteCallback {

private final PersistentTopic topic;
Expand Down Expand Up @@ -280,6 +282,8 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
continue;
}

checkReplicatedSubscriptionMarker(entry.getPosition(), msg, headersAndPayload);

if (msg.isReplicated()) {
// Discard messages that were already replicated into this region
cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
Expand Down Expand Up @@ -650,5 +654,39 @@ public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
}
}

/**
 * Inspects a message read by this replicator and, when it carries one of the
 * replicated-subscription marker types, forwards it to the owning topic.
 *
 * @param position position of the entry the message was read from
 * @param msg      the message whose metadata is inspected for a marker type
 * @param payload  buffer handed through to the topic together with the marker
 */
private void checkReplicatedSubscriptionMarker(Position position, MessageImpl<?> msg, ByteBuf payload) {
    // Messages without a marker type are regular messages: nothing to do.
    if (!msg.getMessageBuilder().hasMarkerType()) {
        return;
    }

    final int type = msg.getMessageBuilder().getMarkerType();

    // Every replicator sees every marker, so each one only handles the markers that
    // originate from the cluster it is assigned to; this way a marker is processed once.
    if (!remoteCluster.equals(msg.getMessageBuilder().getReplicatedFrom())) {
        return;
    }

    final boolean isReplicatedSubscriptionMarker =
            type == MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE
                    || type == MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE_VALUE
                    || type == MarkerType.REPLICATED_SUBSCRIPTION_UPDATE_VALUE;

    // Any other marker type is ignored.
    if (isReplicatedSubscriptionMarker) {
        topic.receivedReplicatedSubscriptionMarker(position, type, payload);
    }
}

/**
 * Tells whether the producer used for replication is currently connected.
 *
 * @return {@code false} when there is no producer, otherwise whatever the producer reports
 */
@Override
public boolean isConnected() {
    // Copy the field into a local first so a single reference is checked and then used.
    final ProducerImpl<?> snapshot = this.producer;
    if (snapshot == null) {
        return false;
    }
    return snapshot.isConnected();
}

private static final Logger log = LoggerFactory.getLogger(PersistentReplicator.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ public class PersistentTopic implements Topic, AddEntryCallback {
private volatile boolean schemaValidationEnforced = false;
private final StatsBuckets addEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);


// Controller that receives replicated-subscription markers for this topic. Starts out empty;
// it is created and closed by checkReplicatedSubscriptionControllerState(boolean).
private volatile Optional<ReplicatedSubscriptionsController> replicatedSubscriptionsController = Optional.empty();

private static final FastThreadLocal<TopicStatsHelper> threadLocalTopicStats = new FastThreadLocal<TopicStatsHelper>() {
@Override
protected TopicStatsHelper initialValue() {
Expand Down Expand Up @@ -276,6 +279,8 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, e.getMessage());
isEncryptionRequired = false;
}

checkReplicatedSubscriptionControllerState();
}

private void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
Expand Down Expand Up @@ -597,6 +602,7 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
future.completeExceptionally(
new BrokerServiceException("Connection was closed while the opening the cursor "));
} else {
checkReplicatedSubscriptionControllerState();
log.info("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId);
future.complete(consumer);
}
Expand Down Expand Up @@ -914,6 +920,11 @@ public void closeComplete(Object ctx) {
// Everything is now closed, remove the topic from map
brokerService.removeTopicFromCache(topic);

// The controller is only present when replicated subscriptions were enabled on this topic.
// Optional.get() would throw NoSuchElementException on an empty Optional, so unwrap with
// orElse(null) before the null check.
ReplicatedSubscriptionsController ctrl = replicatedSubscriptionsController.orElse(null);
if (ctrl != null) {
    ctrl.close();
}

log.info("[{}] Topic closed", topic);
closeFuture.complete(null);
}
Expand Down Expand Up @@ -1418,7 +1429,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
topicStatsStream.writePair("msgThroughputOut", topicStatsHelper.aggMsgThroughputOut);
topicStatsStream.writePair("storageSize", ledger.getEstimatedBacklogSize());
topicStatsStream.writePair("pendingAddEntriesCount", ((ManagedLedgerImpl) ledger).getPendingAddEntriesCount());

nsStats.msgRateIn += topicStatsHelper.aggMsgRateIn;
nsStats.msgRateOut += topicStatsHelper.aggMsgRateOut;
nsStats.msgThroughputIn += topicStatsHelper.aggMsgThroughputIn;
Expand All @@ -1433,7 +1444,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats

// Close topic object
topicStatsStream.endObject();

// add publish-latency metrics
this.addEntryLatencyStatsUsec.refresh();
NamespaceStats.copy(this.addEntryLatencyStatsUsec.getBuckets(), nsStats.addLatencyBucket);
Expand Down Expand Up @@ -1488,7 +1499,7 @@ public TopicStats getStats() {

stats.storageSize = ledger.getEstimatedBacklogSize();
stats.deduplicationStatus = messageDeduplication.getStatus().toString();

return stats;
}

Expand Down Expand Up @@ -1992,8 +2003,49 @@ public CompletableFuture<Boolean> addSchemaIfIdleOrCheckCompatible(SchemaData sc
});
}


/**
 * Adds one latency sample to this topic's add-entry latency statistics.
 *
 * @param latencyUSec the latency value to record; the parameter and field names indicate microseconds
 */
@Override
public void recordAddLatency(long latencyUSec) {
addEntryLatencyStatsUsec.addValue(latencyUSec);
}

/**
 * Scans the current subscriptions and brings the replicated-subscriptions controller
 * in line with them: it should exist if at least one subscription is replicated.
 */
private synchronized void checkReplicatedSubscriptionControllerState() {
    // A holder is needed because the lambda below cannot assign to a plain local.
    final AtomicBoolean anyReplicated = new AtomicBoolean(false);
    subscriptions.forEach((subName, sub) -> {
        if (sub.isReplicated()) {
            anyReplicated.set(true);
        }
    });

    final boolean shouldBeEnabled = anyReplicated.get();
    if (!shouldBeEnabled) {
        log.info("[{}] There are no replicated subscriptions on the topic", topic);
    }

    checkReplicatedSubscriptionControllerState(shouldBeEnabled);
}

/**
 * Creates or closes the replicated-subscriptions controller so that its presence
 * matches the requested state. Does nothing when it already matches.
 *
 * @param shouldBeEnabled whether a controller should exist after this call
 */
private synchronized void checkReplicatedSubscriptionControllerState(boolean shouldBeEnabled) {
    if (shouldBeEnabled == replicatedSubscriptionsController.isPresent()) {
        // Already in the requested state.
        return;
    }

    if (shouldBeEnabled) {
        log.info("[{}] Enabling replicated subscriptions controller", topic);
        replicatedSubscriptionsController = Optional.of(new ReplicatedSubscriptionsController(this,
                brokerService.pulsar().getConfiguration().getClusterName()));
    } else {
        log.info("[{}] Disabled replicated subscriptions controller", topic);
        // Safe to unwrap: this branch is only reached when a controller is present.
        replicatedSubscriptionsController.get().close();
        replicatedSubscriptionsController = Optional.empty();
    }
}

/**
 * Passes a replicated-subscription marker to the controller, creating the
 * controller first when none exists yet.
 *
 * @param position   position associated with the marker
 * @param markerType numeric marker type, forwarded unchanged to the controller
 * @param payload    buffer forwarded unchanged to the controller
 */
void receivedReplicatedSubscriptionMarker(Position position, int markerType, ByteBuf payload) {
    ReplicatedSubscriptionsController ctrl = replicatedSubscriptionsController.orElse(null);
    if (ctrl == null) {
        // Force to start the replication controller
        checkReplicatedSubscriptionControllerState(true /* shouldBeEnabled */);
        // NOTE(review): this method is not synchronized, so get() relies on the controller
        // not being disabled between the call above and this read — confirm that holds.
        ctrl = replicatedSubscriptionsController.get();
    }

    ctrl.receivedReplicatedSubscriptionMarker(position, markerType, payload);
}
}
Loading

0 comments on commit 834ca6a

Please sign in to comment.