Skip to content

Commit

Permalink
PIP-45: Initialize ManagedLedgerFactory with MetadataStore (apache#10647
Browse files Browse the repository at this point in the history
)

* PIP-45: Initialize ManagedLedgerFactory with MetadataStore

* Fixed using the correct metadatastore in PulsarClusterMetadataTeardown

* Fixed test

* Removed multiple test invocations

* Fixed package manager tests implementation
  • Loading branch information
merlimat authored May 22, 2021
1 parent e486492 commit d94c3f0
Show file tree
Hide file tree
Showing 46 changed files with 775 additions and 663 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
*/
package org.apache.bookkeeper.mledger.impl;

import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursorMXBean;

import java.util.concurrent.atomic.LongAdder;

public class ManagedCursorMXBeanImpl implements ManagedCursorMXBean {

private final LongAdder persistLedgeSucceed = new LongAdder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
Expand Down Expand Up @@ -73,24 +72,19 @@
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final MetaStore store;
private final BookkeeperFactoryForCustomEnsemblePlacementPolicy bookkeeperFactory;
private final boolean isBookkeeperManaged;
private final ZooKeeper zookeeper;
private final ManagedLedgerFactoryConfig config;
protected final OrderedScheduler scheduledExecutor;
private final OrderedExecutor orderedExecutor;

private final ExecutorService cacheEvictionExecutor;

Expand Down Expand Up @@ -122,82 +116,68 @@ private static class PendingInitializeManagedLedger {

}

public ManagedLedgerFactoryImpl(ClientConfiguration bkClientConfiguration, String zkConnection) throws Exception {
this(bkClientConfiguration, zkConnection, new ManagedLedgerFactoryConfig());
public ManagedLedgerFactoryImpl(MetadataStore metadataStore, ClientConfiguration bkClientConfiguration)
throws Exception {
this(metadataStore, bkClientConfiguration, new ManagedLedgerFactoryConfig());
}

@SuppressWarnings("deprecation")
public ManagedLedgerFactoryImpl(ClientConfiguration bkClientConfiguration, ManagedLedgerFactoryConfig config)
public ManagedLedgerFactoryImpl(MetadataStore metadataStore, ClientConfiguration bkClientConfiguration,
ManagedLedgerFactoryConfig config)
throws Exception {
this(ZooKeeperClient.newBuilder()
.connectString(bkClientConfiguration.getZkServers())
.sessionTimeoutMs(bkClientConfiguration.getZkTimeout())
.build(), bkClientConfiguration, config);
}

private ManagedLedgerFactoryImpl(ZooKeeper zkc, ClientConfiguration bkClientConfiguration,
ManagedLedgerFactoryConfig config) throws Exception {
this(new DefaultBkFactory(bkClientConfiguration, zkc), true /* isBookkeeperManaged */,
zkc, config, NullStatsLogger.INSTANCE);
this(metadataStore, new DefaultBkFactory(bkClientConfiguration),
true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE);
}

public ManagedLedgerFactoryImpl(ClientConfiguration clientConfiguration, String zkConnection, ManagedLedgerFactoryConfig config) throws Exception {
this(new DefaultBkFactory(clientConfiguration),
true,
ZooKeeperClient.newBuilder()
.connectString(zkConnection)
.sessionTimeoutMs(clientConfiguration.getZkTimeout()).build(), config, NullStatsLogger.INSTANCE);
}

public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper) throws Exception {
this((policyConfig) -> bookKeeper, zooKeeper, new ManagedLedgerFactoryConfig());
public ManagedLedgerFactoryImpl(MetadataStore metadataStore, BookKeeper bookKeeper)
throws Exception {
this(metadataStore, bookKeeper, new ManagedLedgerFactoryConfig());
}

public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper, ManagedLedgerFactoryConfig config)
public ManagedLedgerFactoryImpl(MetadataStore metadataStore, BookKeeper bookKeeper,
ManagedLedgerFactoryConfig config)
throws Exception {
this((policyConfig) -> bookKeeper, false /* isBookkeeperManaged */,
zooKeeper, config, NullStatsLogger.INSTANCE);
this(metadataStore, (policyConfig) -> bookKeeper, config);
}

public ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
ZooKeeper zooKeeper, ManagedLedgerFactoryConfig config)
public ManagedLedgerFactoryImpl(MetadataStore metadataStore,
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
ManagedLedgerFactoryConfig config)
throws Exception {
this(bookKeeperGroupFactory, false /* isBookkeeperManaged */, zooKeeper, config, NullStatsLogger.INSTANCE);
this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */,
config, NullStatsLogger.INSTANCE);
}

public ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
ZooKeeper zooKeeper, ManagedLedgerFactoryConfig config, StatsLogger statsLogger)
public ManagedLedgerFactoryImpl(MetadataStore metadataStore,
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
ManagedLedgerFactoryConfig config, StatsLogger statsLogger)
throws Exception {
this(bookKeeperGroupFactory, false /* isBookkeeperManaged */, zooKeeper, config, statsLogger);
this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */,
config, statsLogger);
}

private ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
boolean isBookkeeperManaged, ZooKeeper zooKeeper,
private ManagedLedgerFactoryImpl(MetadataStore metadataStore,
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
boolean isBookkeeperManaged,
ManagedLedgerFactoryConfig config, StatsLogger statsLogger) throws Exception {
scheduledExecutor = OrderedScheduler.newSchedulerBuilder()
.numThreads(config.getNumManagedLedgerSchedulerThreads())
.statsLogger(statsLogger)
.traceTaskExecution(config.isTraceTaskExecution())
.name("bookkeeper-ml-scheduler")
.build();
orderedExecutor = OrderedExecutor.newBuilder()
.numThreads(config.getNumManagedLedgerWorkerThreads())
.statsLogger(statsLogger)
.traceTaskExecution(config.isTraceTaskExecution())
.name("bookkeeper-ml-workers")
.build();
cacheEvictionExecutor = Executors
.newSingleThreadExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction"));

this.bookkeeperFactory = bookKeeperGroupFactory;
this.isBookkeeperManaged = isBookkeeperManaged;
this.zookeeper = isBookkeeperManaged ? zooKeeper : null;
this.metadataStore = new ZKMetadataStore(zooKeeper);
this.store = new MetaStoreImpl(metadataStore, orderedExecutor);
this.metadataStore = metadataStore;
this.store = new MetaStoreImpl(metadataStore, scheduledExecutor);
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new EntryCacheManager(this);
this.statsTask = scheduledExecutor.scheduleAtFixedRate(this::refreshStats, 0, StatsPeriodSeconds, TimeUnit.SECONDS);
this.statsTask = scheduledExecutor.scheduleAtFixedRate(this::refreshStats,
0, StatsPeriodSeconds, TimeUnit.SECONDS);
this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(this::flushCursors,
config.getCursorPositionFlushSeconds(), config.getCursorPositionFlushSeconds(), TimeUnit.SECONDS);

Expand All @@ -213,12 +193,8 @@ static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlace

private final BookKeeper bkClient;

public DefaultBkFactory(ClientConfiguration bkClientConfiguration, ZooKeeper zkc)
throws BKException, IOException, InterruptedException {
bkClient = new BookKeeper(bkClientConfiguration, zkc);
}

public DefaultBkFactory(ClientConfiguration bkClientConfiguration) throws InterruptedException, BKException, IOException {
public DefaultBkFactory(ClientConfiguration bkClientConfiguration)
throws InterruptedException, BKException, IOException {
bkClient = new BookKeeper(bkClientConfiguration);
}

Expand Down Expand Up @@ -350,8 +326,8 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final
ManagedLedgerImpl l = existingFuture.get();
if (l.getState().equals(State.Fenced.toString()) || l.getState().equals(State.Closed.toString())) {
// Managed ledger is in unusable state. Recreate it.
log.warn("[{}] Attempted to open ledger in {} state. Removing from the map to recreate it", name,
l.getState());
log.warn("[{}] Attempted to open ledger in {} state. Removing from the map to recreate it",
name, l.getState());
ledgers.remove(name, existingFuture);
}
} catch (Exception e) {
Expand Down Expand Up @@ -381,8 +357,7 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final
bookkeeperFactory.get(
new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties())),
store, config, scheduledExecutor,
orderedExecutor, name, mlOwnershipChecker);
store, config, scheduledExecutor, name, mlOwnershipChecker);
PendingInitializeManagedLedger pendingLedger = new PendingInitializeManagedLedger(newledger);
pendingInitializeLedgers.put(name, pendingLedger);
newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
Expand Down Expand Up @@ -429,7 +404,8 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {


@Override
public ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPosition, ManagedLedgerConfig config)
public ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPosition,
ManagedLedgerConfig config)
throws InterruptedException, ManagedLedgerException {
class Result {
ReadOnlyCursor c = null;
Expand Down Expand Up @@ -467,9 +443,11 @@ public void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosi
bookkeeperFactory
.get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties())),
store, config, scheduledExecutor, orderedExecutor, managedLedgerName);
store, config, scheduledExecutor, managedLedgerName);

