Skip to content

Commit

Permalink
Switched to BookKeeper-4.7.0-SNAPSHOT + Adjustements (apache#1292)
Browse files Browse the repository at this point in the history
* Switched to BookKeeper-4.7.0-SNAPSHOT + Adjustements

* Added some missing operations to GrowableArrayBlockingQueue

Managed ledger was using UnboundedArrayBlockingQueue from yahoo/bookkeeper branch.

* Removed mock BK classes that are now in BK itself

* Fixed flaky test after refactoring

* Fixed some other tests

* One more flaky test

* Added bk tests dependency to pulsar-client-tools
  • Loading branch information
merlimat authored Feb 28, 2018
1 parent 3e1465a commit 6bd4ccc
Show file tree
Hide file tree
Showing 52 changed files with 640 additions and 959 deletions.
10 changes: 0 additions & 10 deletions all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,6 @@
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>

<dependency>
<groupId>org.apache.bookkeeper.stats</groupId>
<artifactId>datasketches-metrics-provider</artifactId>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId></exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.bookkeeper.stats</groupId>
<artifactId>prometheus-metrics-provider</artifactId>
Expand Down
5 changes: 2 additions & 3 deletions buildtools/src/main/resources/log4j2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ Configuration:
Loggers:

Root:
level: info
level: warn
AppenderRef:
- ref: Console

Logger:
name: org.apache.bookkeeper.mledger
name: org.apache.pulsar
level: info
additivity: false
16 changes: 15 additions & 1 deletion managed-ledger/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<dependencies>
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-server</artifactId>
<artifactId>bookkeeper-server-shaded</artifactId>
<version>${bookkeeper.version}</version>
<exclusions>
<exclusion>
Expand All @@ -53,6 +53,20 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-server</artifactId>
<version>${bookkeeper.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
}

// Read the last entry in the ledger
lh.asyncReadLastEntry((rc1, lh1, seq, ctx1) -> {
long lastEntryInLedger = lh.getLastAddConfirmed();
lh.asyncReadEntries(lastEntryInLedger, lastEntryInLedger, (rc1, lh1, seq, ctx1) -> {
if (log.isDebugEnabled()) {
log.debug("[{}} readComplete rc={} entryId={}", ledger.getName(), rc1, lh1.getLastAddConfirmed());
}
Expand Down Expand Up @@ -1913,6 +1914,7 @@ void internalFlushPendingMarkDeletes() {

void createNewMetadataLedger(final VoidCallback callback) {
ledger.mbean.startCursorLedgerCreateOp();

bookkeeper.asyncCreateLedger(config.getMetadataEnsemblesize(), config.getMetadataWriteQuorumSize(),
config.getMetadataAckQuorumSize(), config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> {
ledger.getExecutor().submit(safeRun(() -> {
Expand Down Expand Up @@ -1974,7 +1976,7 @@ public void deleteComplete(int rc, Object ctx) {
}
});
}));
}, null);
}, null, Collections.emptyMap());
}

private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
Expand Down Expand Up @@ -78,7 +79,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final ManagedLedgerFactoryConfig config;
protected final ScheduledExecutorService executor = Executors.newScheduledThreadPool(16,
new DefaultThreadFactory("bookkeeper-ml"));
private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(16, "bookkeeper-ml-workers");
private final OrderedScheduler orderedExecutor = OrderedScheduler.newSchedulerBuilder().numThreads(16)
.name("bookkeeper-ml-workers").build();

protected final ManagedLedgerFactoryMBeanImpl mbean;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,9 @@

import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;

import com.google.common.collect.BoundType;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -47,11 +39,13 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
Expand Down Expand Up @@ -82,14 +76,23 @@
import org.apache.bookkeeper.mledger.util.CallbackMutex;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.mledger.util.Pair;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.UnboundArrayBlockingQueue;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.BoundType;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.RateLimiter;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final static long MegaByte = 1024 * 1024;

Expand Down Expand Up @@ -187,20 +190,20 @@ enum PositionBound {
private volatile State state = null;

private final ScheduledExecutorService scheduledExecutor;
private final OrderedSafeExecutor executor;
private final OrderedScheduler executor;
final ManagedLedgerFactoryImpl factory;
protected final ManagedLedgerMBeanImpl mbean;

/**
* Queue of pending entries to be added to the managed ledger. Typically entries are queued when a new ledger is
* created asynchronously and hence there is no ready ledger to write into.
*/
final Queue<OpAddEntry> pendingAddEntries = new UnboundArrayBlockingQueue<>();
final GrowableArrayBlockingQueue<OpAddEntry> pendingAddEntries = new GrowableArrayBlockingQueue<>();

// //////////////////////////////////////////////////////////////////////

public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store,
ManagedLedgerConfig config, ScheduledExecutorService scheduledExecutor, OrderedSafeExecutor orderedExecutor,
ManagedLedgerConfig config, ScheduledExecutorService scheduledExecutor, OrderedScheduler orderedExecutor,
final String name) {
this.factory = factory;
this.bookKeeper = bookKeeper;
Expand Down Expand Up @@ -352,7 +355,7 @@ public void operationFailed(MetaStoreException e) {
// Save it back to ensure all nodes exist
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, storeLedgersCb);
}));
}, null);
}, null, Collections.emptyMap());
}

