Skip to content

Commit

Permalink
Add maxUnackedMessagesPerSubscription and maxUnackedMessagesPerConsum…
Browse files Browse the repository at this point in the history
…er on namespaces policies (apache#5936)

* Add maxUnackedMessagesPerSubscription and maxUnackedMessagesPerConsumer on namespaces policies

Signed-off-by: xiaolong.ran <[email protected]>

* update admin cli docs

Signed-off-by: xiaolong.ran <[email protected]>

* add test case

Signed-off-by: xiaolong.ran <[email protected]>

* fix comments

Signed-off-by: xiaolong.ran <[email protected]>

* add test case for max unacked messages

Signed-off-by: xiaolong.ran <[email protected]>

* fix comments

Signed-off-by: xiaolong.ran <[email protected]>

* fix no space issue

Signed-off-by: xiaolong.ran <[email protected]>
  • Loading branch information
wolfstudy authored Feb 3, 2020
1 parent efee516 commit 4ea02a3
Show file tree
Hide file tree
Showing 17 changed files with 552 additions and 7 deletions.
18 changes: 17 additions & 1 deletion .github/workflows/ci-integration-cli.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,25 @@ jobs:
with:
java-version: 1.8

- name: run install by skip tests
if: steps.docs.outputs.changed_only == 'no'
run: mvn clean install -DskipTests

- name: build artifacts and docker image
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B install -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR -Pdocker -DskipTests
run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests

- name: clean docker container
if: steps.docs.outputs.changed_only == 'no'
run: docker system prune -f

- name: remove docker node image
if: steps.docs.outputs.changed_only == 'no'
run: docker rmi -f node:10 && docker rmi -f node:12 && docker rmi -f buildpack-deps:stretch

- name: remove docker builder and microsoft image
if: steps.docs.outputs.changed_only == 'no'
run: docker rmi -f jekyll/builder:latest && docker rmi -f mcr.microsoft.com/azure-pipelines/node8-typescript:latest

- name: run integration tests
if: steps.docs.outputs.changed_only == 'no'
Expand Down
18 changes: 17 additions & 1 deletion .github/workflows/ci-integration-function-state.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,25 @@ jobs:
with:
java-version: 1.8

- name: run install by skip tests
if: steps.docs.outputs.changed_only == 'no'
run: mvn clean install -DskipTests

- name: build artifacts and docker image
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B install -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR -Pdocker -DskipTests
run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests

- name: clean docker container
if: steps.docs.outputs.changed_only == 'no'
run: docker system prune -f

- name: remove docker node image
if: steps.docs.outputs.changed_only == 'no'
run: docker rmi -f node:10 && docker rmi -f node:12 && docker rmi -f buildpack-deps:stretch

- name: remove docker builder and microsoft image
if: steps.docs.outputs.changed_only == 'no'
run: docker rmi -f jekyll/builder:latest && docker rmi -f mcr.microsoft.com/azure-pipelines/node8-typescript:latest

- name: run integration tests
if: steps.docs.outputs.changed_only == 'no'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,14 @@ protected void mergeNamespaceWithDefaults(Policies policies, String namespace, S
policies.max_consumers_per_subscription = config.getMaxConsumersPerSubscription();
}

if (policies.max_unacked_messages_per_consumer == -1) {
policies.max_unacked_messages_per_consumer = config.getMaxUnackedMessagesPerConsumer();
}

if (policies.max_unacked_messages_per_subscription == -1) {
policies.max_unacked_messages_per_subscription = config.getMaxUnackedMessagesPerSubscription();
}

final String cluster = config.getClusterName();
// attach default dispatch rate polices
if (policies.topicDispatchRate.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1967,6 +1967,88 @@ protected void internalSetMaxConsumersPerSubscription(int maxConsumersPerSubscri
}
}

protected int internalGetMaxUnackedMessagesPerConsumer() {
validateAdminAccessForTenant(namespaceName.getTenant());
return getNamespacePolicies(namespaceName).max_unacked_messages_per_consumer;
}

protected void internalSetMaxUnackedMessagesPerConsumer(int maxUnackedMessagesPerConsumer) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();

try {
Stat nodeStat = new Stat();
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);
Policies policies = jsonMapper().readValue(content, Policies.class);
if (maxUnackedMessagesPerConsumer < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"maxUnackedMessagesPerConsumer must be 0 or more");
}
policies.max_unacked_messages_per_consumer = maxUnackedMessagesPerConsumer;
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
log.info("[{}] Successfully updated maxUnackedMessagesPerConsumer configuration: namespace={}, value={}", clientAppId(),
namespaceName, policies.max_unacked_messages_per_consumer);

} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update maxUnackedMessagesPerConsumer configuration for namespace {}: does not exist", clientAppId(),
namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn("[{}] Failed to update maxUnackedMessagesPerConsumer configuration for namespace {}: concurrent modification",
clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
log.error("[{}] Failed to update maxUnackedMessagesPerConsumer configuration for namespace {}", clientAppId(), namespaceName,
e);
throw new RestException(e);
}
}