roManagedLedger.initializeAndCreateCursor((PositionImpl) startPosition).thenAccept(roCursor -> callback.openReadOnlyCursorComplete(roCursor, ctx)).exceptionally(ex -> {
roManagedLedger.initializeAndCreateCursor((PositionImpl) startPosition)
.thenAccept(roCursor -> callback.openReadOnlyCursorComplete(roCursor, ctx))
.exceptionally(ex -> {
Throwable t = ex;
if (t instanceof CompletionException) {
t = ex.getCause();
Expand Down Expand Up @@ -526,10 +504,6 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
latch.await();
log.info("{} ledgers closed", numLedgers);

if (zookeeper != null) {
zookeeper.close();
}

if (isBookkeeperManaged) {
try {
BookKeeper bookkeeper = bookkeeperFactory.get();
Expand All @@ -542,15 +516,9 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
}

scheduledExecutor.shutdownNow();
orderedExecutor.shutdownNow();
cacheEvictionExecutor.shutdownNow();

entryCacheManager.clear();
try {
metadataStore.close();
} catch (Exception e) {
throw new ManagedLedgerException(e);
}
}

@Override
Expand Down Expand Up @@ -748,7 +716,7 @@ public void asyncDelete(String name, DeleteLedgerCallback callback, Object ctx)
}

/**
* Delete all managed ledger resources and metadata
* Delete all managed ledger resources and metadata.
*/
void deleteManagedLedger(String managedLedgerName, DeleteLedgerCallback callback, Object ctx) {
// Read the managed ledger metadata from store
Expand Down Expand Up @@ -801,7 +769,8 @@ public void operationFailed(MetaStoreException e) {
});
}

private CompletableFuture<Void> deleteCursor(BookKeeper bkc, String managedLedgerName, String cursorName, CursorInfo cursor) {
private CompletableFuture<Void> deleteCursor(BookKeeper bkc, String managedLedgerName, String cursorName,
CursorInfo cursor) {
CompletableFuture<Void> future = new CompletableFuture<>();
CompletableFuture<Void> cursorLedgerDeleteFuture;

Expand Down Expand Up @@ -852,7 +821,7 @@ public BookKeeper getBookKeeper() {
}

/**
* Factory to create Bookkeeper-client for a given ensemblePlacementPolicy
* Factory to create Bookkeeper-client for a given ensemblePlacementPolicy.
*
*/
public interface BookkeeperFactoryForCustomEnsemblePlacementPolicy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -275,12 +274,12 @@ public enum PositionBound {
Map<String, byte[]> createdLedgerCustomMetadata;

public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store,
ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, OrderedExecutor orderedExecutor,
ManagedLedgerConfig config, OrderedScheduler scheduledExecutor,
final String name) {
this(factory, bookKeeper, store, config, scheduledExecutor, orderedExecutor, name, null);
this(factory, bookKeeper, store, config, scheduledExecutor, name, null);
}
public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store,
ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, OrderedExecutor orderedExecutor,
ManagedLedgerConfig config, OrderedScheduler scheduledExecutor,
final String name, final Supplier<Boolean> mlOwnershipChecker) {
this.factory = factory;
this.bookKeeper = bookKeeper;
Expand All @@ -290,7 +289,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
this.ledgerMetadata = LedgerMetadataUtils.buildBaseManagedLedgerMetadata(name);
this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType());
this.scheduledExecutor = scheduledExecutor;
this.executor = orderedExecutor;
this.executor = bookKeeper.getMainWorkerPool();
TOTAL_SIZE_UPDATER.set(this, 0);
NUMBER_OF_ENTRIES_UPDATER.set(this, 0);
ENTRIES_ADDED_COUNTER_UPDATER.set(this, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
public class ReadOnlyManagedLedgerImpl extends ManagedLedgerImpl {

public ReadOnlyManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store,
ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, OrderedExecutor orderedExecutor,
ManagedLedgerConfig config, OrderedScheduler scheduledExecutor,
String name) {
super(factory, bookKeeper, store, config, scheduledExecutor, orderedExecutor, name);
super(factory, bookKeeper, store, config, scheduledExecutor, name);
}

CompletableFuture<ReadOnlyCursor> initializeAndCreateCursor(PositionImpl startPosition) {
Expand Down
Loading

0 comments on commit d94c3f0

Please sign in to comment.