Skip to content

Commit

Permalink
Use reference counting in RocksDB metadata store (apache#13309)
Browse files Browse the repository at this point in the history
### Motivation

RocksDB based implementation of MetadataStore right now can only be opened once, since it requires a single writer and it enforces it through a file lock. 

To make it easier to configure different components which may be creating their own instances of MetadataStore, we should make `RocksDbMetadataStore` to work consistently as the other implementations, by using a static map of instances and reference counting to decide when it can really get closed.
  • Loading branch information
merlimat authored Dec 15, 2021
1 parent d220c21 commit 8075dbd
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.bookkeeper.BKCluster;


Expand All @@ -53,22 +51,6 @@ public class EmbeddedPulsarCluster implements AutoCloseable {

private final PulsarAdmin admin;

private class EmbeddedPulsarService extends PulsarService {
EmbeddedPulsarService(ServiceConfiguration conf) {
super(conf);
}

@Override
public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
return bkCluster.getStore();
}

@Override
protected void closeLocalMetadataStore() {
// Do nothing as the store instance is managed by BKCluster
}
}

@Builder
private EmbeddedPulsarCluster(int numBrokers, int numBookies, String metadataStoreUrl) throws Exception {
this.numBrokers = numBrokers;
Expand All @@ -77,7 +59,7 @@ private EmbeddedPulsarCluster(int numBrokers, int numBookies, String metadataSto
this.bkCluster = new BKCluster(metadataStoreUrl, numBookies);

for (int i = 0; i < numBrokers; i++) {
PulsarService s = new EmbeddedPulsarService(getConf());
PulsarService s = new PulsarService(getConf());
s.start();
brokers.add(s);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private static MetadataStore newInstance(String metadataURL, MetadataStoreConfig
return new LocalMemoryMetadataStore(metadataURL, metadataStoreConfig);
}
if (metadataURL.startsWith("rocksdb://")) {
return new RocksdbMetadataStore(metadataURL, metadataStoreConfig);
return RocksdbMetadataStore.get(metadataURL, metadataStoreConfig);
} else {
return new ZKMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -89,6 +91,29 @@ enum State {
RUNNING, CLOSED
}

private int referenceCount = 1;

private static final Map<String, RocksdbMetadataStore> instancesCache = new ConcurrentHashMap<>();

public static RocksdbMetadataStore get(String metadataStoreUri, MetadataStoreConfig conf)
throws MetadataStoreException {
RocksdbMetadataStore store = instancesCache.get(metadataStoreUri);
if (store != null) {
synchronized (store) {
if (store.referenceCount > 0) {
// Reuse the same store instance
store.referenceCount++;
return store;
}
}
}

// Create a new store instance
store = new RocksdbMetadataStore(metadataStoreUri, conf);
instancesCache.put(metadataStoreUri, store);
return store;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
Expand Down Expand Up @@ -172,13 +197,16 @@ static long toLong(byte[] bytes) {
return ByteBuffer.wrap(bytes).getLong();
}

private final String metadataUrl;

/**
* @param metadataURL format "rocksdb://{storePath}"
* @param metadataStoreConfig
* @throws MetadataStoreException
*/
public RocksdbMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig)
private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig)
throws MetadataStoreException {
this.metadataUrl = metadataURL;
try {
RocksDB.loadLibrary();
} catch (Throwable t) {
Expand Down Expand Up @@ -309,7 +337,15 @@ private void configLog(Options options) throws IOException {
}

@Override
public void close() throws MetadataStoreException {
public synchronized void close() throws MetadataStoreException {
referenceCount--;
if (referenceCount > 0) {
// We close it only when the last reference is closed;
return;
}

instancesCache.remove(this.metadataUrl, this);

if (state == State.CLOSED) {
//already closed.
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,28 @@ public void testOpenDbWithConfigFile() throws Exception {
store.close();
FileUtils.deleteQuietly(tempDir.toFile());
}

@Test
public void testMultipleInstances() throws Exception {

Path tempDir = Files.createTempDirectory("RocksdbMetadataStoreTest");
log.info("Temp dir:{}", tempDir.toAbsolutePath());
MetadataStore store1 = MetadataStoreFactory.create("rocksdb://" + tempDir.toAbsolutePath(),
MetadataStoreConfig.builder().build());

MetadataStore store2 = MetadataStoreFactory.create("rocksdb://" + tempDir.toAbsolutePath(),
MetadataStoreConfig.builder().build());

// We should get the same instance
Assert.assertTrue(store1 == store2);

store1.put("/test", new byte[0], Optional.empty()).join();
Assert.assertTrue(store2.exists("/test").join());

store1.close();
store2.put("/test-2", new byte[0], Optional.empty()).join();
Assert.assertTrue(store2.exists("/test-2").join());

FileUtils.deleteQuietly(tempDir.toFile());
}
}

0 comments on commit 8075dbd

Please sign in to comment.