Skip to content

Commit

Permalink
[pulsar-broker] topic resources use metadata-store api (apache#9485)
Browse files Browse the repository at this point in the history
* [pulsar-broker] topics resources use metadata-store api

[pulsar-broker] MockZK: Handle zk-children watch notification

fix test

fix sync function initialization

fix tests

fix zk-create

add timeout

* address comments
  • Loading branch information
rdhabalia authored Feb 18, 2021
1 parent c754f9e commit 0f9e211
Show file tree
Hide file tree
Showing 22 changed files with 221 additions and 258 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,8 @@ public void start() throws PulsarServerException {
coordinationService = new CoordinationServiceImpl(localMetadataStore);

configurationMetadataStore = createConfigurationMetadataStore();
pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore);
pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore,
config.getZooKeeperOperationTimeoutSeconds());

orderedExecutor = OrderedExecutor.newBuilder()
.numThreads(config.getNumOrderedExecutorThreads())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
Expand All @@ -81,7 +84,6 @@
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -133,14 +135,6 @@ protected void zkCreateOptimisticAsync(ZooKeeper zk, String path,
CreateMode.PERSISTENT, callback, null);
}

protected boolean zkPathExists(String path) throws KeeperException, InterruptedException {
Stat stat = globalZk().exists(path, false);
if (null != stat) {
return true;
}
return false;
}

protected void zkSync(String path) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger rc = new AtomicInteger(KeeperException.Code.OK.intValue());
Expand Down Expand Up @@ -247,29 +241,31 @@ protected CompletableFuture<Void> tryCreatePartitionsAsync(int numPartitions) {

private CompletableFuture<Void> tryCreatePartitionAsync(final int partition, CompletableFuture<Void> reuseFuture) {
CompletableFuture<Void> result = reuseFuture == null ? new CompletableFuture<>() : reuseFuture;
zkCreateOptimisticAsync(localZk(),
ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0],
(rc, s, o, s1) -> {
if (KeeperException.Code.OK.intValue() == rc) {
if (log.isDebugEnabled()) {
log.debug("[{}] Topic partition {} created.", clientAppId(),
topicName.getPartition(partition));
}
result.complete(null);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
namespaceResources().getLocalStore()
.put(ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0], Optional.of(-1L))
.thenAccept(r -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Topic partition {} created.", clientAppId(), topicName.getPartition(partition));
}
result.complete(null);
}).exceptionally(ex -> {
if (ex.getCause() instanceof AlreadyExistsException) {
log.info("[{}] Topic partition {} is exists, doing nothing.", clientAppId(),
topicName.getPartition(partition));
result.complete(null);
} else if (KeeperException.Code.BADVERSION.intValue() == rc) {
log.warn("[{}] Fail to create topic partition {} with concurrent modification, retry now.",
clientAppId(), topicName.getPartition(partition));
tryCreatePartitionAsync(partition, result);
} else {
log.error("[{}] Fail to create topic partition {}", clientAppId(),
topicName.getPartition(partition), KeeperException.create(KeeperException.Code.get(rc)));
result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
}
});
} else if (ex.getCause() instanceof BadVersionException) {
log.warn("[{}] Partitioned topic {} is already created.", clientAppId(),
topicName.getPartition(partition));
// metadata-store api returns BadVersionException if node already exists while creating the
// resource
result.complete(null);
} else {
log.error("[{}] Fail to create topic partition {}", clientAppId(),
topicName.getPartition(partition), ex.getCause());
result.completeExceptionally(ex.getCause());
}
return null;
});
return result;
}

