Skip to content

Commit

Permalink
[improve] [pip] PIP-356: Support Geo-Replication starts at earliest p…
Browse files Browse the repository at this point in the history
…osition (apache#22791)
  • Loading branch information
poorbarcode authored May 30, 2024
1 parent 5a7efd8 commit 43b20c3
Showing 1 changed file with 71 additions and 0 deletions.
71 changes: 71 additions & 0 deletions pip/pip-356.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# PIP-356: Support Geo-Replication starts at earliest position

# Background knowledge

Replication reads messages from the source cluster, and copies them to the remote cluster.
- Registers a cursor named `pulsar.repl.{remote-cluster}` on the source cluster. Replicator reads messages relies on this cursor.
- Registers a producer on the remote cluster. Replicator writes messages relies on this producer.

# Motivation

If you have some older messages to migrate, the steps recommended are below, which was described at [pulsar doc](https://pulsar.apache.org/docs/3.2.x/administration-geo/#migrate-data-between-clusters-using-geo-replication).
1. Create the cursor that the replicator will use manually: `pulsar-admin topics create-subscription -s pulsar.repl.{remote-cluster} -m earliest <topic>`.
2. Enable namespace-level/topic-level Geo-Replication.

The steps recommended are difficultly to use, for example:
- Create cursor `pulsar.repl.{remote-cluster}` manually.
- The namespace/topic was unloaded due to a re-balance.
- The broker will remove the `pulsar.repl.{remote-cluster}` automatically because the Geo-Replication feature is disabled at this moment.
- Enable namespace-level/topic-level Geo-Replication, but the cursor that was created manually has been deleted, the broker will create a new one with latest position, which is not expected.


# Goals
Add an optional config(broker level, namespace level, and topic level) to support Geo-Replication starting at the earliest position.

### Configuration

**broker.conf**
```properties
# The position that replication task start at, it can be set to "earliest" or "latest (default)".
replicationStartAt=latest
```

**ServiceConfiguration**
```java
@FieldContext(
category = CATEGORY_REPLICATION,
dynamic = true,
doc = "The position that replication task start at, it can be set to earliest or latest (default)."
)
String replicationStartAt = "latest";
```

### Public API

**V2/Namespaces.java**
```java
@POST
@Path("/{tenant}/{namespace}/replicationStartAt")
public void setNamespaceLevelReplicationStartAt(
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@QueryParam("replicationStartAt") String replicationStartAt) {
...
...
}
```

**V2/PersistentTopics.java**
```java
@POST
@Path("/{tenant}/{namespace}/{topic}/replicationStartAt")
public void setNamespaceLevelReplicationStartAt(
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("replicationStartAt") String replicationStartAt) {
...
...
}
```

0 comments on commit 43b20c3

Please sign in to comment.