private void initializeCursors(final ManagedLedgerInitializeLedgerCallback callback) {
Expand Down Expand Up @@ -512,7 +515,8 @@ public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback
this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
mbean.startDataLedgerCreateOp();
bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, ctx);
config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, ctx,
Collections.emptyMap());
}
} else {
checkArgument(state == State.LedgerOpened, "ledger=%s is not opened", state);
Expand Down Expand Up @@ -1168,7 +1172,7 @@ public synchronized void updateLedgersIdsComplete(Stat stat) {
}

// Process all the pending addEntry requests
for (OpAddEntry op : pendingAddEntries) {
for (OpAddEntry op : pendingAddEntries.toList()) {
op.setLedger(currentLedger);
++currentLedgerEntries;
currentLedgerSize += op.data.readableBytes();
Expand Down Expand Up @@ -1238,7 +1242,8 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
mbean.startDataLedgerCreateOp();
bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, null);
config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, null,
Collections.emptyMap());
}
}

Expand Down Expand Up @@ -2087,7 +2092,7 @@ ScheduledExecutorService getScheduledExecutor() {
return scheduledExecutor;
}

OrderedSafeExecutor getExecutor() {
OrderedScheduler getExecutor() {
return executor;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,15 @@

import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;

import com.google.common.base.Charsets;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TextFormat;
import com.google.protobuf.TextFormat.ParseException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.CreateMode;
Expand All @@ -43,6 +40,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TextFormat;
import com.google.protobuf.TextFormat.ParseException;

@SuppressWarnings("checkstyle:javadoctype")
public class MetaStoreImplZookeeper implements MetaStore {

Expand All @@ -53,7 +55,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
private static final String prefix = prefixName + "/";

private final ZooKeeper zk;
private final OrderedSafeExecutor executor;
private final OrderedScheduler executor;

private static class ZKStat implements Stat {
private final int version;
Expand Down Expand Up @@ -88,7 +90,7 @@ public long getModificationTimestamp() {
}
}

public MetaStoreImplZookeeper(ZooKeeper zk, OrderedSafeExecutor executor)
public MetaStoreImplZookeeper(ZooKeeper zk, OrderedScheduler executor)
throws Exception {
this.zk = zk;
this.executor = executor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* Test BookKeeperClient which allows access to members we don't wish to expose in the public API.
*/
public class BookKeeperTestClient extends BookKeeper {
public BookKeeperTestClient(ClientConfiguration conf) throws IOException, InterruptedException, KeeperException {
public BookKeeperTestClient(ClientConfiguration conf) throws IOException, InterruptedException, BKException {
super(conf);
}

Expand All @@ -45,7 +45,7 @@ public ClientConfiguration getConf() {
* @throws InterruptedException
* @throws KeeperException
*/
public void readBookiesBlocking() throws InterruptedException, KeeperException {
bookieWatcher.readBookiesBlocking();
public void readBookiesBlocking() throws InterruptedException, BKException {
bookieWatcher.initialBlockingBookieRead();
}
}
Loading

0 comments on commit 6bd4ccc

Please sign in to comment.