Skip to content

Commit

Permalink
Upgrade BookKeeper to 4.12.0 (apache#8447)
Browse files Browse the repository at this point in the history
Upgrade Apache BookKeeper to 4.12.0

Most notable changes that impact this patch are:
- BP-41 -> "BookieSocketAddress" becomes "BookieId"
- BP-42 -> LedgerMetadata now carries "ledgerId" (as a transient non serialized value)
  • Loading branch information
eolivelli authored Nov 16, 2020
1 parent f684cf0 commit 1907afe
Show file tree
Hide file tree
Showing 14 changed files with 182 additions and 178 deletions.
54 changes: 28 additions & 26 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ The Apache Software License, Version 2.0
- io.sundr-sundr-core-0.21.0.jar
* Guava
- com.google.guava-guava-25.1-jre.jar
- com.google.guava-failureaccess-1.0.1.jar
- com.google.guava-listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
* J2ObjC Annotations -- com.google.j2objc-j2objc-annotations-1.1.jar
* Netty Reactive Streams -- com.typesafe.netty-netty-reactive-streams-2.0.4.jar
* Swagger
Expand Down Expand Up @@ -392,32 +394,32 @@ The Apache Software License, Version 2.0
- org.apache.logging.log4j-log4j-web-2.10.0.jar
* Java Native Access JNA -- net.java.dev.jna-jna-4.2.0.jar
* BookKeeper
- org.apache.bookkeeper-bookkeeper-common-4.11.1.jar
- org.apache.bookkeeper-bookkeeper-common-allocator-4.11.1.jar
- org.apache.bookkeeper-bookkeeper-proto-4.11.1.jar
- org.apache.bookkeeper-bookkeeper-server-4.11.1.jar
- org.apache.bookkeeper-bookkeeper-tools-framework-4.11.1.jar
- org.apache.bookkeeper-circe-checksum-4.11.1.jar
- org.apache.bookkeeper-cpu-affinity-4.11.1.jar
- org.apache.bookkeeper-statelib-4.11.1.jar
- org.apache.bookkeeper-stream-storage-api-4.11.1.jar
- org.apache.bookkeeper-stream-storage-common-4.11.1.jar
- org.apache.bookkeeper-stream-storage-java-client-4.11.1.jar
- org.apache.bookkeeper-stream-storage-java-client-base-4.11.1.jar
- org.apache.bookkeeper-stream-storage-proto-4.11.1.jar
- org.apache.bookkeeper-stream-storage-server-4.11.1.jar
- org.apache.bookkeeper-stream-storage-service-api-4.11.1.jar
- org.apache.bookkeeper-stream-storage-service-impl-4.11.1.jar
- org.apache.bookkeeper.http-http-server-4.11.1.jar
- org.apache.bookkeeper.http-vertx-http-server-4.11.1.jar
- org.apache.bookkeeper.stats-bookkeeper-stats-api-4.11.1.jar
- org.apache.bookkeeper.stats-prometheus-metrics-provider-4.11.1.jar
- org.apache.bookkeeper.tests-stream-storage-tests-common-4.11.1.jar
- org.apache.distributedlog-distributedlog-common-4.11.1.jar
- org.apache.distributedlog-distributedlog-core-4.11.1-tests.jar
- org.apache.distributedlog-distributedlog-core-4.11.1.jar
- org.apache.distributedlog-distributedlog-protocol-4.11.1.jar
- org.apache.bookkeeper.stats-codahale-metrics-provider-4.11.1.jar
- org.apache.bookkeeper-bookkeeper-common-4.12.0.jar
- org.apache.bookkeeper-bookkeeper-common-allocator-4.12.0.jar
- org.apache.bookkeeper-bookkeeper-proto-4.12.0.jar
- org.apache.bookkeeper-bookkeeper-server-4.12.0.jar
- org.apache.bookkeeper-bookkeeper-tools-framework-4.12.0.jar
- org.apache.bookkeeper-circe-checksum-4.12.0.jar
- org.apache.bookkeeper-cpu-affinity-4.12.0.jar
- org.apache.bookkeeper-statelib-4.12.0.jar
- org.apache.bookkeeper-stream-storage-api-4.12.0.jar
- org.apache.bookkeeper-stream-storage-common-4.12.0.jar
- org.apache.bookkeeper-stream-storage-java-client-4.12.0.jar
- org.apache.bookkeeper-stream-storage-java-client-base-4.12.0.jar
- org.apache.bookkeeper-stream-storage-proto-4.12.0.jar
- org.apache.bookkeeper-stream-storage-server-4.12.0.jar
- org.apache.bookkeeper-stream-storage-service-api-4.12.0.jar
- org.apache.bookkeeper-stream-storage-service-impl-4.12.0.jar
- org.apache.bookkeeper.http-http-server-4.12.0.jar
- org.apache.bookkeeper.http-vertx-http-server-4.12.0.jar
- org.apache.bookkeeper.stats-bookkeeper-stats-api-4.12.0.jar
- org.apache.bookkeeper.stats-prometheus-metrics-provider-4.12.0.jar
- org.apache.bookkeeper.tests-stream-storage-tests-common-4.12.0.jar
- org.apache.distributedlog-distributedlog-common-4.12.0.jar
- org.apache.distributedlog-distributedlog-core-4.12.0-tests.jar
- org.apache.distributedlog-distributedlog-core-4.12.0.jar
- org.apache.distributedlog-distributedlog-protocol-4.12.0.jar
- org.apache.bookkeeper.stats-codahale-metrics-provider-4.12.0.jar
* Apache HTTP Client
- org.apache.httpcomponents-httpclient-4.5.5.jar
- org.apache.httpcomponents-httpcore-4.4.9.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadDriverMetadata;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.DataFormats;

Expand Down Expand Up @@ -120,34 +121,35 @@ public static byte[] buildLedgerMetadataFormat(LedgerMetadata metadata) {
.setKey(e.getKey()).setValue(ByteString.copyFrom(e.getValue()));
}

for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : metadata.getAllEnsembles().entrySet()) {
for (Map.Entry<Long, ? extends List<BookieId>> e : metadata.getAllEnsembles().entrySet()) {
builder.addSegmentBuilder()
.setFirstEntryId(e.getKey())
.addAllEnsembleMember(e.getValue().stream().map(BookieSocketAddress::toString).collect(Collectors.toList()));
.addAllEnsembleMember(e.getValue().stream().map(BookieId::toString).collect(Collectors.toList()));
}

return builder.build().toByteArray();
}

