Skip to content

Commit

Permalink
Refactored replicator test with backlog quota interactions (apache#773)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Sep 21, 2017
1 parent b96c50d commit 8bf8eb1
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,13 @@ public PulsarStats(PulsarService pulsar) {

@Override
public void close() {
ReferenceCountUtil.safeRelease(topicStatsBuf);
ReferenceCountUtil.safeRelease(tempTopicStatsBuf);
bufferLock.writeLock().lock();
try {
ReferenceCountUtil.safeRelease(topicStatsBuf);
ReferenceCountUtil.safeRelease(tempTopicStatsBuf);
} finally {
bufferLock.writeLock().unlock();
}
}

public ClusterReplicationMetrics getClusterReplicationMetrics() {
Expand Down Expand Up @@ -193,7 +198,7 @@ public void recordTopicLoadTimeValue(String topic, long topicLoadLatencyMs) {
log.warn("Exception while recording topic load time for topic {}, {}", topic, ex.getMessage());
}
}

public void recordZkLatencyTimeValue(EventType eventType, long latencyMs) {
try {
if (EventType.write.equals(eventType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -690,58 +690,58 @@ public void testReplicatorProducerClosing() throws Exception {
* @throws Exception
*/

@Test(enabled = true, priority = -1)
@Test(timeOut = 60000, enabled = true, priority = -1)
public void testResumptionAfterBacklogRelaxed() throws Exception {

List<RetentionPolicy> policies = Lists.newArrayList();
policies.add(RetentionPolicy.producer_exception);
policies.add(RetentionPolicy.producer_request_hold);

for (RetentionPolicy policy : policies) {
// Use 1Mb quota by default
admin1.namespaces().setBacklogQuota("pulsar/global/ns1", new BacklogQuota(1 * 1024 * 1024, policy));
Thread.sleep(200);

DestinationName dest = DestinationName.get(String.format("persistent://pulsar/global/ns1/%s", policy));
DestinationName dest = DestinationName
.get(String.format("persistent://pulsar/global/ns1/%s-%d", policy, System.currentTimeMillis()));

// Producer on r1
MessageProducer producer1 = new MessageProducer(url1, dest);

// Consumer on r1
MessageConsumer consumer1 = new MessageConsumer(url1, dest);

// Consumer on r2
MessageConsumer consumer2 = new MessageConsumer(url2, dest);

// Replicator for r1 -> r2
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString());
Replicator replicator = topic.getPersistentReplicator("r2");

// Restrict backlog quota limit to 1
admin1.namespaces().setBacklogQuota("pulsar/global/ns1", new BacklogQuota(1, policy));

// Produce a message to r1, then it will be replicated to r2 and fulfill the backlog.
// Produce 1 message in r1. This message will be replicated immediately into r2 and it will become part of local backlog
producer1.produce(1);
consumer1.receive(1);
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);

// Produce 9 messages to r1, then it will be pended because of the backlog limit excess
producer1.produce(9);
consumer1.receive(9);
Thread.sleep(1000L);
assertEquals(replicator.getStats().replicationBacklog, 9);
Thread.sleep(500);

// Restrict backlog quota limit to 1 byte to stop replication
admin1.namespaces().setBacklogQuota("pulsar/global/ns1", new BacklogQuota(1, policy));

// Relax backlog quota limit to 1G
admin1.namespaces().setBacklogQuota("pulsar/global/ns1", new BacklogQuota(1024 * 1024 * 1024, policy));
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);

// The messages should be replicated to r2
assertEquals(replicator.getStats().replicationBacklog, 0);

// Next message will not be replicated, because r2 has reached the quota
producer1.produce(1);

Thread.sleep(500);

assertEquals(replicator.getStats().replicationBacklog, 1);

// Consumer will now drain 1 message and the replication backlog will be cleared
consumer2.receive(1);
consumer2.receive(9);
if (!consumer2.drained()) {
throw new Exception("consumer2 - unexpected message in queue");
}

// Wait until the 2nd message got delivered to consumer
consumer2.receive(1);

assertEquals(replicator.getStats().replicationBacklog, 0);

producer1.close();
consumer1.close();
consumer2.close();
}
}
Expand Down

0 comments on commit 8bf8eb1

Please sign in to comment.