protected int internalGetMaxUnackedMessagesPerSubscription() {
validateAdminAccessForTenant(namespaceName.getTenant());
return getNamespacePolicies(namespaceName).max_unacked_messages_per_subscription;
}

protected void internalSetMaxUnackedMessagesPerSubscription(int maxUnackedMessagesPerSubscription) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();

try {
Stat nodeStat = new Stat();
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);
Policies policies = jsonMapper().readValue(content, Policies.class);
if (maxUnackedMessagesPerSubscription < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"maxUnackedMessagesPerSubscription must be 0 or more");
}
policies.max_unacked_messages_per_subscription = maxUnackedMessagesPerSubscription;
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
log.info("[{}] Successfully updated maxUnackedMessagesPerSubscription configuration: namespace={}, value={}", clientAppId(),
namespaceName, policies.max_unacked_messages_per_subscription);

} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update maxUnackedMessagesPerSubscription configuration for namespace {}: does not exist", clientAppId(),
namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn("[{}] Failed to update maxUnackedMessagesPerSubscription configuration for namespace {}: concurrent modification",
clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
log.error("[{}] Failed to update maxUnackedMessagesPerSubscription configuration for namespace {}", clientAppId(), namespaceName,
e);
throw new RestException(e);
}
}

protected long internalGetCompactionThreshold() {
validateAdminAccessForTenant(namespaceName.getTenant());
return getNamespacePolicies(namespaceName).compaction_threshold;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,54 @@ public void setMaxConsumersPerSubscription(@PathParam("tenant") String tenant, @
internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription);
}

@GET
@Path("/{tenant}/{namespace}/maxUnackedMessagesPerConsumer")
@ApiOperation(value = "Get maxUnackedMessagesPerConsumer config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public int getMaxUnackedMessagesPerConsumer(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetMaxUnackedMessagesPerConsumer();
}

@POST
@Path("/{tenant}/{namespace}/maxUnackedMessagesPerConsumer")
@ApiOperation(value = " Set maxConsumersPerTopic configuration on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "maxUnackedMessagesPerConsumer value is not valid") })
public void setMaxUnackedMessagesPerConsumer(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
int maxUnackedMessagesPerConsumer) {
validateNamespaceName(tenant, namespace);
internalSetMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
}

@GET
@Path("/{tenant}/{namespace}/maxUnackedMessagesPerSubscription")
@ApiOperation(value = "Get maxUnackedMessagesPerSubscription config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public int getMaxUnackedmessagesPerSubscription(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetMaxUnackedMessagesPerSubscription();
}

@POST
@Path("/{tenant}/{namespace}/maxUnackedMessagesPerSubscription")
@ApiOperation(value = " Set maxUnackedMessagesPerSubscription configuration on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "maxUnackedMessagesPerSubscription value is not valid") })
public void setMaxUnackedMessagesPerSubscription(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
int maxUnackedMessagesPerSubscription) {
validateNamespaceName(tenant, namespace);
internalSetMaxUnackedMessagesPerSubscription(maxUnackedMessagesPerSubscription);
}

@POST
@Path("/{tenant}/{namespace}/antiAffinity")
@ApiOperation(value = "Set anti-affinity group for a namespace")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public abstract class AbstractTopic implements Topic {
// schema validation enforced flag
protected volatile boolean schemaValidationEnforced = false;

protected volatile int maxUnackedMessagesOnConsumer = -1;

protected volatile PublishRateLimiter topicPublishRateLimiter;

private LongAdder bytesInCounter = new LongAdder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> TOTAL_UNACKED_MESSAGES_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "totalUnackedMessages");
private volatile int totalUnackedMessages = 0;
private final int maxUnackedMessages;
private volatile int blockedDispatcherOnUnackedMsgs = FALSE;
private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "blockedDispatcherOnUnackedMsgs");
Expand All @@ -116,8 +115,6 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso
? new InMemoryRedeliveryTracker()
: RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.maxUnackedMessages = topic.getBrokerService().pulsar().getConfiguration()
.getMaxUnackedMessagesPerSubscription();
this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
}

