Skip to content

Commit

Permalink
Fix apache#199: Do not stop to replicate when producer throws excepti…
Browse files Browse the repository at this point in the history
…on (apache#220)

* Fix apache#199: Do not stop to replicate when producer throws exception

* Fix log messages

* testReplicatorProducerClosing shoud be executed at last since it closes pulsar2/pulsar3

* Add unit test for replication resumption on backlog exceeded
  • Loading branch information
nkurihar authored and merlimat committed Feb 17, 2017
1 parent 7284ae8 commit 6fd212a
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -392,15 +392,15 @@ public void sendComplete(Exception exception) {
if (exception != null) {
log.error("[{}][{} -> {}] Error producing on remote broker", replicator.topicName,
replicator.localCluster, replicator.remoteCluster, exception);
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}][{} -> {}] Message persisted on remote broker", replicator.topicName,
replicator.localCluster, replicator.remoteCluster);
// cursor shoud be rewinded since it was incremented when readMoreEntries
replicator.cursor.rewind();
} else {
if (log.isDebugEnabled()) {
log.debug("[{}][{} -> {}] Message persisted on remote broker", replicator.topicName,
replicator.localCluster, replicator.remoteCluster);
}
replicator.cursor.asyncDelete(entry.getPosition(), replicator, entry.getPosition());
}

replicator.cursor.asyncDelete(entry.getPosition(), replicator, entry.getPosition());
entry.release();

int pending = PENDING_MESSAGES_UPDATER.decrementAndGet(replicator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.yahoo.pulsar.broker.service;

import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand Down Expand Up @@ -56,11 +57,15 @@
import com.yahoo.pulsar.client.api.MessageBuilder;
import com.yahoo.pulsar.client.api.Producer;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.client.impl.PulsarClientImpl;
import com.yahoo.pulsar.client.impl.ProducerImpl;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.policies.data.BacklogQuota;
import com.yahoo.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
import com.yahoo.pulsar.common.policies.data.PersistentTopicStats;
import com.yahoo.pulsar.common.policies.data.ReplicatorStats;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;

Expand Down Expand Up @@ -165,18 +170,18 @@ public Void call() throws Exception {

// Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
}

@Test
public void testConcurrentReplicator() throws Exception {

log.info("--- Starting ReplicatorTest::testConfigChange ---");
log.info("--- Starting ReplicatorTest::testConcurrentReplicator ---");

final DestinationName dest = DestinationName.get(String.format("persistent://pulsar/global/ns1/topic-%d", 0));
ClientConfiguration conf = new ClientConfiguration();
conf.setStatsInterval(0, TimeUnit.SECONDS);
Producer producer = PulsarClient.create(url1.toString(), conf).createProducer(dest.toString());
producer.close();

PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(dest.toString()).get();

PulsarClientImpl pulsarClient = spy((PulsarClientImpl) pulsar1.getBrokerService().getReplicationClient("r3"));
Expand Down Expand Up @@ -344,7 +349,7 @@ public Void call() throws Exception {
@Test(enabled = false)
public void testReplicationOverrides() throws Exception {

log.info("--- Starting ReplicatorTest::testReplication ---");
log.info("--- Starting ReplicatorTest::testReplicationOverrides ---");

// This test is to verify that the config change on global namespace is successfully applied in broker during
// runtime.
Expand Down Expand Up @@ -608,7 +613,7 @@ public Void call() throws Exception {
}
}
}

/**
* It verifies that: if it fails while removing replicator-cluster-cursor: it should not restart the replicator and
* it should have cleaned up from the list
Expand Down Expand Up @@ -652,7 +657,7 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
});
}

@Test
@Test(priority = 5)
public void testReplicatorProducerClosing() throws Exception {
log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
final String topicName = "persistent://pulsar/global/ns/repltopicbatch";
Expand All @@ -671,6 +676,71 @@ public void testReplicatorProducerClosing() throws Exception {
assertNull(producer);
}

/**
* Issue #199
*
* It verifies that: if the remote cluster reaches backlog quota limit, replicator temporarily stops and once the
* backlog drains it should resume replication.
*
* @throws Exception
*/

@Test(enabled = true, priority = -1)
public void testResumptionAfterBacklogRelaxed() throws Exception {

List<RetentionPolicy> policies = Lists.newArrayList();
policies.add(RetentionPolicy.producer_exception);
policies.add(RetentionPolicy.producer_request_hold);

for (RetentionPolicy policy : policies) {

DestinationName dest = DestinationName.get(String.format("persistent://pulsar/global/ns1/%s", policy));

// Producer on r1
MessageProducer producer1 = new MessageProducer(url1, dest);

// Consumer on r1
MessageConsumer consumer1 = new MessageConsumer(url1, dest);

// Consumer on r2
MessageConsumer consumer2 = new MessageConsumer(url2, dest);

// Replicator for r1 -> r2
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString());
PersistentReplicator replicator = topic.getPersistentReplicator("r2");

// Restrict backlog quota limit to 1
admin1.namespaces().setBacklogQuota("pulsar/global/ns1", new BacklogQuota(1, policy));

// Produce a message to r1, then it will be replicated to r2 and fulfill the backlog.
producer1.produce(1);
consumer1.receive(1);
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);

// Produce 9 messages to r1, then it will be pended because of the backlog limit excess
producer1.produce(9);
consumer1.receive(9);
Thread.sleep(1000L);
assertEquals(replicator.getStats().replicationBacklog, 9);

// Relax backlog quota limit to 1G
admin1.namespaces().setBacklogQuota("pulsar/global/ns1", new BacklogQuota(1024 * 1024 * 1024, policy));
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);

// The messages should be replicated to r2
assertEquals(replicator.getStats().replicationBacklog, 0);
consumer2.receive(1);
consumer2.receive(9);
if (!consumer2.drained()) {
throw new Exception("consumer2 - unexpected message in queue");
}

producer1.close();
consumer1.close();
consumer2.close();
}
}

private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public class ReplicatorTestBase {

ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;

// Default frequency
public int getBrokerServicePurgeInactiveFrequency() {
return 60;
Expand Down Expand Up @@ -111,6 +113,7 @@ void setup() throws Exception {
config1.setBrokerServicePurgeInactiveFrequencyInSeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config1.setBrokerServicePort(PortManager.nextFreePort());
config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
pulsar1 = new PulsarService(config1);
pulsar1.start();
ns1 = pulsar1.getBrokerService();
Expand All @@ -135,6 +138,7 @@ void setup() throws Exception {
config2.setBrokerServicePurgeInactiveFrequencyInSeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config2.setBrokerServicePort(PortManager.nextFreePort());
config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
pulsar2 = new PulsarService(config2);
pulsar2.start();
ns2 = pulsar2.getBrokerService();
Expand Down

0 comments on commit 6fd212a

Please sign in to comment.