Skip to content

Commit

Permalink
Replicated subscriptions - Configuration and client changes (apache#4299
Browse files Browse the repository at this point in the history
)

* Replicated subscriptions - Configuration and client changes

* Added missing header

* Fixed mocked methods for tests

* Fixed typo
  • Loading branch information
merlimat authored May 20, 2019
1 parent 66fc48e commit 6e51237
Show file tree
Hide file tree
Showing 26 changed files with 403 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ protected void internalResetCursor(String subName, long timestamp, boolean autho
}
}

protected void internalCreateSubscription(String subscriptionName, MessageIdImpl messageId, boolean authoritative) {
protected void internalCreateSubscription(String subscriptionName, MessageIdImpl messageId, boolean authoritative, boolean replicated) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
Expand Down Expand Up @@ -968,7 +968,7 @@ protected void internalCreateSubscription(String subscriptionName, MessageIdImpl
}

PersistentSubscription subscription = (PersistentSubscription) topic
.createSubscription(subscriptionName, InitialPosition.Latest).get();
.createSubscription(subscriptionName, InitialPosition.Latest, replicated).get();
// Mark the cursor as "inactive" as it was created without a real consumer connected
subscription.deactivateCursor();
subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public PartitionedTopicInternalStats getPartitionedStatsInternal(@PathParam("pro
validateTopicName(property, cluster, namespace, encodedTopic);
return internalGetPartitionedStatsInternal(authoritative);
}

@DELETE
@Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}")
@ApiOperation(hidden = true, value = "Delete a subscription.", notes = "There should not be any active consumers on the subscription.")
Expand Down Expand Up @@ -397,9 +397,9 @@ public void resetCursorOnPosition(@PathParam("property") String property, @PathP
public void createSubscription(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String topic,
@PathParam("subscriptionName") String encodedSubName,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId, @QueryParam("replicated") boolean replicated) {
validateTopicName(property, cluster, namespace, topic);
internalCreateSubscription(decode(encodedSubName), messageId, authoritative);
internalCreateSubscription(decode(encodedSubName), messageId, authoritative, replicated);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public void createPartitionedTopic(@PathParam("tenant") String tenant, @PathPara
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void createNonPartitionedTopic(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateGlobalNamespaceOwnership(tenant,namespace);
validateTopicName(tenant, namespace, encodedTopic);
Expand Down Expand Up @@ -316,7 +316,7 @@ public PartitionedTopicInternalStats getPartitionedStatsInternal(@PathParam("ten
validateTopicName(tenant, namespace, encodedTopic);
return internalGetPartitionedStatsInternal(authoritative);
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}")
@ApiOperation(value = "Delete a subscription.", notes = "There should not be any active consumers on the subscription.")
Expand Down Expand Up @@ -390,9 +390,9 @@ public void expireMessagesForAllSubscriptions(@PathParam("tenant") String tenant
@ApiResponse(code = 405, message = "Not supported for partitioned topics") })
public void createSubscription(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String topic, @PathParam("subscriptionName") String encodedSubName,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId, @QueryParam("replicated") boolean replicated) {
validateTopicName(tenant, namespace, topic);
internalCreateSubscription(decode(encodedSubName), messageId, authoritative);
internalCreateSubscription(decode(encodedSubName), messageId, authoritative, replicated);
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
final InitialPosition initialPosition = subscribe.getInitialPosition();
final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;
final boolean isReplicated = subscribe.hasReplicateSubscriptionState() && subscribe.getReplicateSubscriptionState();

CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
Expand Down Expand Up @@ -685,7 +686,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata,
readCompacted, initialPosition);
readCompacted, initialPosition, isReplicated);
} else {
return FutureUtil.failedFuture(
new IncompatibleSchemaException(
Expand All @@ -696,7 +697,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
} else {
return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata, readCompacted, initialPosition);
startMessageId, metadata, readCompacted, initialPosition,
isReplicated);
}
})
.thenAccept(consumer -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public interface Subscription {

String getTopicName();

boolean isReplicated();

Dispatcher getDispatcher();

long getNumberOfEntriesInBacklog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,11 @@ default long getOriginalSequenceId() {

CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition);
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
boolean replicateSubscriptionState);

CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition);
CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
boolean replicateSubscriptionState);

