Skip to content

Commit

Permalink
Broker side deduplication (apache#751)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Sep 13, 2017
1 parent 817898e commit bdbb121
Show file tree
Hide file tree
Showing 17 changed files with 860 additions and 129 deletions.
23 changes: 21 additions & 2 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,25 @@ brokerDeleteInactiveTopicsFrequencySeconds=60
# How frequently to proactively check and purge expired messages
messageExpiryCheckIntervalInMinutes=5

# Set the default behavior for message deduplication in the broker
# This can be overridden per-namespace. If enabled, broker will reject
# messages that were already stored in the topic
brokerDeduplicationEnabled=false

# Maximum number of producer information that it's going to be
# persisted for deduplication purposes
brokerDeduplicationMaxNumberOfProducers=10000

# Number of entries after which a dedup info snapshot is taken.
# A bigger interval will lead to less snapshots being taken though it would
# increase the topic recovery time, when the entries published after the
# snapshot need to be replayed
brokerDeduplicationEntriesInterval=1000

# Time of inactivity after which the broker will discard the deduplication information
# relative to a disconnected producer. Default is 6 hours.
brokerDeduplicationProducerInactivityTimeoutMinutes=360

# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

Expand Down Expand Up @@ -103,11 +122,11 @@ maxUnackedMessagesPerBroker=0

# Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher unacked messages
# than this percentage limit and subscription will not receive any new messages until that subscription acks back
# limit/2 messages
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16

# Default messages per second dispatch throttling-limit for every topic. Using a value of 0, is disabling default
# message dispatch-throttling
# message dispatch-throttling
dispatchThrottlingRatePerTopicInMsg=0

# Default bytes per second dispatch throttling-limit for every topic. Using a value of 0, is disabling
Expand Down
50 changes: 35 additions & 15 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,33 @@ brokerDeleteInactiveTopicsEnabled=true
# How often to check for inactive topics
brokerDeleteInactiveTopicsFrequencySeconds=60

# How frequently to proactively check and purge expired messages
# How frequently to proactively check and purge expired messages
messageExpiryCheckIntervalInMinutes=5

# Set the default behavior for message deduplication in the broker
# This can be overridden per-namespace. If enabled, broker will reject
# messages that were already stored in the topic
brokerDeduplicationEnabled=false

# Maximum number of producer information that it's going to be
# persisted for deduplication purposes
brokerDeduplicationMaxNumberOfProducers=10000

# Number of entries after which a dedup info snapshot is taken.
# A bigger interval will lead to less snapshots being taken though it would
# increase the topic recovery time, when the entries published after the
# snapshot need to be replayed
brokerDeduplicationEntriesInterval=1000

# Time of inactivity after which the broker will discard the deduplication information
# relative to a disconnected producer. Default is 6 hours.
brokerDeduplicationProducerInactivityTimeoutMinutes=360


# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

# Allow client libraries with no version information
# Allow client libraries with no version information
clientLibraryVersionCheckAllowUnversioned=true

# Path for the file used to determine the rotation status for the broker when responding
Expand All @@ -92,11 +112,11 @@ maxUnackedMessagesPerBroker=0

# Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher unacked messages
# than this percentage limit and subscription will not receive any new messages until that subscription acks back
# limit/2 messages
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16

# Default messages per second dispatch throttling-limit for every topic. Using a value of 0, is disabling default
# message dispatch-throttling
# message dispatch-throttling
dispatchThrottlingRatePerTopicInMsg=0

# Default bytes per second dispatch throttling-limit for every topic. Using a value of 0, is disabling
Expand Down Expand Up @@ -162,7 +182,7 @@ bookkeeperClientAuthenticationPlugin=
bookkeeperClientAuthenticationParametersName=
bookkeeperClientAuthenticationParameters=

# Timeout for BK add / read operations
# Timeout for BK add / read operations
bookkeeperClientTimeoutInSeconds=30

# Speculative reads are initiated if a read request doesn't complete within a certain time
Expand All @@ -178,11 +198,11 @@ bookkeeperClientHealthCheckErrorThresholdPerInterval=5
bookkeeperClientHealthCheckQuarantineTimeInSeconds=1800

# Enable rack-aware bookie selection policy. BK will chose bookies from different racks when
# forming a new bookie ensemble
# forming a new bookie ensemble
bookkeeperClientRackawarePolicyEnabled=true

# Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie
# outside the specified groups will not be used by the broker
# Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie
# outside the specified groups will not be used by the broker
bookkeeperClientIsolationGroups=

### --- Managed Ledger --- ###
Expand All @@ -198,7 +218,7 @@ managedLedgerDefaultAckQuorum=1

# Amount of memory to use for caching data payload in managed ledger. This memory
# is allocated from JVM direct memory and it's shared across all the topics
# running in the same broker
# running in the same broker
managedLedgerCacheSizeMB=1024

# Threshold to which bring down the cache level when eviction is triggered
Expand All @@ -210,7 +230,7 @@ managedLedgerDefaultMarkDeleteRateLimit=0.1
# Max number of entries to append to a ledger before triggering a rollover
# A ledger rollover is triggered on these conditions
# * Either the max rollover time has been reached
# * or max entries have been written to the ledged and at least min-time
# * or max entries have been written to the ledged and at least min-time
# has passed
managedLedgerMaxEntriesPerLedger=50000

Expand All @@ -228,7 +248,7 @@ managedLedgerCursorRolloverTimeInSeconds=14400



### --- Load balancer --- ###
### --- Load balancer --- ###

# Enable load balancer
loadBalancerEnabled=false
Expand All @@ -246,13 +266,13 @@ loadBalancerReportUpdateMaxIntervalMinutes=15
loadBalancerHostUsageCheckIntervalMinutes=1

# Load shedding interval. Broker periodically checks whether some traffic should be offload from
# some over-loaded broker to other under-loaded brokers
# some over-loaded broker to other under-loaded brokers
loadBalancerSheddingIntervalMinutes=30

# Prevent the same topics to be shed and moved to other broker more that once within this timeframe
# Prevent the same topics to be shed and moved to other broker more that once within this timeframe
loadBalancerSheddingGracePeriodMinutes=30

# Usage threshold to determine a broker as under-loaded
# Usage threshold to determine a broker as under-loaded
loadBalancerBrokerUnderloadedThresholdPercentage=1

# Usage threshold to determine a broker as over-loaded
Expand Down Expand Up @@ -289,7 +309,7 @@ replicationMetricsEnabled=true

# Max number of connections to open for each broker in a remote cluster
# More connections host-to-host lead to better throughput over high-latency
# links.
# links.
replicationConnectionsPerBroker=16

# Replicator producer queue size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,26 @@ public class ServiceConfiguration implements PulsarConfiguration {
private long brokerDeleteInactiveTopicsFrequencySeconds = 60;
// How frequently to proactively check and purge expired messages
private int messageExpiryCheckIntervalInMinutes = 5;

// Set the default behavior for message deduplication in the broker
// This can be overridden per-namespace. If enabled, broker will reject
// messages that were already stored in the topic
private boolean brokerDeduplicationEnabled = false;

// Maximum number of producer information that it's going to be
// persisted for deduplication purposes
private int brokerDeduplicationMaxNumberOfProducers = 10000;

// Number of entries after which a dedup info snapshot is taken.
// A bigger interval will lead to less snapshots being taken though it would
// increase the topic recovery time, when the entries published after the
// snapshot need to be replayed
private int brokerDeduplicationEntriesInterval = 1000;

// Time of inactivity after which the broker will discard the deduplication information
// relative to a disconnected producer. Default is 6 hours.
private int brokerDeduplicationProducerInactivityTimeoutMinutes = 360;

// Enable check for minimum allowed client library version
private boolean clientLibraryVersionCheckEnabled = false;
// Allow client libraries with no version information
Expand Down Expand Up @@ -105,7 +125,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
// limit/2 messages
private double maxUnackedMessagesPerSubscriptionOnBrokerBlocked = 0.16;
// Default number of message dispatching throttling-limit for every topic. Using a value of 0, is disabling default
// message dispatch-throttling
// message dispatch-throttling
@FieldContext(dynamic = true)
private int dispatchThrottlingRatePerTopicInMsg = 0;
// Default number of message-bytes dispatching throttling-limit for every topic. Using a value of 0, is disabling
Expand All @@ -124,10 +144,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int maxConcurrentTopicLoadRequest = 5000;
// Max concurrent non-persistent message can be processed per connection
private int maxConcurrentNonPersistentMessagePerConnection = 1000;
// Number of worker threads to serve non-persistent topic
// Number of worker threads to serve non-persistent topic
private int numWorkerThreadsForNonPersistentTopic = 8;

// Enable broker to load persistent topics
private boolean enablePersistentTopics = true;

// Enable broker to load non-persistent topics
private boolean enableNonPersistentTopics = true;

Expand Down Expand Up @@ -437,6 +459,31 @@ public void setBrokerDeleteInactiveTopicsEnabled(boolean brokerDeleteInactiveTop
this.brokerDeleteInactiveTopicsEnabled = brokerDeleteInactiveTopicsEnabled;
}

public int getBrokerDeduplicationMaxNumberOfProducers() {
return brokerDeduplicationMaxNumberOfProducers;
}

public void setBrokerDeduplicationMaxNumberOfProducers(int brokerDeduplicationMaxNumberOfProducers) {
this.brokerDeduplicationMaxNumberOfProducers = brokerDeduplicationMaxNumberOfProducers;
}

public int getBrokerDeduplicationEntriesInterval() {
return brokerDeduplicationEntriesInterval;
}

public void setBrokerDeduplicationEntriesInterval(int brokerDeduplicationEntriesInterval) {
this.brokerDeduplicationEntriesInterval = brokerDeduplicationEntriesInterval;
}

public int getBrokerDeduplicationProducerInactivityTimeoutMinutes() {
return brokerDeduplicationProducerInactivityTimeoutMinutes;
}

public void setBrokerDeduplicationProducerInactivityTimeoutMinutes(
int brokerDeduplicationProducerInactivityTimeoutMinutes) {
this.brokerDeduplicationProducerInactivityTimeoutMinutes = brokerDeduplicationProducerInactivityTimeoutMinutes;
}

public long getBrokerDeleteInactiveTopicsFrequencySeconds() {
return brokerDeleteInactiveTopicsFrequencySeconds;
}
Expand All @@ -453,6 +500,14 @@ public void setMessageExpiryCheckIntervalInMinutes(int messageExpiryCheckInterva
this.messageExpiryCheckIntervalInMinutes = messageExpiryCheckIntervalInMinutes;
}

public boolean isBrokerDeduplicationEnabled() {
return brokerDeduplicationEnabled;
}

public void setBrokerDeduplicationEnabled(boolean brokerDeduplicationEnabled) {
this.brokerDeduplicationEnabled = brokerDeduplicationEnabled;
}

public boolean isClientLibraryVersionCheckEnabled() {
return clientLibraryVersionCheckEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,49 @@ public void setNamespaceMessageTTL(@PathParam("property") String property, @Path
}
}

@POST
@Path("/{property}/{cluster}/{namespace}/deduplication")
@ApiOperation(value = "Enable or disable broker side deduplication for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
public void modifyDeduplication(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, boolean enableDeduplication) {
validateAdminAccessOnProperty(property);
validatePoliciesReadOnlyAccess();

NamespaceName nsName = new NamespaceName(property, cluster, namespace);
Entry<Policies, Stat> policiesNode = null;

try {
// Force to read the data s.t. the watch to the cache content is setup.
policiesNode = policiesCache().getWithStat(path(POLICIES, property, cluster, namespace))
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace " + nsName + " does not exist"));
policiesNode.getKey().deduplicationEnabled = enableDeduplication;

// Write back the new policies into zookeeper
globalZk().setData(path(POLICIES, property, cluster, namespace),
jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
policiesCache().invalidate(path(POLICIES, property, cluster, namespace));

log.info("[{}] Successfully {} on namespace {}/{}/{}", clientAppId(),
enableDeduplication ? "enabled" : "disabled", property, cluster, namespace);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to modify deplication status for namespace {}/{}/{}: does not exist", clientAppId(),
property, cluster, namespace);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
"[{}] Failed to modify deplication status on namespace {}/{}/{} expected policy node version={} : concurrent modification",
clientAppId(), property, cluster, namespace, policiesNode.getValue().getVersion());

throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to modify deplication status on namespace {}/{}/{}", clientAppId(), property,
cluster, namespace, e);
throw new RestException(e);
}
}

@GET
@Path("/{property}/{cluster}/{namespace}/bundles")
@ApiOperation(value = "Get the bundles split data.")
Expand Down
Loading

0 comments on commit bdbb121

Please sign in to comment.