Skip to content

Commit

Permalink
[issues 5698]Handle replicator producer as generated name producer (a…
Browse files Browse the repository at this point in the history
…pache#5701)

Fixes apache#5698

Motivation
Since apache#5571 handle the generated producer name, the replicator producer was created by broker, only one producer for a replicated topic.

So, we can handle it simple by considered replicator producer as generated name producer.
  • Loading branch information
weishuisheng authored and jiazhai committed Nov 21, 2019
1 parent 2de6a0f commit 39f37c9
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public abstract class AbstractTopic implements Topic {
protected volatile boolean isAllowAutoUpdateSchema = true;
// schema validation enforced flag
protected volatile boolean schemaValidationEnforced = false;

protected volatile PublishRateLimiter publishRateLimiter;

public AbstractTopic(String topic, BrokerService brokerService) {
Expand Down Expand Up @@ -297,8 +297,8 @@ protected void internalAddProducer(Producer producer) throws BrokerServiceExcept
private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
throws BrokerServiceException {
boolean canOverwrite = false;
if (oldProducer.equals(newProducer) && !oldProducer.isUserProvidedProducerName()
&& !newProducer.isUserProvidedProducerName() && newProducer.getEpoch() > oldProducer.getEpoch()) {
if (oldProducer.equals(newProducer) && !isUserProvidedProducerName(oldProducer)
&& !isUserProvidedProducerName(newProducer) && newProducer.getEpoch() > oldProducer.getEpoch()) {
oldProducer.close(false);
canOverwrite = true;
}
Expand All @@ -316,6 +316,11 @@ private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
}
}

private boolean isUserProvidedProducerName(Producer producer){
//considered replicator producer as generated name producer
return producer.isUserProvidedProducerName() && !producer.getProducerName().startsWith(replicatorPrefix);
}

protected abstract void handleProducerRemoved(Producer producer);

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ public void testProducerOverwrite() throws Exception {
String role = "appid1";
Producer producer1 = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest, 0, true);
Producer producer2= new Producer(topic, serverCnx, 2 /* producer id */, "prod-name",
Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest, 0, true);
try {
topic.addProducer(producer1);
Expand All @@ -406,7 +406,7 @@ public void testProducerOverwrite() throws Exception {

Assert.assertEquals(topic.getProducers().size(), 1);

Producer producer3= new Producer(topic, serverCnx, 2 /* producer id */, "prod-name",
Producer producer3 = new Producer(topic, serverCnx, 2 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest, 1, false);

try {
Expand All @@ -421,7 +421,7 @@ public void testProducerOverwrite() throws Exception {
topic.removeProducer(producer1);
Assert.assertEquals(topic.getProducers().size(), 0);

Producer producer4= new Producer(topic, serverCnx, 2 /* producer id */, "prod-name",
Producer producer4 = new Producer(topic, serverCnx, 2 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest, 2, false);

topic.addProducer(producer3);
Expand All @@ -430,6 +430,30 @@ public void testProducerOverwrite() throws Exception {
Assert.assertEquals(topic.getProducers().size(), 1);

topic.getProducers().values().forEach(producer -> Assert.assertEquals(producer.getEpoch(), 2));

topic.removeProducer(producer4);
Assert.assertEquals(topic.getProducers().size(), 0);

Producer producer5 = new Producer(topic, serverCnx, 2 /* producer id */, "pulsar.repl.cluster1",
role, false, null, SchemaVersion.Latest, 1, false);

topic.addProducer(producer5);
Assert.assertEquals(topic.getProducers().size(), 1);

Producer producer6 = new Producer(topic, serverCnx, 2 /* producer id */, "pulsar.repl.cluster1",
role, false, null, SchemaVersion.Latest, 2, false);

topic.addProducer(producer6);
Assert.assertEquals(topic.getProducers().size(), 1);

topic.getProducers().values().forEach(producer -> Assert.assertEquals(producer.getEpoch(), 2));

Producer producer7 = new Producer(topic, serverCnx, 2 /* producer id */, "pulsar.repl.cluster1",
role, false, null, SchemaVersion.Latest, 3, true);

topic.addProducer(producer7);
Assert.assertEquals(topic.getProducers().size(), 1);
topic.getProducers().values().forEach(producer -> Assert.assertEquals(producer.getEpoch(), 3));
}

public void testMaxProducers() throws Exception {
Expand Down

0 comments on commit 39f37c9

Please sign in to comment.