Skip to content

Commit

Permalink
[feat] [broker] PIP-188 support blue-green cluster migration [part-1] (
Browse files Browse the repository at this point in the history
…apache#17962)

* [feat][PIP-188] support blue-green cluster migration [part-1]

Add blue-green cluster migration

Fix dependency

* cleanup
  • Loading branch information
rdhabalia authored Oct 20, 2022
1 parent 5b452d1 commit b0945d1
Show file tree
Hide file tree
Showing 41 changed files with 886 additions and 19 deletions.
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1461,6 +1461,10 @@ splitTopicAndPartitionLabelInPrometheus=false
# Otherwise, aggregate it by list index.
aggregatePublisherStatsByProducerName=false

# Interval between checks to see if cluster is migrated and marks topic migrated
# if cluster is marked migrated. Disable with value 0. (Default disabled).
clusterMigrationCheckDurationSeconds=0

### --- Schema storage --- ###
# The schema storage implementation used by this broker
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,8 @@ void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, L

void asyncTerminate(TerminateCallback callback, Object ctx);

CompletableFuture<Position> asyncMigrate();

/**
* Terminate the managed ledger and return the last committed entry.
*
Expand Down Expand Up @@ -534,6 +536,11 @@ void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, L
*/
boolean isTerminated();

/**
* Returns whether the managed ledger was migrated.
*/
boolean isMigrated();

/**
* Returns managed-ledger config.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
Expand Down Expand Up @@ -241,6 +242,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {

protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;
private static final String MIGRATION_STATE_PROPERTY = "migrated";

public enum State {
None, // Uninitialized
Expand Down Expand Up @@ -268,6 +270,7 @@ public enum PositionBound {
private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, State> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, State.class, "state");
protected volatile State state = null;
private volatile boolean migrated = false;

@Getter
private final OrderedScheduler scheduledExecutor;
Expand Down Expand Up @@ -343,7 +346,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
// Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time
this.maximumRolloverTimeMs = getMaximumRolloverTimeMs(config);
this.mlOwnershipChecker = mlOwnershipChecker;
this.propertiesMap = new HashMap();
this.propertiesMap = new ConcurrentHashMap<>();
this.inactiveLedgerRollOverTimeMs = config.getInactiveLedgerRollOverTimeMs();
if (config.getManagedLedgerInterceptor() != null) {
this.managedLedgerInterceptor = config.getManagedLedgerInterceptor();
Expand All @@ -367,7 +370,6 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
lastConfirmedEntry = new PositionImpl(mlInfo.getTerminatedPosition());
log.info("[{}] Recovering managed ledger terminated at {}", name, lastConfirmedEntry);
}

for (LedgerInfo ls : mlInfo.getLedgerInfoList()) {
ledgers.put(ls.getLedgerId(), ls);
}
Expand All @@ -379,6 +381,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
propertiesMap.put(property.getKey(), property.getValue());
}
}
migrated = mlInfo.hasTerminatedPosition() && propertiesMap.containsKey(MIGRATION_STATE_PROPERTY);
if (managedLedgerInterceptor != null) {
managedLedgerInterceptor.onManagedLedgerPropertiesInitialize(propertiesMap);
}
Expand Down Expand Up @@ -1271,6 +1274,27 @@ private long consumedLedgerSize(long ledgerSize, long ledgerEntries, long consum
}
}

public CompletableFuture<Position> asyncMigrate() {
propertiesMap.put(MIGRATION_STATE_PROPERTY, Boolean.TRUE.toString());
CompletableFuture<Position> result = new CompletableFuture<>();
asyncTerminate(new TerminateCallback() {

@Override
public void terminateComplete(Position lastCommittedPosition, Object ctx) {
migrated = true;
log.info("[{}] topic successfully terminated and migrated at {}", name, lastCommittedPosition);
result.complete(lastCommittedPosition);
}

@Override
public void terminateFailed(ManagedLedgerException exception, Object ctx) {
log.info("[{}] topic failed to terminate and migrate ", name, exception);
result.completeExceptionally(exception);
}
}, null);
return result;
}

@Override
public synchronized void asyncTerminate(TerminateCallback callback, Object ctx) {
if (state == State.Fenced) {
Expand Down Expand Up @@ -1363,6 +1387,11 @@ public boolean isTerminated() {
return state == State.Terminated;
}

@Override
public boolean isMigrated() {
return migrated;
}

@Override
public void close() throws InterruptedException, ManagedLedgerException {
final CountDownLatch counter = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2514,6 +2514,13 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
)
private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Interval between checks to see if cluster is migrated and marks topic migrated "
+ " if cluster is marked migrated. Disable with value 0. (Default disabled)."
)
private int clusterMigrationCheckDurationSeconds = 0;

@FieldContext(
category = CATEGORY_SCHEMA,
doc = "Enforce schema validation on following cases:\n\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
Expand All @@ -59,6 +60,7 @@
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
Expand Down Expand Up @@ -229,6 +231,66 @@ public void updateCluster(
});
}

@POST
@Path("/{cluster}/migrate")
@ApiOperation(
value = "Update the configuration for a cluster migration.",
notes = "This operation requires Pulsar superuser privileges.")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Cluster has been updated."),
@ApiResponse(code = 400, message = "Cluster url must not be empty."),
@ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."),
@ApiResponse(code = 404, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public void updateClusterMigration(
@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster,
@ApiParam(value = "Is cluster migrated", required = true)
@QueryParam("migrated") boolean isMigrated,
@ApiParam(
value = "The cluster url data",
required = true,
examples = @Example(
value = @ExampleProperty(
mediaType = MediaType.APPLICATION_JSON,
value = """
{
"serviceUrl": "http://pulsar.example.com:8080",
"brokerServiceUrl": "pulsar://pulsar.example.com:6651"
}
"""
)
)
) ClusterUrl clusterUrl) {
if (isMigrated && clusterUrl.isEmpty()) {
asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Cluster url must not be empty"));
return;
}
validateSuperUserAccessAsync()
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> clusterResources().updateClusterAsync(cluster, old -> {
ClusterDataImpl data = (ClusterDataImpl) old;
data.setMigrated(isMigrated);
data.setMigratedClusterUrl(clusterUrl);
return data;
}))
.thenAccept(__ -> {
log.info("[{}] Updated cluster {}", clientAppId(), cluster);
asyncResponse.resume(Response.ok().build());
}).exceptionally(ex -> {
log.error("[{}] Failed to update cluster {}", clientAppId(), cluster, ex);
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
if (realCause instanceof MetadataStoreException.NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Cluster does not exist"));
return null;
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
@Path("/{cluster}/peers")
@ApiOperation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,19 @@ protected String getSubscriptionName() {
return subscription == null ? null : subscription.getName();
}

protected void checkAndApplyReachedEndOfTopicOrTopicMigration(List<Consumer> consumers) {
PersistentTopic topic = (PersistentTopic) subscription.getTopic();
checkAndApplyReachedEndOfTopicOrTopicMigration(topic, consumers);
}

public static void checkAndApplyReachedEndOfTopicOrTopicMigration(PersistentTopic topic, List<Consumer> consumers) {
if (topic.isMigrated()) {
consumers.forEach(c -> c.topicMigrated(topic.getMigratedClusterUrl()));
} else {
consumers.forEach(Consumer::reachedEndOfTopic);
}
}

@Override
public long getFilterProcessedMsgCount() {
return this.filterProcessedMsgs.longValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupPublishLimiter;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
Expand All @@ -59,6 +61,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
Expand Down Expand Up @@ -686,7 +689,10 @@ public CompletableFuture<Optional<Long>> addProducer(Producer producer,
lock.writeLock().lock();
try {
checkTopicFenced();
if (isTerminated()) {
if (isMigrated()) {
log.warn("[{}] Attempting to add producer to a migrated topic", topic);
throw new TopicMigratedException("Topic was already migrated");
} else if (isTerminated()) {
log.warn("[{}] Attempting to add producer to a terminated topic", topic);
throw new TopicTerminatedException("Topic was already terminated");
}
Expand Down Expand Up @@ -1180,6 +1186,8 @@ public boolean deletePartitionedTopicMetadataWhileInactive() {

protected abstract boolean isTerminated();

protected abstract boolean isMigrated();

private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class);

public InactiveTopicPolicies getInactiveTopicPolicies() {
Expand Down Expand Up @@ -1299,4 +1307,25 @@ public void updateBrokerSubscribeRate() {
topicPolicies.getSubscribeRate().updateBrokerValue(
subscribeRateInBroker(brokerService.pulsar().getConfiguration()));
}

public Optional<ClusterUrl> getMigratedClusterUrl() {
return getMigratedClusterUrl(brokerService.getPulsar());
}

public static CompletableFuture<Optional<ClusterUrl>> getMigratedClusterUrlAsync(PulsarService pulsar) {
return pulsar.getPulsarResources().getClusterResources().getClusterAsync(pulsar.getConfig().getClusterName())
.thenApply(clusterData -> (clusterData.isPresent() && clusterData.get().isMigrated())
? Optional.ofNullable(clusterData.get().getMigratedClusterUrl())
: Optional.empty());
}

public static Optional<ClusterUrl> getMigratedClusterUrl(PulsarService pulsar) {
try {
return getMigratedClusterUrlAsync(pulsar)
.get(pulsar.getPulsarResources().getClusterResources().getOperationTimeoutSec(), TimeUnit.SECONDS);
} catch (Exception e) {
log.warn("Failed to get migration cluster URL", e);
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,13 @@ protected void startInactivityMonitor() {
subscriptionExpiryCheckIntervalInSeconds,
subscriptionExpiryCheckIntervalInSeconds, TimeUnit.SECONDS);
}

// check cluster migration
int interval = pulsar().getConfiguration().getClusterMigrationCheckDurationSeconds();
if (interval > 0) {
inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkClusterMigration()), interval, interval,
TimeUnit.SECONDS);
}
}

protected void startMessageExpiryMonitor() {
Expand Down Expand Up @@ -1851,6 +1858,10 @@ public void checkGC() {
forEachTopic(Topic::checkGC);
}

public void checkClusterMigration() {
forEachTopic(Topic::checkClusterMigration);
}

public void checkMessageExpiry() {
forEachTopic(Topic::checkMessageExpiry);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ public TopicTerminatedException(Throwable t) {
}
}

public static class TopicMigratedException extends BrokerServiceException {
public TopicMigratedException(String msg) {
super(msg);
}

public TopicMigratedException(Throwable t) {
super(t);
}
}

public static class ServerMetadataException extends BrokerServiceException {
public ServerMetadataException(Throwable t) {
super(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
Expand All @@ -49,10 +50,12 @@
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
import org.apache.pulsar.common.api.proto.KeyLongValue;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.Rate;
Expand Down Expand Up @@ -785,6 +788,16 @@ public void reachedEndOfTopic() {
cnx.getCommandSender().sendReachedEndOfTopic(consumerId);
}

public void topicMigrated(Optional<ClusterUrl> clusterUrl) {
if (clusterUrl.isPresent()) {
ClusterUrl url = clusterUrl.get();
cnx.getCommandSender().sendTopicMigrated(ResourceType.Consumer, consumerId, url.getBrokerServiceUrl(),
url.getBrokerServiceUrlTls());
// disconnect consumer after sending migrated cluster url
disconnect();
}
}

/**
* Checks if consumer-blocking on unAckedMessages is allowed for below conditions:<br/>
* a. consumer must have Shared-subscription<br/>
Expand Down
Loading

0 comments on commit b0945d1

Please sign in to comment.