Expand Down Expand Up @@ -329,7 +326,7 @@ public void readMoreEntries() {
}
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
totalUnackedMessages, maxUnackedMessages);
totalUnackedMessages, topic.maxUnackedMessagesOnSubscription);
} else if (!havePendingRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
Expand Down Expand Up @@ -650,10 +647,16 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List

@Override
public void addUnAckedMessages(int numberOfMessages) {
int maxUnackedMessages = topic.maxUnackedMessagesOnSubscription;
if (maxUnackedMessages == -1) {
maxUnackedMessages = topic.getBrokerService().pulsar().getConfiguration()
.getMaxUnackedMessagesPerSubscription();
}
// don't block dispatching if maxUnackedMessages = 0
if (maxUnackedMessages <= 0) {
return;
}

int unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages);
if (unAckedMessages >= maxUnackedMessages
&& BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, FALSE, TRUE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ protected TopicStatsHelper initialValue() {
private volatile double lastUpdatedAvgPublishRateInMsg = 0;
private volatile double lastUpdatedAvgPublishRateInByte = 0;

public volatile int maxUnackedMessagesOnSubscription = -1;

private static class TopicStatsHelper {
public double averageMsgSize;
public double aggMsgRateIn;
Expand Down Expand Up @@ -252,6 +254,9 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;

schemaValidationEnforced = policies.schema_validation_enforced;

maxUnackedMessagesOnConsumer = unackedMessagesExceededOnConsumer(policies);
maxUnackedMessagesOnSubscription = unackedMessagesExceededOnSubscription(policies);
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, e.getMessage());
isEncryptionRequired = false;
Expand Down Expand Up @@ -583,7 +588,7 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
: getNonDurableSubscription(subscriptionName, startMessageId, startMessageRollbackDurationSec);

int maxUnackedMessages = isDurable
? brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer()
? maxUnackedMessagesOnConsumer
: 0;

subscriptionFuture.thenAccept(subscription -> {
Expand Down Expand Up @@ -628,6 +633,22 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
return future;
}

private int unackedMessagesExceededOnSubscription(Policies data) {
final int maxUnackedMessages = data.max_unacked_messages_per_subscription > -1 ?
data.max_unacked_messages_per_subscription :
brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerSubscription();

return maxUnackedMessages;
}

private int unackedMessagesExceededOnConsumer(Policies data) {
final int maxUnackedMessages = data.max_unacked_messages_per_consumer > -1 ?
data.max_unacked_messages_per_consumer :
brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer();

return maxUnackedMessages;
}

private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName,
InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicated) {
CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -1742,6 +1763,9 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {

schemaValidationEnforced = data.schema_validation_enforced;

maxUnackedMessagesOnConsumer = unackedMessagesExceededOnConsumer(data);
maxUnackedMessagesOnSubscription = unackedMessagesExceededOnSubscription(data);

if (data.delayed_delivery_policies != null) {
delayedDeliveryTickTimeMillis = data.delayed_delivery_policies.getTickTime();
delayedDeliveryEnabled = data.delayed_delivery_policies.isActive();
Expand Down
Loading

0 comments on commit 4ea02a3

Please sign in to comment.