diff --git a/pip/pip-356.md b/pip/pip-356.md new file mode 100644 index 0000000000000..c922e212ad748 --- /dev/null +++ b/pip/pip-356.md @@ -0,0 +1,71 @@ +# PIP-356: Support Geo-Replication starts at earliest position + +# Background knowledge + +Replication reads messages from the source cluster, and copies them to the remote cluster. +- Registers a cursor named `pulsar.repl.{remote-cluster}` on the source cluster. Replicator reads messages relies on this cursor. +- Registers a producer on the remote cluster. Replicator writes messages relies on this producer. + +# Motivation + +If you have some older messages to migrate, the steps recommended are below, which was described at [pulsar doc](https://pulsar.apache.org/docs/3.2.x/administration-geo/#migrate-data-between-clusters-using-geo-replication). +1. Create the cursor that the replicator will use manually: `pulsar-admin topics create-subscription -s pulsar.repl.{remote-cluster} -m earliest `. +2. Enable namespace-level/topic-level Geo-Replication. + +The steps recommended are difficultly to use, for example: +- Create cursor `pulsar.repl.{remote-cluster}` manually. +- The namespace/topic was unloaded due to a re-balance. + - The broker will remove the `pulsar.repl.{remote-cluster}` automatically because the Geo-Replication feature is disabled at this moment. +- Enable namespace-level/topic-level Geo-Replication, but the cursor that was created manually has been deleted, the broker will create a new one with latest position, which is not expected. + + +# Goals +Add an optional config(broker level, namespace level, and topic level) to support Geo-Replication starting at the earliest position. + +### Configuration + +**broker.conf** +```properties +# The position that replication task start at, it can be set to "earliest" or "latest (default)". +replicationStartAt=latest +``` + +**ServiceConfiguration** +```java +@FieldContext( + category = CATEGORY_REPLICATION, + dynamic = true, + doc = "The position that replication task start at, it can be set to earliest or latest (default)." +) +String replicationStartAt = "latest"; +``` + +### Public API + +**V2/Namespaces.java** +```java +@POST +@Path("/{tenant}/{namespace}/replicationStartAt") +public void setNamespaceLevelReplicationStartAt( + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @QueryParam("replicationStartAt") String replicationStartAt) { + ... + ... +} +``` + +**V2/PersistentTopics.java** +```java +@POST +@Path("/{tenant}/{namespace}/{topic}/replicationStartAt") +public void setNamespaceLevelReplicationStartAt( + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, + @QueryParam("replicationStartAt") String replicationStartAt) { + ... + ... +} +```