Skip to content

Commit

Permalink
[fix][broker] Fix out of order data replication (apache#17154)
Browse files Browse the repository at this point in the history
* [fix][broker] Fix out of order data replication

### Motivation

The schema replication will break the data replication order while fetching
the schema from the local cluster.

https://github.com/apache/pulsar/blob/8a6ecd7d4c9399bb7ce5a224ca854e4a71db79b1/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java#L366-L369

The method getSchemaInfo() is an async method that will reverse the order in which messages are written.

### Modification

Added a new state for replicator `fetchSchemaInProgress` which means the
replicator had detected a new schema that needed to fetch the schema info
from the local cluster. During the schema fetching, the replicator should
pause the data replicator and resume after the schema has been fetched.
  • Loading branch information
codelipenghui authored Aug 19, 2022
1 parent f6fd6b7 commit 39c1ee1
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ public class PersistentReplicator extends AbstractReplicator

private final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();

private volatile boolean fetchSchemaInProgress = false;

public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
BrokerService brokerService, PulsarClientImpl replicationClient)
throws PulsarServerException {
Expand Down Expand Up @@ -220,6 +222,11 @@ private int getAvailablePermits() {
}

protected void readMoreEntries() {
if (fetchSchemaInProgress) {
log.info("[{}][{} -> {}] Skip the reading due to new detected schema",
topicName, localCluster, remoteCluster);
return;
}
int availablePermits = getAvailablePermits();

if (availablePermits > 0) {
Expand Down Expand Up @@ -290,8 +297,15 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
// This flag is set to true when we skip atleast one local message,
// in order to skip remaining local messages.
boolean isLocalMessageSkippedOnce = false;
boolean skipRemainingMessages = false;
for (int i = 0; i < entries.size(); i++) {
Entry entry = entries.get(i);
// Skip the messages since the replicator need to fetch the schema info to replicate the schema to the
// remote cluster. Rewind the cursor first and continue the message read after fetched the schema.
if (skipRemainingMessages) {
entry.release();
continue;
}
int length = entry.getLength();
ByteBuf headersAndPayload = entry.getDataBuffer();
MessageImpl msg;
Expand Down Expand Up @@ -364,16 +378,34 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {

headersAndPayload.retain();

getSchemaInfo(msg).thenAccept(schemaInfo -> {
msg.setSchemaInfoForReplicator(schemaInfo);
CompletableFuture<SchemaInfo> schemaFuture = getSchemaInfo(msg);
if (!schemaFuture.isDone() || schemaFuture.isCompletedExceptionally()) {
entry.release();
headersAndPayload.release();
msg.recycle();
// Mark the replicator is fetching the schema for now and rewind the cursor
// and trigger the next read after complete the schema fetching.
fetchSchemaInProgress = true;
skipRemainingMessages = true;
cursor.cancelPendingReadRequest();
log.info("[{}][{} -> {}] Pause the data replication due to new detected schema", topicName,
localCluster, remoteCluster);
schemaFuture.whenComplete((__, e) -> {
if (e != null) {
log.warn("[{}][{} -> {}] Failed to get schema from local cluster, will try in the next loop",
topicName, localCluster, remoteCluster, e);
}
log.info("[{}][{} -> {}] Resume the data replication after the schema fetching done", topicName,
localCluster, remoteCluster);
cursor.rewind();
fetchSchemaInProgress = false;
readMoreEntries();
});
} else {
msg.setSchemaInfoForReplicator(schemaFuture.get());
producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
}).exceptionally(ex -> {
log.error("[{}][{} -> {}] Failed to get schema from local cluster", topicName,
localCluster, remoteCluster, ex);
return null;
});

atLeastOneMessageSentForReplication = true;
atLeastOneMessageSentForReplication = true;
}
}
} catch (Exception e) {
log.error("[{}][{} -> {}] Unexpected exception: {}", topicName, localCluster, remoteCluster, e.getMessage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
Expand Down Expand Up @@ -384,8 +385,11 @@ public void testReplication(String namespace) throws Exception {
consumer3.receive(1);
}

@Test
@Test(invocationCount = 5)
public void testReplicationWithSchema() throws Exception {
config1.setBrokerDeduplicationEnabled(true);
config2.setBrokerDeduplicationEnabled(true);
config3.setBrokerDeduplicationEnabled(true);
PulsarClient client1 = pulsar1.getClient();
PulsarClient client2 = pulsar2.getClient();
PulsarClient client3 = pulsar3.getClient();
Expand All @@ -395,17 +399,29 @@ public void testReplicationWithSchema() throws Exception {
final String subName = "my-sub";

@Cleanup
Producer<Schemas.PersonOne> producer = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
Producer<Schemas.PersonOne> producer1 = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topic.toString())
.enableBatching(false)
.create();

@Cleanup
Producer<Schemas.PersonThree> producer2 = client1.newProducer(Schema.AVRO(Schemas.PersonThree.class))
.topic(topic.toString())
.enableBatching(false)
.create();

admin1.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
admin2.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
admin3.topics().createSubscription(topic.toString(), subName, MessageId.earliest);

final int totalMessages = 1000;

for (int i = 0; i < 10; i++) {
producer.send(new Schemas.PersonOne(i));
for (int i = 0; i < totalMessages / 2; i++) {
producer1.sendAsync(new Schemas.PersonOne(i));
}

for (int i = 500; i < totalMessages; i++) {
producer2.sendAsync(new Schemas.PersonThree(i, "name-" + i));
}

Awaitility.await().untilAsserted(() -> {
Expand All @@ -415,29 +431,39 @@ public void testReplicationWithSchema() throws Exception {
});

@Cleanup
Consumer<Schemas.PersonOne> consumer1 = client1.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
Consumer<GenericRecord> consumer1 = client1.newConsumer(Schema.AUTO_CONSUME())
.topic(topic.toString())
.subscriptionName(subName)
.subscribe();

@Cleanup
Consumer<Schemas.PersonOne> consumer2 = client2.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
Consumer<GenericRecord> consumer2 = client2.newConsumer(Schema.AUTO_CONSUME())
.topic(topic.toString())
.subscriptionName(subName)
.subscribe();

@Cleanup
Consumer<Schemas.PersonOne> consumer3 = client3.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
Consumer<GenericRecord> consumer3 = client3.newConsumer(Schema.AUTO_CONSUME())
.topic(topic.toString())
.subscriptionName(subName)
.subscribe();

for (int i = 0; i < 10; i++) {
Message<Schemas.PersonOne> msg1 = consumer1.receive();
Message<Schemas.PersonOne> msg2 = consumer2.receive();
Message<Schemas.PersonOne> msg3 = consumer3.receive();
int lastId = -1;
for (int i = 0; i < totalMessages; i++) {
Message<GenericRecord> msg1 = consumer1.receive();
Message<GenericRecord> msg2 = consumer2.receive();
Message<GenericRecord> msg3 = consumer3.receive();
assertTrue(msg1 != null && msg2 != null && msg3 != null);
assertTrue(msg1.getValue().equals(msg2.getValue()) && msg2.getValue().equals(msg3.getValue()));
GenericRecord record1 = msg1.getValue();
GenericRecord record2 = msg2.getValue();
GenericRecord record3 = msg3.getValue();
int id1 = (int) record1.getField("id");
int id2 = (int) record2.getField("id");
int id3 = (int) record3.getField("id");
log.info("Received ids, id1: {}, id2: {}, id3: {}, lastId: {}", id1, id2, id3, lastId);
assertTrue(id1 == id2 && id2 == id3);
assertTrue(id1 > lastId);
lastId = id1;
consumer1.acknowledge(msg1);
consumer2.acknowledge(msg2);
consumer3.acknowledge(msg3);
Expand Down

0 comments on commit 39c1ee1

Please sign in to comment.