CompletableFuture<Void> unsubscribe(String subName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public Topic getTopic() {
return topic;
}

@Override
public boolean isReplicated() {
return false;
}

@Override
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
if (IS_FENCED_UPDATER.get(this) == TRUE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public void addProducer(Producer producer) throws BrokerServiceException {
lock.readLock().lock();
try {
brokerService.checkTopicNsOwnership(getName());

if (isFenced) {
log.warn("[{}] Attempting to add producer to a fenced topic", topic);
throw new TopicFencedException("Topic is temporarily unavailable");
Expand Down Expand Up @@ -315,10 +315,11 @@ public void removeProducer(Producer producer) {
@Override
public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition) {
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
boolean replicateSubscriptionState) {

final CompletableFuture<Consumer> future = new CompletableFuture<>();

try {
brokerService.checkTopicNsOwnership(getName());
} catch (Exception e) {
Expand Down Expand Up @@ -396,7 +397,7 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
}

@Override
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition) {
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition, boolean replicateSubscriptionState) {
return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class CompactorSubscription extends PersistentSubscription {

public CompactorSubscription(PersistentTopic topic, CompactedTopic compactedTopic,
String subscriptionName, ManagedCursor cursor) {
super(topic, subscriptionName, cursor);
super(topic, subscriptionName, cursor, false);
checkArgument(subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION));
this.compactedTopic = compactedTopic;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@

import com.google.common.base.MoreObjects;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
Expand All @@ -40,7 +41,6 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
Expand Down Expand Up @@ -76,12 +76,36 @@ public class PersistentSubscription implements Subscription {
// for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;

public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor) {
private static final String REPLICATED_SUBSCRIPTION_PROPERTY = "pulsar.replicated.subscription";

// Map of properties that is used to mark this subscription as "replicated".
// Since this is the only field at this point, we can just keep a static
// instance of the map.
private static final Map<String, Long> REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = new TreeMap<>();
private static final Map<String, Long> NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = Collections.emptyMap();

private volatile boolean isReplicated;

static {
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
}

static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
}

static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) {
return cursor.getProperties().containsKey(REPLICATED_SUBSCRIPTION_PROPERTY);
}

public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
boolean replicated) {
this.topic = topic;
this.cursor = cursor;
this.topicName = topic.getName();
this.subName = subscriptionName;
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor);
this.isReplicated = replicated;
IS_FENCED_UPDATER.set(this, FALSE);
}

Expand All @@ -95,6 +119,15 @@ public Topic getTopic() {
return topic;
}

@Override
public boolean isReplicated() {
return isReplicated;
}

void setReplicated(boolean replicated) {
this.isReplicated = replicated;
}

@Override
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
cursor.updateLastActive();
Expand Down Expand Up @@ -194,7 +227,7 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Cumulative ack on {}", topicName, subName, position);
}
cursor.asyncMarkDelete(position, properties, markDeleteCallback, position);
cursor.asyncMarkDelete(position, mergeCursorProperties(properties), markDeleteCallback, position);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Individual acks on {}", topicName, subName, positions);
Expand Down Expand Up @@ -647,7 +680,7 @@ public SubscriptionStats getStats() {
}
subStats.msgBacklog = getNumberOfEntriesInBacklog();
subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();

subStats.isReplicated = isReplicated;
return subStats;
}

Expand Down Expand Up @@ -681,5 +714,25 @@ void topicTerminated() {
}
}

/**
* Return a merged map that contains the cursor properties specified by used
* (eg. when using compaction subscription) and the subscription properties.
*/
protected Map<String, Long> mergeCursorProperties(Map<String, Long> userProperties) {
Map<String, Long> baseProperties = isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES
: NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;

if (userProperties.isEmpty()) {
// Use only the static instance in the common case
return baseProperties;
} else {
Map<String, Long> merged = new TreeMap<>();
merged.putAll(userProperties);
merged.putAll(baseProperties);
return merged;
}

}

private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
}
Loading

0 comments on commit 6e51237

Please sign in to comment.