Skip to content

Commit

Permalink
Create managed ledger path on local zookeeper when create partitions (a…
Browse files Browse the repository at this point in the history
…pache#6189)

### Motivation

Create managed ledger path on local zookeeper when creating partitions for a partitioned topic.

### Modifications

Change globalZk() to localZk() when creating partitions.

### Verifying this change

PartitionCreationTest can cover this change, since we use the same zookeeper for the unit test in ProducerConsumerBase, so the test passed before.
  • Loading branch information
codelipenghui authored Feb 8, 2020
1 parent 28875d5 commit 43d89f2
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ protected void zkCreateOptimistic(String path, byte[] content) throws Exception
ZkUtils.createFullPathOptimistic(globalZk(), path, content, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

protected void zkCreateOptimisticAsync(String path, byte[] content, AsyncCallback.StringCallback callback) {
ZkUtils.asyncCreateFullPathOptimistic(globalZk(), path, content, ZooDefs.Ids.OPEN_ACL_UNSAFE,
protected void zkCreateOptimisticAsync(ZooKeeper zk, String path, byte[] content, AsyncCallback.StringCallback callback) {
ZkUtils.asyncCreateFullPathOptimistic(zk, path, content, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT, callback, null);
}

Expand Down Expand Up @@ -265,7 +265,7 @@ protected void tryCreatePartitionsAsync(int numPartitions) {
}

private void tryCreatePartitionAsync(final int partition) {
zkCreateOptimisticAsync(ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0],
zkCreateOptimisticAsync(localZk(), ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0],
(rc, s, o, s1) -> {
if (KeeperException.Code.OK.intValue() == rc) {
if (log.isDebugEnabled()) {
Expand Down

0 comments on commit 43d89f2

Please sign in to comment.