Skip to content

Commit

Permalink
[managed-ledger] close bk-client factory gracefully (apache#4580)
Browse files Browse the repository at this point in the history
### Motivation
User can create tools on bookkeeper using ManagedLedger factory which provides [constructor](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java#L121) to create ml-factory using self-managed bookkeeper (it's not used by broker).
So, in case of self-managed bk-client, ML-Factory couldn't shutdown it gracefully and we see issue: apache#4573

### Modification
- ML-Factory creates `DefaultBkFactory` to create self-managed bk-client and shutdowns same bk-client while closing the resource.
  • Loading branch information
rdhabalia authored and sijie committed Jun 25, 2019
1 parent 95df092 commit 156682a
Showing 1 changed file with 21 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import io.netty.util.concurrent.DefaultThreadFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -116,15 +117,8 @@ public ManagedLedgerFactoryImpl(ClientConfiguration bkClientConfiguration, Manag
}

private ManagedLedgerFactoryImpl(ZooKeeper zkc, ClientConfiguration bkClientConfiguration,
ManagedLedgerFactoryConfig config)
throws Exception {
this((policyConfig) -> {
try {
return new BookKeeper(bkClientConfiguration, zkc);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}, true /* isBookkeeperManaged */, zkc, config);
ManagedLedgerFactoryConfig config) throws Exception {
this(new DefaultBkFactory(bkClientConfiguration, zkc), true /* isBookkeeperManaged */, zkc, config);
}

public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper) throws Exception {
Expand Down Expand Up @@ -171,6 +165,21 @@ private ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPoli
cacheEvictionExecutor.execute(this::cacheEvictionTask);
}

static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy {

private final BookKeeper bkClient;

public DefaultBkFactory(ClientConfiguration bkClientConfiguration, ZooKeeper zkc)
throws BKException, IOException, InterruptedException {
bkClient = new BookKeeper(bkClientConfiguration, zkc);
}

@Override
public BookKeeper get(EnsemblePlacementPolicyConfig policy) {
return bkClient;
}
}

private synchronized void refreshStats() {
long now = System.nanoTime();
long period = now - lastStatTimestamp;
Expand Down Expand Up @@ -428,9 +437,9 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {

if (isBookkeeperManaged) {
try {
BookKeeper bkFactory = bookkeeperFactory.get();
if (bkFactory != null) {
bkFactory.close();
BookKeeper bookkeeper = bookkeeperFactory.get();
if (bookkeeper != null) {
bookkeeper.close();
}
} catch (BKException e) {
throw new ManagedLedgerException(e);
Expand Down

0 comments on commit 156682a

Please sign in to comment.