Skip to content

Commit

Permalink
[enh] Broker: make max size of Consumer metadata configurable (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored May 24, 2022
1 parent e120d97 commit 3dbf1f5
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " unacked messages than this percentage limit and subscription will not receive any new messages "
+ " until that subscription acks back `limit/2` messages")
private double maxUnackedMessagesPerSubscriptionOnBrokerBlocked = 0.16;
@FieldContext(
category = CATEGORY_POLICIES,
doc = "Maximum size of Consumer metadata")
private int maxConsumerMetadataSize = 1024;
@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {

log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName);
try {
Metadata.validateMetadata(metadata);
Metadata.validateMetadata(metadata,
service.getPulsar().getConfiguration().getMaxConsumerMetadataSize());
} catch (IllegalArgumentException iae) {
final String msg = iae.getMessage();
consumers.remove(consumerId, consumerFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,24 @@
*/
public class Metadata {

private static final int MAX_METADATA_SIZE = 1024; // 1 Kb

private Metadata() {}

public static void validateMetadata(Map<String, String> metadata) throws IllegalArgumentException {
public static void validateMetadata(Map<String, String> metadata,
int maxConsumerMetadataSize) throws IllegalArgumentException {
if (metadata == null) {
return;
}

int size = 0;
for (Map.Entry<String, String> e : metadata.entrySet()) {
size += (e.getKey().length() + e.getValue().length());
if (size > MAX_METADATA_SIZE) {
throw new IllegalArgumentException(getErrorMessage());
if (size > maxConsumerMetadataSize) {
throw new IllegalArgumentException(getErrorMessage(maxConsumerMetadataSize));
}
}
}

private static String getErrorMessage() {
return "metadata has a max size of 1 Kb";
private static String getErrorMessage(int maxConsumerMetadataSize) {
return "metadata has a max size of " + maxConsumerMetadataSize + " bytes";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,46 +32,49 @@ public void testValidMetadata() {
Map<String, String> metadata = new HashMap<>();

metadata.put(generateKey(1, 512), generateKey(1, 512));
Assert.assertTrue(validateMetadata(metadata));
Assert.assertTrue(validateMetadata(metadata, 1024));

metadata.clear();
metadata.put(generateKey(1, 512), generateKey(1, 511));
Assert.assertTrue(validateMetadata(metadata));
Assert.assertTrue(validateMetadata(metadata, 1024));

metadata.clear();
metadata.put(generateKey(1, 256), generateKey(1, 256));
metadata.put(generateKey(2, 256), generateKey(2, 256));
Assert.assertTrue(validateMetadata(metadata));
Assert.assertTrue(validateMetadata(metadata, 1024));

metadata.clear();
metadata.put(generateKey(1, 256), generateKey(1, 256));
metadata.put(generateKey(2, 256), generateKey(2, 255));
Assert.assertTrue(validateMetadata(metadata));
Assert.assertTrue(validateMetadata(metadata, 1024));
}

@Test
public void testInvalidMetadata() {
Map<String, String> metadata = new HashMap<>();

metadata.put(generateKey(1, 512), generateKey(1, 513));
Assert.assertFalse(validateMetadata(metadata));
Assert.assertFalse(validateMetadata(metadata, 1024));

metadata.clear();
metadata.put(generateKey(1, 256), generateKey(1, 256));
metadata.put(generateKey(2, 256), generateKey(2, 257));
Assert.assertFalse(validateMetadata(metadata));
Assert.assertFalse(validateMetadata(metadata, 1024));


metadata.clear();
metadata.put(generateKey(1, 256), generateKey(1, 256));
metadata.put(generateKey(2, 256), generateKey(2, 256));
metadata.put(generateKey(3, 1), generateKey(3, 1));
Assert.assertFalse(validateMetadata(metadata));
Assert.assertFalse(validateMetadata(metadata, 1024));

// set bigger maxConsumerMetadataSize, now validation should pass
Assert.assertTrue(validateMetadata(metadata, 1024 * 10));
}

private static boolean validateMetadata(Map<String, String> metadata) {
private static boolean validateMetadata(Map<String, String> metadata, int maxSize) {
try {
Metadata.validateMetadata(metadata);
Metadata.validateMetadata(metadata, maxSize);
return true;
} catch (IllegalArgumentException ignore) {
return false;
Expand Down

0 comments on commit 3dbf1f5

Please sign in to comment.