Skip to content

Commit

Permalink
Fix hash range conflict issue in Key_Shared with sticky hash range (a…
Browse files Browse the repository at this point in the history
…pache#7231)

Fix hash range conflict issue in Key_Shared with sticky hash range
  • Loading branch information
codelipenghui authored Jun 11, 2020
1 parent 941ddd6 commit e1c04ef
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,14 @@ private void validateKeySharedMeta(Consumer consumer) throws BrokerServiceExcept
}

if (ceilingEntry != null && floorEntry != null && ceilingEntry.getValue().equals(floorEntry.getValue())) {
throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer " + ceilingEntry.getValue());
PulsarApi.KeySharedMeta keySharedMeta = ceilingEntry.getValue().getKeySharedMeta();
for (PulsarApi.IntRange range : keySharedMeta.getHashRangesList()) {
int start = Math.max(intRange.getStart(), range.getStart());
int end = Math.min(intRange.getEnd(), range.getEnd());
if (end >= start) {
throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer " + ceilingEntry.getValue());
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,70 @@ public void testRemoveFirstConsumer() throws Exception {
}
}

@Test
public void testHashRangeConflict() throws PulsarClientException {
this.conf.setSubscriptionKeySharedEnable(true);
final String topic = "testHashRangeConflict-" + UUID.randomUUID().toString();
final String sub = "test";

Consumer<String> consumer1 = createFixedHashRangesConsumer(topic, sub, Range.of(0,99), Range.of(400, 65535));
Assert.assertTrue(consumer1.isConnected());

Consumer<String> consumer2 = createFixedHashRangesConsumer(topic, sub, Range.of(100,399));
Assert.assertTrue(consumer2.isConnected());

try {
createFixedHashRangesConsumer(topic, sub, Range.of(0, 65535));
Assert.fail("Should failed with conflict range.");
} catch (PulsarClientException.ConsumerAssignException ignore) {
}

try {
createFixedHashRangesConsumer(topic, sub, Range.of(1,1));
Assert.fail("Should failed with conflict range.");
} catch (PulsarClientException.ConsumerAssignException ignore) {
}

consumer1.close();

try {
createFixedHashRangesConsumer(topic, sub, Range.of(0, 65535));
Assert.fail("Should failed with conflict range.");
} catch (PulsarClientException.ConsumerAssignException ignore) {
}

try {
createFixedHashRangesConsumer(topic, sub, Range.of(50,100));
Assert.fail("Should failed with conflict range.");
} catch (PulsarClientException.ConsumerAssignException ignore) {
}

try {
createFixedHashRangesConsumer(topic, sub, Range.of(399,500));
Assert.fail("Should failed with conflict range.");
} catch (PulsarClientException.ConsumerAssignException ignore) {
}

Consumer<String> consumer3 = createFixedHashRangesConsumer(topic, sub, Range.of(400,600));
Assert.assertTrue(consumer3.isConnected());

Consumer<String> consumer4 = createFixedHashRangesConsumer(topic, sub, Range.of(50,99));
Assert.assertTrue(consumer4.isConnected());

consumer2.close();
consumer3.close();
consumer4.close();
}

private Consumer<String> createFixedHashRangesConsumer(String topic, String subscription, Range... ranges) throws PulsarClientException {
return pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subscription)
.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(ranges))
.subscribe();
}

private Producer<Integer> createProducer(String topic, boolean enableBatch) throws PulsarClientException {
Producer<Integer> producer = null;
if (enableBatch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,20 @@ public CryptoException(String msg) {
}
}

/**
* Consumer assign exception thrown by Pulsar client.
*/
public static class ConsumerAssignException extends PulsarClientException {

/**
* Constructs an {@code ConsumerAssignException} with the specified detail message.
* @param msg The detail message.
*/
public ConsumerAssignException(String msg) {
super(msg);
}
}

// wrap an exception to enriching more info messages.
public static Throwable wrap(Throwable t, String msg) {
msg += "\n" + t.getMessage();
Expand Down Expand Up @@ -786,6 +800,8 @@ public static Throwable wrap(Throwable t, String msg) {
return new ChecksumException(msg);
} else if (t instanceof CryptoException) {
return new CryptoException(msg);
} else if (t instanceof ConsumerAssignException) {
return new ConsumerAssignException(msg);
} else if (t instanceof PulsarClientException) {
return new PulsarClientException(msg);
} else if (t instanceof CompletionException) {
Expand Down Expand Up @@ -867,6 +883,8 @@ public static PulsarClientException unwrap(Throwable t) {
return new ChecksumException(msg);
} else if (cause instanceof CryptoException) {
return new CryptoException(msg);
} else if (cause instanceof ConsumerAssignException) {
return new ConsumerAssignException(msg);
} else if (cause instanceof TopicDoesNotExistException) {
return new TopicDoesNotExistException(msg);
} else {
Expand Down Expand Up @@ -895,6 +913,7 @@ public static boolean isRetriableError(Throwable t) {
|| t instanceof NotSupportedException
|| t instanceof ChecksumException
|| t instanceof CryptoException
|| t instanceof ConsumerAssignException
|| t instanceof ProducerBusyException
|| t instanceof ConsumerBusyException) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,8 @@ private PulsarClientException getPulsarClientException(ServerError error, String
return new PulsarClientException.IncompatibleSchemaException(errorMsg);
case TopicNotFound:
return new PulsarClientException.TopicDoesNotExistException(errorMsg);
case ConsumerAssignError:
return new PulsarClientException.ConsumerAssignException(errorMsg);
case UnknownError:
default:
return new PulsarClientException(errorMsg);
Expand Down

0 comments on commit e1c04ef

Please sign in to comment.