public static LedgerMetadata parseLedgerMetadata(byte[] bytes) throws IOException {
public static LedgerMetadata parseLedgerMetadata(long id, byte[] bytes) throws IOException {
DataFormats.LedgerMetadataFormat ledgerMetadataFormat = DataFormats.LedgerMetadataFormat.newBuilder().mergeFrom(bytes).build();
LedgerMetadataBuilder builder = LedgerMetadataBuilder.create()
.withLastEntryId(ledgerMetadataFormat.getLastEntryId())
.withPassword(ledgerMetadataFormat.getPassword().toByteArray())
.withClosedState()
.withId(id)
.withMetadataFormatVersion(2)
.withLength(ledgerMetadataFormat.getLength())
.withAckQuorumSize(ledgerMetadataFormat.getAckQuorumSize())
.withCreationTime(ledgerMetadataFormat.getCtime())
.withWriteQuorumSize(ledgerMetadataFormat.getQuorumSize())
.withEnsembleSize(ledgerMetadataFormat.getEnsembleSize());
ledgerMetadataFormat.getSegmentList().forEach(segment -> {
ArrayList<BookieSocketAddress> addressArrayList = new ArrayList<>();
ArrayList<BookieId> addressArrayList = new ArrayList<>();
segment.getEnsembleMemberList().forEach(address -> {
try {
addressArrayList.add(new BookieSocketAddress(address));
} catch (IOException e) {
log.error("Exception when create BookieSocketAddress. ", e);
addressArrayList.add(BookieId.parse(address));
} catch (IllegalArgumentException e) {
log.error("Exception when create BookieId {}. ", address, e);
}
});
builder.newEnsembleEntry(segment.getFirstEntryId(), addressArrayList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
Expand Down Expand Up @@ -254,8 +255,9 @@ static class MockMetadata implements LedgerMetadata {
private final State state;
private final byte[] password;
private final Map<String, byte[]> customMetadata;

private final long ledgerId;
MockMetadata(LedgerMetadata toCopy) {
ledgerId = toCopy.getLedgerId();
ensembleSize = toCopy.getEnsembleSize();
writeQuorumSize = toCopy.getWriteQuorumSize();
ackQuorumSize = toCopy.getAckQuorumSize();
Expand All @@ -270,6 +272,11 @@ static class MockMetadata implements LedgerMetadata {
customMetadata = ImmutableMap.copyOf(toCopy.getCustomMetadata());
}

@Override
public long getLedgerId() {
return ledgerId;
}

@Override
public boolean hasPassword() { return true; }

Expand Down Expand Up @@ -315,12 +322,12 @@ public long getCToken() {
public Map<String, byte[]> getCustomMetadata() { return customMetadata; }

@Override
public List<BookieSocketAddress> getEnsembleAt(long entryId) {
public List<BookieId> getEnsembleAt(long entryId) {
throw new UnsupportedOperationException("Pulsar shouldn't look at this");
}

@Override
public NavigableMap<Long, ? extends List<BookieSocketAddress>> getAllEnsembles() {
public NavigableMap<Long, ? extends List<BookieId>> getAllEnsembles() {
throw new UnsupportedOperationException("Pulsar shouldn't look at this");
}

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ flexible messaging model and an intuitive client API.</description>
<!-- apache commons -->
<commons-compress.version>1.19</commons-compress.version>

<bookkeeper.version>4.11.1</bookkeeper.version>
<bookkeeper.version>4.12.0</bookkeeper.version>
<zookeeper.version>3.5.7</zookeeper.version>
<netty.version>4.1.51.Final</netty.version>
<netty-tc-native.version>2.0.30.Final</netty-tc-native.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.EnsemblePlacementPolicyConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.broker.ManagedLedgerClientFactory;
Expand Down Expand Up @@ -133,10 +133,10 @@ public void testBookieIsolation() throws Exception {
BookieServer[] bookies = bkEnsemble.getBookies();
ZooKeeper zkClient = bkEnsemble.getZkClient();

Set<BookieSocketAddress> defaultBookies = Sets.newHashSet(bookies[0].getLocalAddress(),
bookies[1].getLocalAddress());
Set<BookieSocketAddress> isolatedBookies = Sets.newHashSet(bookies[2].getLocalAddress(),
bookies[3].getLocalAddress());
Set<BookieId> defaultBookies = Sets.newHashSet(bookies[0].getBookieId(),
bookies[1].getBookieId());
Set<BookieId> isolatedBookies = Sets.newHashSet(bookies[2].getBookieId(),
bookies[3].getBookieId());

setDefaultIsolationGroup(brokerBookkeeperClientIsolationGroups, zkClient, defaultBookies);
setDefaultIsolationGroup(tenantNamespaceIsolationGroups, zkClient, isolatedBookies);
Expand Down Expand Up @@ -271,12 +271,12 @@ public void testBookieIsilationWithSecondaryGroup() throws Exception {
BookieServer[] bookies = bkEnsemble.getBookies();
ZooKeeper zkClient = bkEnsemble.getZkClient();

Set<BookieSocketAddress> defaultBookies = Sets.newHashSet(bookies[0].getLocalAddress(),
bookies[1].getLocalAddress());
Set<BookieSocketAddress> isolatedBookies = Sets.newHashSet(bookies[2].getLocalAddress(),
bookies[3].getLocalAddress());
Set<BookieSocketAddress> downedBookies = Sets.newHashSet(new BookieSocketAddress("1.1.1.1:1111"),
new BookieSocketAddress("1.1.1.1:1112"));
Set<BookieId> defaultBookies = Sets.newHashSet(bookies[0].getBookieId(),
bookies[1].getBookieId());
Set<BookieId> isolatedBookies = Sets.newHashSet(bookies[2].getBookieId(),
bookies[3].getBookieId());
Set<BookieId> downedBookies = Sets.newHashSet(BookieId.parse("1.1.1.1:1111"),
BookieId.parse("1.1.1.1:1112"));

setDefaultIsolationGroup(brokerBookkeeperClientIsolationGroups, zkClient, defaultBookies);
// primary group empty
Expand Down Expand Up @@ -395,10 +395,10 @@ public void testDeleteIsolationGroup() throws Exception {
BookieServer[] bookies = bkEnsemble.getBookies();
ZooKeeper zkClient = bkEnsemble.getZkClient();

Set<BookieSocketAddress> defaultBookies = Sets.newHashSet(bookies[0].getLocalAddress(),
bookies[1].getLocalAddress());
Set<BookieSocketAddress> isolatedBookies = Sets.newHashSet(bookies[2].getLocalAddress(),
bookies[3].getLocalAddress());
Set<BookieId> defaultBookies = Sets.newHashSet(bookies[0].getBookieId(),
bookies[1].getBookieId());
Set<BookieId> isolatedBookies = Sets.newHashSet(bookies[2].getBookieId(),
bookies[3].getBookieId());

setDefaultIsolationGroup(brokerBookkeeperClientIsolationGroups, zkClient, defaultBookies);
// primary group empty
Expand Down Expand Up @@ -460,12 +460,12 @@ public void testDeleteIsolationGroup() throws Exception {
}

private void assertAffinityBookies(LedgerManager ledgerManager, List<LedgerInfo> ledgers1,
Set<BookieSocketAddress> defaultBookies) throws Exception {
Set<BookieId> defaultBookies) throws Exception {
for (LedgerInfo lInfo : ledgers1) {
long ledgerId = lInfo.getLedgerId();
CompletableFuture<Versioned<LedgerMetadata>> ledgerMetaFuture = ledgerManager.readLedgerMetadata(ledgerId);
LedgerMetadata ledgerMetadata = ledgerMetaFuture.get().getValue();
Set<BookieSocketAddress> ledgerBookies = Sets.newHashSet();
Set<BookieId> ledgerBookies = Sets.newHashSet();
ledgerBookies.addAll(ledgerMetadata.getAllEnsembles().values().iterator().next());
assertEquals(ledgerBookies.size(), defaultBookies.size());
ledgerBookies.removeAll(defaultBookies);
Expand Down Expand Up @@ -493,7 +493,7 @@ private Topic createTopicAndPublish(PulsarClient pulsarClient, String ns, String
}

private void setDefaultIsolationGroup(String brokerBookkeeperClientIsolationGroups, ZooKeeper zkClient,
Set<BookieSocketAddress> bookieAddresses) throws Exception {
Set<BookieId> bookieAddresses) throws Exception {
BookiesRackConfiguration bookies = null;
try {
byte[] data = zkClient.getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false, null);
Expand All @@ -509,8 +509,8 @@ private void setDefaultIsolationGroup(String brokerBookkeeperClientIsolationGrou
}

Map<String, BookieInfo> bookieInfoMap = Maps.newHashMap();
for (BookieSocketAddress bkSocket : bookieAddresses) {
BookieInfo info = new BookieInfo("use", bkSocket.getHostName() + ":" + bkSocket.getPort());
for (BookieId bkSocket : bookieAddresses) {
BookieInfo info = new BookieInfo("use", bkSocket.toString());
bookieInfoMap.put(bkSocket.toString(), info);
}
bookies.put(brokerBookkeeperClientIsolationGroups, bookieInfoMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.pulsar.common.policies.data.BookieInfo;
Expand Down Expand Up @@ -108,7 +108,7 @@ public void testPlacement() throws Exception {
BookKeeper bkc = this.pulsar.getBookKeeperClient();

// Create few ledgers and verify all of them should have a copy in the first bookie
BookieSocketAddress fistBookie = bookies.get(0).getLocalAddress();
BookieId fistBookie = bookies.get(0).getBookieId();
for (int i = 0; i < 100; i++) {
LedgerHandle lh = bkc.createLedger(2, 2, DigestType.DUMMY, new byte[0]);
log.info("Ledger: {} -- Ensemble: {}", i, lh.getLedgerMetadata().getEnsembleAt(0));
Expand All @@ -118,20 +118,5 @@ public void testPlacement() throws Exception {
}
}

@Test(enabled = false)
public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
// Ignore test
}

@Test(enabled = false)
public void testSkipCorruptDataLedger() throws Exception {
// Ignore test
}

@Test(enabled = false)
public void testTopicWithWildCardChar() throws Exception {
// Ignore test
}

private static final Logger log = LoggerFactory.getLogger(RackAwareTest.class);
}
23 changes: 12 additions & 11 deletions pulsar-sql/presto-distribution/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -408,17 +408,18 @@ The Apache Software License, Version 2.0
- async-http-client-2.12.1.jar
- async-http-client-netty-utils-2.12.1.jar
* Apache Bookkeeper
- bookkeeper-common-4.11.1.jar
- bookkeeper-common-allocator-4.11.1.jar
- bookkeeper-proto-4.11.1.jar
- bookkeeper-server-4.11.1.jar
- bookkeeper-stats-api-4.11.1.jar
- bookkeeper-tools-framework-4.11.1.jar
- circe-checksum-4.11.1.jar
- codahale-metrics-provider-4.11.1.jar
- cpu-affinity-4.11.1.jar
- http-server-4.11.1.jar
- prometheus-metrics-provider-4.11.1.jar
- bookkeeper-common-4.12.0.jar
- bookkeeper-common-allocator-4.12.0.jar
- bookkeeper-proto-4.12.0.jar
- bookkeeper-server-4.12.0.jar
- bookkeeper-stats-api-4.12.0.jar
- bookkeeper-tools-framework-4.12.0.jar
- circe-checksum-4.12.0.jar
- codahale-metrics-provider-4.12.0jar
- cpu-affinity-4.12.0.jar
- http-server-4.12.0.jar
- prometheus-metrics-provider-4.12.0.jar
- codahale-metrics-provider-4.12.0.jar
* Apache Commons
- commons-cli-1.2.jar
- commons-codec-1.10.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.bookkeeper.client.RackChangeNotifier;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieNode;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
Expand Down Expand Up @@ -218,14 +219,10 @@ public void reloadCachedMappings() {
public void onUpdate(String path, BookiesRackConfiguration data, Stat stat) {
if (rackawarePolicy != null) {
LOG.info("Bookie rack info updated to {}. Notifying rackaware policy.", data.toString());
List<BookieSocketAddress> bookieAddressList = new ArrayList<>();
List<BookieId> bookieAddressList = new ArrayList<>();
for (Map<String, BookieInfo> bookieMapping : data.values()) {
for (String addr : bookieMapping.keySet()) {
try {
bookieAddressList.add(new BookieSocketAddress(addr));
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
bookieAddressList.add(BookieId.parse(addr));
}
}
rackawarePolicy.onBookieRackChange(bookieAddressList);
Expand Down
Loading

0 comments on commit 1907afe

Please sign in to comment.