Skip to content

Commit

Permalink
Using the consistent metadata store scheme name. (apache#13937)
Browse files Browse the repository at this point in the history
Using the consistent metadata store scheme name.

- RocksDB -> rocksdb:
- ZooKeeper -> zk:
- Memory -> memory:
- Ectd -> etcd:

Context: apache#13225 (comment)
  • Loading branch information
codelipenghui authored Jan 27, 2022
1 parent 7c8f575 commit 2762f2d
Show file tree
Hide file tree
Showing 18 changed files with 36 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public MockedBookKeeperTestCase(int numBookies) {
public final void setUp(Method method) throws Exception {
LOG.info(">>>>>> starting {}", method);
metadataStore = new FaultInjectionMetadataStore(
MetadataStoreExtended.create("memory://local", MetadataStoreConfig.builder().build()));
MetadataStoreExtended.create("memory:local", MetadataStoreConfig.builder().build()));

try {
// start bookkeeper service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class BookieRackAffinityMappingTest {

@BeforeMethod
public void setUp() throws Exception {
store = MetadataStoreFactory.create("memory://local", MetadataStoreConfig.builder().build());
store = MetadataStoreFactory.create("memory:local", MetadataStoreConfig.builder().build());
BOOKIE1 = new BookieSocketAddress("127.0.0.1:3181");
BOOKIE2 = new BookieSocketAddress("127.0.0.2:3181");
BOOKIE3 = new BookieSocketAddress("127.0.0.3:3181");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
@BeforeMethod
public void setUp() throws Exception {
timer = new HashedWheelTimer();
store = MetadataStoreFactory.create("memory://local", MetadataStoreConfig.builder().build());
store = MetadataStoreFactory.create("memory:local", MetadataStoreConfig.builder().build());

writableBookies.add(new BookieSocketAddress(BOOKIE1).toBookieId());
writableBookies.add(new BookieSocketAddress(BOOKIE2).toBookieId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class BundlesQuotasTest {

@BeforeMethod
public void setup() throws Exception {
store = MetadataStoreFactory.create("memory://local", MetadataStoreConfig.builder().build());
store = MetadataStoreFactory.create("memory:local", MetadataStoreConfig.builder().build());

PulsarService pulsar = mock(PulsarService.class);
when(pulsar.getLocalMetadataStore()).thenReturn(mock(MetadataStoreExtended.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class DistributedIdGeneratorTest {

@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
store = MetadataStoreExtended.create("memory://local", MetadataStoreConfig.builder().build());
store = MetadataStoreExtended.create("memory:local", MetadataStoreConfig.builder().build());
coordinationService = new CoordinationServiceImpl(store);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void setup() throws Exception {
.when(pulsar).getBookKeeperClient();
eventLoopGroup = new NioEventLoopGroup();

store = MetadataStoreFactory.create("memory://local", MetadataStoreConfig.builder().build());
store = MetadataStoreFactory.create("memory:local", MetadataStoreConfig.builder().build());
doReturn(store).when(pulsar).getLocalMetadataStore();
doReturn(store).when(pulsar).getConfigurationMetadataStore();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class PulsarFunctionMetadataStoreTest extends PulsarFunctionLocalRunTest
protected WorkerConfig createWorkerConfig(ServiceConfiguration config) {
WorkerConfig wc = super.createWorkerConfig(config);
wc.setStateStorageProviderImplementation(PulsarMetadataStateStoreProviderImpl.class.getName());
wc.setStateStorageServiceUrl("memory://local");
wc.setStateStorageServiceUrl("memory:local");
return wc;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class PulsarMetadataStateStoreImplTest {

@BeforeMethod
public void setup() throws Exception {
this.store = MetadataStoreFactory.create("memory://local", MetadataStoreConfig.builder().build());
this.store = MetadataStoreFactory.create("memory:local", MetadataStoreConfig.builder().build());
this.countersCache = store.getMetadataCache(Long.class);
this.stateContext = new PulsarMetadataStateStoreImpl(store, "/prefix", TENANT, NS, NAME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.apache.pulsar.metadata.impl;

import com.google.common.collect.MapMaker;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
Expand Down Expand Up @@ -51,6 +49,8 @@
@Slf4j
public class LocalMemoryMetadataStore extends AbstractMetadataStore implements MetadataStoreExtended {

static final String MEMORY_SCHEME_IDENTIFIER = "memory:";

@Data
private static class Value {
final long version;
Expand All @@ -73,20 +73,13 @@ private static class Value {

public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig)
throws MetadataStoreException {
URI uri;
try {
uri = new URI(metadataURL);
} catch (URISyntaxException e) {
throw new MetadataStoreException(e);
}

String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length());
// Local means a private data set
if ("local".equals(uri.getHost())) {
if ("local".equals(name)) {
map = new TreeMap<>();
sequentialIdGenerator = new AtomicLong();
} else {
// Use a reference from a shared data set
String name = uri.getHost();
map = STATIC_MAPS.computeIfAbsent(name, __ -> new TreeMap<>());
STATIC_INSTANCE.compute(name, (key, value) -> {
if (value == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,15 @@ private static MetadataStore newInstance(String metadataURL, MetadataStoreConfig
boolean enableSessionWatcher)
throws MetadataStoreException {

if (metadataURL.startsWith("memory://")) {
if (metadataURL.startsWith(LocalMemoryMetadataStore.MEMORY_SCHEME_IDENTIFIER)) {
return new LocalMemoryMetadataStore(metadataURL, metadataStoreConfig);
} else if (metadataURL.startsWith("rocksdb://")) {
} else if (metadataURL.startsWith(RocksdbMetadataStore.ROCKSDB_SCHEME_IDENTIFIER)) {
return RocksdbMetadataStore.get(metadataURL, metadataStoreConfig);
} else if (metadataURL.startsWith(EtcdMetadataStore.ETCD_SCHEME_IDENTIFIER)) {
return new EtcdMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher);
} else if (metadataURL.startsWith(ZKMetadataStore.ZK_SCHEME_IDENTIFIER)) {
return new ZKMetadataStore(metadataURL.substring(ZKMetadataStore.ZK_SCHEME_IDENTIFIER.length()),
metadataStoreConfig, enableSessionWatcher);
} else {
return new ZKMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@
*/
@Slf4j
public class RocksdbMetadataStore extends AbstractMetadataStore {

static final String ROCKSDB_SCHEME_IDENTIFIER = "rocksdb:";

private static final byte[] SEQUENTIAL_ID_KEY = toBytes("__metadata_sequentialId_key");
private static final byte[] INSTANCE_ID_KEY = toBytes("__metadata_instanceId_key");

Expand Down Expand Up @@ -200,7 +203,7 @@ static long toLong(byte[] bytes) {
private final String metadataUrl;

/**
* @param metadataURL format "rocksdb://{storePath}"
* @param metadataURL format "rocksdb:{storePath}"
* @param metadataStoreConfig
* @throws MetadataStoreException
*/
Expand All @@ -213,7 +216,7 @@ private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig metadataSto
throw new MetadataStoreException("Failed to load RocksDB JNI library", t);
}

String dataDir = metadataURL.substring("rocksdb://".length());
String dataDir = metadataURL.substring("rocksdb:".length());
Path dataPath = FileSystems.getDefault().getPath(dataDir);
try {
Files.createDirectories(dataPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
public class ZKMetadataStore extends AbstractBatchedMetadataStore
implements MetadataStoreExtended, MetadataStoreLifecycle {

static final String ZK_SCHEME_IDENTIFIER = "zk:";

private final String metadataURL;
private final MetadataStoreConfig metadataStoreConfig;
private final boolean isZkManaged;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ public Object[][] implementations() {
// Supplier<String> lambda is used for providing the value.
return new Object[][]{
{ "ZooKeeper", stringSupplier(() -> zks.getConnectionString()) },
{ "Memory", stringSupplier(() -> "memory://" + UUID.randomUUID()) },
{ "RocksDB", stringSupplier(() -> "rocksdb://" + createTempFolder()) },
{ "Memory", stringSupplier(() -> "memory:" + UUID.randomUUID()) },
{ "RocksDB", stringSupplier(() -> "rocksdb:" + createTempFolder()) },
{"Etcd", stringSupplier(() -> "etcd:" + etcdCluster.getClientEndpoints().stream().map(x -> x.toString())
.collect(Collectors.joining(",")))},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ public class LocalMemoryMetadataStoreTest {
@Test
public void testPrivateInstance() throws Exception {
@Cleanup
MetadataStore store1 = MetadataStoreFactory.create("memory://local",
MetadataStore store1 = MetadataStoreFactory.create("memory:local",
MetadataStoreConfig.builder().build());

@Cleanup
MetadataStore store2 = MetadataStoreFactory.create("memory://local",
MetadataStore store2 = MetadataStoreFactory.create("memory:local",
MetadataStoreConfig.builder().build());

store1.put("/test", "value".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
Expand All @@ -50,7 +50,7 @@ public void testPrivateInstance() throws Exception {

@Test
public void testSharedInstance() throws Exception {
String url = "memory://" + UUID.randomUUID();
String url = "memory:" + UUID.randomUUID();

@Cleanup
MetadataStore store1 = MetadataStoreFactory.create(url,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void testOpenDbWithConfigFile() throws Exception {
String optionFilePath =
getClass().getClassLoader().getResource("rocksdb_option_file_example.ini").getPath();
log.info("optionFilePath={}", optionFilePath);
store = MetadataStoreFactory.create("rocksdb://" + tempDir.toAbsolutePath(),
store = MetadataStoreFactory.create("rocksdb:" + tempDir.toAbsolutePath(),
MetadataStoreConfig.builder().configFilePath(optionFilePath).build());
Assert.assertTrue(store instanceof RocksdbMetadataStore);

Expand All @@ -97,7 +97,7 @@ public void testOpenDbWithConfigFile() throws Exception {

//reopen db
store.close();
store = MetadataStoreFactory.create("rocksdb://" + tempDir.toAbsolutePath(),
store = MetadataStoreFactory.create("rocksdb:" + tempDir.toAbsolutePath(),
MetadataStoreConfig.builder().configFilePath(optionFilePath).build());

//test get
Expand All @@ -118,10 +118,10 @@ public void testMultipleInstances() throws Exception {

Path tempDir = Files.createTempDirectory("RocksdbMetadataStoreTest");
log.info("Temp dir:{}", tempDir.toAbsolutePath());
MetadataStore store1 = MetadataStoreFactory.create("rocksdb://" + tempDir.toAbsolutePath(),
MetadataStore store1 = MetadataStoreFactory.create("rocksdb:" + tempDir.toAbsolutePath(),
MetadataStoreConfig.builder().build());

MetadataStore store2 = MetadataStoreFactory.create("rocksdb://" + tempDir.toAbsolutePath(),
MetadataStore store2 = MetadataStoreFactory.create("rocksdb:" + tempDir.toAbsolutePath(),
MetadataStoreConfig.builder().build());

// We should get the same instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public MockedBookKeeperTestCase(int numBookies) {
@BeforeMethod(alwaysRun = true)
public void setUp(Method method) throws Exception {
LOG.info(">>>>>> starting {}", method);
metadataStore = new FaultInjectionMetadataStore(MetadataStoreExtended.create("memory://local",
metadataStore = new FaultInjectionMetadataStore(MetadataStoreExtended.create("memory:local",
MetadataStoreConfig.builder().build()));
try {
// start bookkeeper service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public MockedBookKeeperTestCase(int numBookies) {
@BeforeMethod(alwaysRun = true)
public void setUp(Method method) throws Exception {
LOG.info(">>>>>> starting {}", method);
metadataStore = new FaultInjectionMetadataStore(MetadataStoreExtended.create("memory://local",
metadataStore = new FaultInjectionMetadataStore(MetadataStoreExtended.create("memory:local",
MetadataStoreConfig.builder().build()));
try {
// start bookkeeper service
Expand Down
2 changes: 1 addition & 1 deletion pulsar-websocket/src/test/resources/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
### --- Web Socket proxy settings --- ###

# Configuration Store connection string
configurationStoreServers=memory://127.0.0.1:2181
configurationStoreServers=memory:127.0.0.1:2181

# Zookeeper session timeout in milliseconds
zooKeeperSessionTimeoutMillis=30000
Expand Down

0 comments on commit 2762f2d

Please sign in to comment.