Skip to content

Commit

Permalink
Setup initial namespaces with MetadataStore (apache#10612)
Browse files Browse the repository at this point in the history
  • Loading branch information
fantapsody authored May 21, 2021
1 parent 047fb6a commit a672fc1
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.beust.jcommander.Parameter;
import java.util.List;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.metadata.api.MetadataStore;

/**
* Setup the initial namespace of the cluster without startup the Pulsar broker.
Expand Down Expand Up @@ -51,48 +51,53 @@ private static class Arguments {

}

public static void main(String[] args) throws Exception {
public static int doMain(String[] args) throws Exception {
Arguments arguments = new Arguments();
JCommander jcommander = new JCommander();
try {
jcommander.addObject(arguments);
jcommander.parse(args);
if (arguments.help) {
jcommander.usage();
return;
return 0;
}
} catch (Exception e) {
jcommander.usage();
throw e;
return 1;
}

if (arguments.configurationStore == null) {
System.err.println("Configuration store address argument is required (--configuration-store)");
jcommander.usage();
System.exit(1);
return 1;
}

ZooKeeper configStoreZk = PulsarClusterMetadataSetup
.initZk(arguments.configurationStore, arguments.zkSessionTimeoutMillis);

for (String namespace : arguments.namespaces) {
NamespaceName namespaceName = null;
try {
namespaceName = NamespaceName.get(namespace);
} catch (Exception e) {
System.out.println("Invalid namespace name.");
System.exit(1);
try (MetadataStore configStore = PulsarClusterMetadataSetup
.initMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
for (String namespace : arguments.namespaces) {
NamespaceName namespaceName = null;
try {
namespaceName = NamespaceName.get(namespace);
} catch (Exception e) {
System.out.println("Invalid namespace name.");
return 1;
}

// Create specified tenant
PulsarClusterMetadataSetup
.createTenantIfAbsent(configStore, namespaceName.getTenant(), arguments.cluster);

// Create specified namespace
PulsarClusterMetadataSetup.createNamespaceIfAbsent(configStore, namespaceName,
arguments.cluster);
}

// Create system tenant
PulsarClusterMetadataSetup
.createTenantIfAbsent(configStoreZk, namespaceName.getTenant(), arguments.cluster);

// Create system namespace
PulsarClusterMetadataSetup.createNamespaceIfAbsent(configStoreZk, namespaceName,
arguments.cluster);
}

System.out.println("Initial namespace setup success");
return 0;
}

public static void main(String[] args) throws Exception {
System.exit(doMain(args));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,19 @@
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.pulsar.PulsarClusterMetadataSetup;
import org.apache.pulsar.PulsarInitialNamespaceSetup;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TenantResources;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
Expand Down Expand Up @@ -148,6 +156,35 @@ public void testSetupWithBkMetadataServiceUri() throws Exception {
assertNotNull(localZk.exists("/ledgers", false));
}

@Test
public void testInitialNamespaceSetup() throws Exception {
// missing arguments
assertEquals(PulsarInitialNamespaceSetup.doMain(new String[]{}), 1);
// invalid namespace
assertEquals(PulsarInitialNamespaceSetup.doMain(new String[]{
"--cluster", "testInitialNamespaceSetup-cluster",
"--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(),
"a/b/c/d"
}), 1);

String[] args = {
"--cluster", "testInitialNamespaceSetup-cluster",
"--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(),
"test/a",
"test/b",
"test/c",
};
assertEquals(PulsarInitialNamespaceSetup.doMain(args), 0);
try (MetadataStoreExtended store = MetadataStoreExtended.create("127.0.0.1:" + localZkS.getZookeeperPort(),
MetadataStoreConfig.builder().build())) {
TenantResources tenantResources = new TenantResources(store,
PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC);
List<String> namespaces = tenantResources.getChildren(PulsarWebResource
.path(ConfigurationCacheService.POLICIES, "test"));
assertEquals(new HashSet<>(namespaces), new HashSet<>(Arrays.asList("a", "b", "c")));
}
}

@BeforeMethod
void setup() throws Exception {
localZkS = new ZookeeperServerTest(0);
Expand Down

0 comments on commit a672fc1

Please sign in to comment.