Expand Down Expand Up @@ -729,11 +725,11 @@ protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
try {
String partitionedTopicPath = path(PARTITIONED_TOPIC_PATH_ZNODE,
namespaceName.toString(), topicDomain.value());
List<String> topics = globalZk().getChildren(partitionedTopicPath, false);
List<String> topics = namespaceResources().getChildren(partitionedTopicPath);
partitionedTopics = topics.stream()
.map(s -> String.format("%s://%s/%s", topicDomain.value(), namespaceName.toString(), decode(s)))
.collect(Collectors.toList());
} catch (KeeperException.NoNodeException e) {
} catch (NotFoundException e) {
// NoNode means there are no partitioned topics in this domain for this namespace
} catch (Exception e) {
log.error("[{}] Failed to get partitioned topic list for namespace {}", clientAppId(),
Expand Down Expand Up @@ -828,49 +824,37 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n

try {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
zkCreateOptimisticAsync(globalZk(), path, data, (rc, s, o, s1) -> {
if (KeeperException.Code.OK.intValue() == rc) {
globalZk().sync(path, (rc2, s2, ctx) -> {
if (KeeperException.Code.OK.intValue() == rc2) {
log.info("[{}] Successfully created partitioned topic {}",
namespaceResources().getPartitionedTopicResources()
.createAsync(path, new PartitionedTopicMetadata(numPartitions)).thenAccept(r -> {
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
log.info("[{}] Successfully created partitions for topic {}", clientAppId(),
topicName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
log.error("[{}] Failed to create partitions for topic {}", clientAppId(),
topicName);
// The partitioned topic is created but there are some partitions create failed
asyncResponse.resume(new RestException(e));
return null;
});
}).exceptionally(ex -> {
if (ex.getCause() instanceof AlreadyExistsException) {
log.warn("[{}] Failed to create already existing partitioned topic {}",
clientAppId(), topicName);
tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
log.info("[{}] Successfully created partitions for topic {}",
clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
log.error("[{}] Failed to create partitions for topic {}",
clientAppId(), topicName);
// The partitioned topic is created but there are some partitions create failed
asyncResponse.resume(new RestException(e));
return null;
});
} else {
log.error("[{}] Failed to create partitioned topic {}",
clientAppId(), topicName,
KeeperException.create(KeeperException.Code.get(rc2)));
asyncResponse.resume(
new RestException(KeeperException.create(KeeperException.Code.get(rc2))));
new RestException(Status.CONFLICT, "Partitioned topic already exists"));
} else if (ex.getCause() instanceof BadVersionException) {
log.warn("[{}] Failed to create partitioned topic {}: concurrent modification",
clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
} else {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName,
ex.getCause());
asyncResponse.resume(new RestException(ex.getCause()));
}
}, null);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
log.warn("[{}] Failed to create already existing partitioned topic {}",
clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.CONFLICT,
"Partitioned topic already exists"));
} else if (KeeperException.Code.BADVERSION.intValue() == rc) {
log.warn("[{}] Failed to create partitioned topic {}: concurrent modification",
clientAppId(),
topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
} else {
log.error("[{}] Failed to create partitioned topic {}",
clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc)));
asyncResponse.resume(
new RestException(KeeperException.create(KeeperException.Code.get(rc))));
}
});
return null;
});
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.Getter;
import org.apache.pulsar.metadata.api.MetadataCache;
Expand All @@ -41,20 +42,23 @@ public class BaseResources<T> {
private final MetadataStoreExtended store;
@Getter
private final MetadataCache<T> cache;
private int operationTimeoutSec;

public BaseResources(MetadataStoreExtended store, Class<T> clazz) {
public BaseResources(MetadataStoreExtended store, Class<T> clazz, int operationTimeoutSec) {
this.store = store;
this.cache = store.getMetadataCache(clazz);
this.operationTimeoutSec = operationTimeoutSec;
}

public BaseResources(MetadataStoreExtended store, TypeReference<T> typeRef) {
public BaseResources(MetadataStoreExtended store, TypeReference<T> typeRef, int operationTimeoutSec) {
this.store = store;
this.cache = store.getMetadataCache(typeRef);
this.operationTimeoutSec = operationTimeoutSec;
}

public List<String> getChildren(String path) throws MetadataStoreException {
try {
return getChildrenAsync(path).get();
return getChildrenAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
Expand All @@ -69,7 +73,7 @@ public CompletableFuture<List<String>> getChildrenAsync(String path) {

public Optional<T> get(String path) throws MetadataStoreException {
try {
return getAsync(path).get();
return getAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
Expand All @@ -84,7 +88,7 @@ public CompletableFuture<Optional<T>> getAsync(String path) {

public void set(String path, Function<T, T> modifyFunction) throws MetadataStoreException {
try {
setAsync(path, modifyFunction).get();
setAsync(path, modifyFunction).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
Expand All @@ -99,7 +103,7 @@ public CompletableFuture<Void> setAsync(String path, Function<T, T> modifyFuncti

public void setWithCreate(String path, Function<Optional<T>, T> createFunction) throws MetadataStoreException {
try {
setWithCreateAsync(path, createFunction).get();
setWithCreateAsync(path, createFunction).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
Expand All @@ -114,7 +118,7 @@ public CompletableFuture<Void> setWithCreateAsync(String path, Function<Optional

public void create(String path, T data) throws MetadataStoreException {
try {
createAsync(path, data).get();
createAsync(path, data).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
Expand All @@ -129,7 +133,7 @@ public CompletableFuture<Void> createAsync(String path, T data) {

public void delete(String path) throws MetadataStoreException {
try {
deleteAsync(path).get();
deleteAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
Expand All @@ -144,7 +148,7 @@ public CompletableFuture<Void> deleteAsync(String path) {

public boolean exists(String path) throws MetadataStoreException {
try {
return existsAsync(path).get();
return existsAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ public class ClusterResources extends BaseResources<ClusterData> {
@Getter
private FailureDomainResources failureDomainResources;

public ClusterResources(MetadataStoreExtended store) {
super(store, ClusterData.class);
this.failureDomainResources = new FailureDomainResources(store, FailureDomain.class);
public ClusterResources(MetadataStoreExtended store, int operationTimeoutSec) {
super(store, ClusterData.class, operationTimeoutSec);
this.failureDomainResources = new FailureDomainResources(store, FailureDomain.class, operationTimeoutSec);
}

public Set<String> list() throws MetadataStoreException {
Expand All @@ -44,8 +44,9 @@ public Set<String> list() throws MetadataStoreException {
public static class FailureDomainResources extends BaseResources<FailureDomain> {
public static final String FAILURE_DOMAIN = "failureDomain";

public FailureDomainResources(MetadataStoreExtended store, Class<FailureDomain> clazz) {
super(store, clazz);
public FailureDomainResources(MetadataStoreExtended store, Class<FailureDomain> clazz,
int operationTimeoutSec) {
super(store, clazz, operationTimeoutSec);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@

public class DynamicConfigurationResources extends BaseResources<Map<String, String>> {

public DynamicConfigurationResources(MetadataStoreExtended store) {
super(store, new TypeReference<Map<String, String>>(){});
public DynamicConfigurationResources(MetadataStoreExtended store, int operationTimeoutSec) {
super(store, new TypeReference<Map<String, String>>() {
}, operationTimeoutSec);
}

}
Loading

0 comments on commit 0f9e211

Please sign in to comment.