Skip to content

Commit

Permalink
[ML] Make ManagedLedger storage configurable (apache#9397)
Browse files Browse the repository at this point in the history
*Motivation*

This is the first step to allow supporting different storage implementations for Pulsar
  • Loading branch information
sijie authored Feb 5, 2021
1 parent 6f9b795 commit ddc3813
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1347,6 +1347,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "if allowAutoTopicCreationType is partitioned."
)
private int defaultNumPartitions = 1;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "The class of the managed ledger storage"
)
private String managedLedgerStorageClassName = "org.apache.pulsar.broker.ManagedLedgerClientFactory";
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Number of threads to be used for managed ledger tasks dispatching"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
Expand All @@ -36,23 +35,24 @@
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.configuration.Configuration;
import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ManagedLedgerClientFactory implements Closeable {
public class ManagedLedgerClientFactory implements ManagedLedgerStorage {

private static final Logger log = LoggerFactory.getLogger(ManagedLedgerClientFactory.class);

private final ManagedLedgerFactory managedLedgerFactory;
private final BookKeeper defaultBkClient;
private ManagedLedgerFactory managedLedgerFactory;
private BookKeeper defaultBkClient;
private final Map<EnsemblePlacementPolicyConfig, BookKeeper>
bkEnsemblePolicyToBkClientMap = Maps.newConcurrentMap();
private StatsProvider statsProvider = new NullStatsProvider();

public ManagedLedgerClientFactory(ServiceConfiguration conf, ZooKeeper zkClient,
BookKeeperClientFactory bookkeeperProvider) throws Exception {
public void initialize(ServiceConfiguration conf, ZooKeeper zkClient,
BookKeeperClientFactory bookkeeperProvider) throws Exception {
ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L);
managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark());
Expand Down Expand Up @@ -125,12 +125,18 @@ public Map<EnsemblePlacementPolicyConfig, BookKeeper> getBkEnsemblePolicyToBookK
@Override
public void close() throws IOException {
try {
managedLedgerFactory.shutdown();
log.info("Closed managed ledger factory");
if (null != managedLedgerFactory) {
managedLedgerFactory.shutdown();
log.info("Closed managed ledger factory");
}

statsProvider.stop();
if (null != statsProvider) {
statsProvider.stop();
}
try {
defaultBkClient.close();
if (null != defaultBkClient) {
defaultBkClient.close();
}
} catch (RejectedExecutionException ree) {
// when closing bookkeeper client, it will error outs all pending metadata operations.
// those callbacks of those operations will be triggered, and submitted to the scheduler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.validator.MultipleListenerValidator;
Expand Down Expand Up @@ -158,7 +159,7 @@ public class PulsarService implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(PulsarService.class);
private ServiceConfiguration config = null;
private NamespaceService nsService = null;
private ManagedLedgerClientFactory managedLedgerClientFactory = null;
private ManagedLedgerStorage managedLedgerClientFactory = null;
private LeaderElectionService leaderElectionService = null;
private BrokerService brokerService = null;
private WebService webService = null;
Expand Down Expand Up @@ -516,7 +517,9 @@ public void start() throws PulsarServerException {
this.startZkCacheService();

this.bkClientFactory = newBookKeeperClientFactory();
managedLedgerClientFactory = new ManagedLedgerClientFactory(config, getZkClient(), bkClientFactory);
managedLedgerClientFactory = ManagedLedgerStorage.create(
config, getZkClient(), bkClientFactory
);

this.brokerService = new BrokerService(this);

Expand Down Expand Up @@ -975,7 +978,7 @@ public ManagedLedgerFactory getManagedLedgerFactory() {
return managedLedgerClientFactory.getManagedLedgerFactory();
}

public ManagedLedgerClientFactory getManagedLedgerClientFactory() {
public ManagedLedgerStorage getManagedLedgerClientFactory() {
return managedLedgerClientFactory;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.storage;

import java.io.IOException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.classification.InterfaceAudience.Private;
import org.apache.pulsar.common.classification.InterfaceStability.Unstable;
import org.apache.zookeeper.ZooKeeper;

/**
* Storage to access {@link org.apache.bookkeeper.mledger.ManagedLedger}s.
*/
@Private
@Unstable
public interface ManagedLedgerStorage extends AutoCloseable {

/**
* Initialize the managed ledger storage.
*
* @param conf service config
* @param zkClient zk client
* @param bookkeperProvider bookkeeper provider
* @throws Exception
*/
void initialize(ServiceConfiguration conf,
ZooKeeper zkClient,
BookKeeperClientFactory bookkeperProvider) throws Exception;

/**
* Return the factory to create {@link ManagedLedgerFactory}.
*
* @return the factory to create {@link ManagedLedgerFactory}.
*/
ManagedLedgerFactory getManagedLedgerFactory();

/**
* Return the stats provider to expose the stats of the storage implementation.
*
* @return the stats provider.
*/
StatsProvider getStatsProvider();

/**
* Return the default bookkeeper client.
*
* @return the default bookkeeper client.
*/
BookKeeper getBookKeeperClient();

/**
* Close the storage.
*
* @throws IOException
*/
void close() throws IOException;

/**
* Initialize the {@link ManagedLedgerStorage} from the provided resources.
*
* @param conf service config
* @param zkClient zookeeper client
* @param bkProvider bookkeeper client provider
* @return the initialized managed ledger storage.
*/
static ManagedLedgerStorage create(ServiceConfiguration conf,
ZooKeeper zkClient,
BookKeeperClientFactory bkProvider) throws Exception {
final Class<?> storageClass = Class.forName(conf.getManagedLedgerStorageClassName());
final ManagedLedgerStorage storage = (ManagedLedgerStorage) storageClass.newInstance();
storage.initialize(conf, zkClient, bkProvider);
return storage;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* The storage layer for Apache Pulsar.
*/
package org.apache.pulsar.broker.storage;
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ public void testBookieIsolation() throws Exception {
// validate ledgers' ensemble with affinity bookies
assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);

ManagedLedgerClientFactory mlFactory = pulsarService.getManagedLedgerClientFactory();
ManagedLedgerClientFactory mlFactory =
(ManagedLedgerClientFactory) pulsarService.getManagedLedgerClientFactory();
Map<EnsemblePlacementPolicyConfig, BookKeeper> bkPlacementPolicyToBkClientMap = mlFactory
.getBkEnsemblePolicyToBookKeeperMap();

Expand Down Expand Up @@ -364,7 +365,8 @@ public void testBookieIsilationWithSecondaryGroup() throws Exception {
// validate ledgers' ensemble with affinity bookies
assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);

ManagedLedgerClientFactory mlFactory = pulsarService.getManagedLedgerClientFactory();
ManagedLedgerClientFactory mlFactory =
(ManagedLedgerClientFactory) pulsarService.getManagedLedgerClientFactory();
Map<EnsemblePlacementPolicyConfig, BookKeeper> bkPlacementPolicyToBkClientMap = mlFactory
.getBkEnsemblePolicyToBookKeeperMap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public void testCheckSequenceId() throws Exception {

// Fence the topic by opening the ManagedLedger for the topic outside the Pulsar broker. This will cause the
// broker to fail subsequent send operation and it will trigger a recover
ManagedLedgerClientFactory clientFactory = new ManagedLedgerClientFactory(pulsar.getConfiguration(),
pulsar.getZkClient(), pulsar.getBookKeeperClientFactory());
ManagedLedgerClientFactory clientFactory = new ManagedLedgerClientFactory();
clientFactory.initialize(pulsar.getConfiguration(), pulsar.getZkClient(), pulsar.getBookKeeperClientFactory());
ManagedLedgerFactory mlFactory = clientFactory.getManagedLedgerFactory();
ManagedLedger ml = mlFactory.open(TopicName.get(topicName).getPersistenceNamingEncoding());
ml.close();
Expand Down

0 comments on commit ddc3813

Please sign in to comment.