Skip to content

Commit

Permalink
[Issue 3458: Tag Pulsar ledgers in order to distinguish from other le…
Browse files Browse the repository at this point in the history
…ggers in the same Bookkeeper cluster (apache#3525)

Fixes apache#3458

### Motivation

See apache#3458 

### Modifications

Add a new LedgerMetadataUtils class which holds the logic for building "metadata" to be attached to 

### Verifying this change

This change is a trivial rework / code cleanup without any test coverage.
  • Loading branch information
eolivelli authored and sijie committed Feb 13, 2019
1 parent 2ed8fea commit 4338052
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;

import com.google.common.collect.ImmutableMap;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
* Utilities for managing BookKeeper Ledgers custom metadata.
*/
public final class LedgerMetadataUtils {

private static final String METADATA_PROPERTY_APPLICATION = "application";
private static final byte[] METADATA_PROPERTY_APPLICATION_PULSAR
= "pulsar".getBytes(StandardCharsets.UTF_8);

private static final String METADATA_PROPERTY_COMPONENT = "component";
private static final byte[] METADATA_PROPERTY_COMPONENT_MANAGED_LEDGER
= "managed-ledger".getBytes(StandardCharsets.UTF_8);
private static final byte[] METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER
= "compacted-ledger".getBytes(StandardCharsets.UTF_8);
private static final byte[] METADATA_PROPERTY_COMPONENT_SCHEMA
= "schema".getBytes(StandardCharsets.UTF_8);

private static final String METADATA_PROPERTY_MANAGED_LEDGER_NAME = "pulsar/managed-ledger";
private static final String METADATA_PROPERTY_CURSOR_NAME = "pulsar/cursor";
private static final String METADATA_PROPERTY_COMPACTEDTOPIC = "pulsar/compactedTopic";
private static final String METADATA_PROPERTY_COMPACTEDTO = "pulsar/compactedTo";
private static final String METADATA_PROPERTY_SCHEMAID = "pulsar/schemaId";

/**
* Build base metadata for every ManagedLedger.
*
* @param name the name of the ledger
* @return an immutable map which describes a ManagedLedger
*/
static Map<String, byte[]> buildBaseManagedLedgerMetadata(String name) {
return ImmutableMap.of(
METADATA_PROPERTY_APPLICATION, METADATA_PROPERTY_APPLICATION_PULSAR,
METADATA_PROPERTY_COMPONENT, METADATA_PROPERTY_COMPONENT_MANAGED_LEDGER,
METADATA_PROPERTY_MANAGED_LEDGER_NAME, name.getBytes(StandardCharsets.UTF_8));
}

/**
* Build additional metadata for a Cursor.
*
* @param name the name of the cursor
* @return an immutable map which describes the cursor
* @see #buildBaseManagedLedgerMetadata(java.lang.String)
*/
static Map<String, byte[]> buildAdditionalMetadataForCursor(String name) {
return ImmutableMap.of(METADATA_PROPERTY_CURSOR_NAME, name.getBytes(StandardCharsets.UTF_8));
}

/**
* Build additional metadata for a CompactedLedger.
*
* @param compactedTopic reference to the compacted topic.
* @param compactedToMessageId last mesasgeId.
* @return an immutable map which describes the compacted ledger
*/
public static Map<String, byte[]> buildMetadataForCompactedLedger(String compactedTopic, byte[] compactedToMessageId) {
return ImmutableMap.of(
METADATA_PROPERTY_APPLICATION, METADATA_PROPERTY_APPLICATION_PULSAR,
METADATA_PROPERTY_COMPONENT, METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER,
METADATA_PROPERTY_COMPACTEDTOPIC, compactedTopic.getBytes(StandardCharsets.UTF_8),
METADATA_PROPERTY_COMPACTEDTO, compactedToMessageId
);
}

/**
* Build additional metadata for a Schema
*
* @param schemaId id of the schema
* @return an immutable map which describes the schema
*/
public static Map<String, byte[]> buildMetadataForSchema(String schemaId) {
return ImmutableMap.of(
METADATA_PROPERTY_APPLICATION, METADATA_PROPERTY_APPLICATION_PULSAR,
METADATA_PROPERTY_COMPONENT, METADATA_PROPERTY_COMPONENT_SCHEMA,
METADATA_PROPERTY_SCHEMAID, schemaId.getBytes(StandardCharsets.UTF_8)
);
}

private LedgerMetadataUtils() {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
Expand All @@ -37,6 +38,7 @@
import com.google.common.collect.TreeRangeSet;
import com.google.common.util.concurrent.RateLimiter;
import com.google.protobuf.InvalidProtocolBufferException;
import java.nio.charset.StandardCharsets;

import java.time.Clock;
import java.util.ArrayDeque;
Expand Down Expand Up @@ -2015,7 +2017,6 @@ void internalFlushPendingMarkDeletes() {

void createNewMetadataLedger(final VoidCallback callback) {
ledger.mbean.startCursorLedgerCreateOp();

ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> {

if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
Expand Down Expand Up @@ -2081,7 +2082,7 @@ public void deleteComplete(int rc, Object ctx) {
}
});
}));
}, Collections.emptyMap());
}, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;

import java.time.Clock;
Expand Down Expand Up @@ -106,7 +105,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand All @@ -119,6 +117,8 @@
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.TRUE;
import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;

Expand All @@ -131,6 +131,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {

protected final BookKeeper bookKeeper;
protected final String name;
private final Map<String, byte[]> ledgerMetadata;
private final BookKeeper.DigestType digestType;

protected ManagedLedgerConfig config;
Expand Down Expand Up @@ -249,6 +250,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
this.config = config;
this.store = store;
this.name = name;
this.ledgerMetadata = LedgerMetadataUtils.buildBaseManagedLedgerMetadata(name);
this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType());
this.scheduledExecutor = scheduledExecutor;
this.executor = orderedExecutor;
Expand Down Expand Up @@ -439,7 +441,7 @@ public void operationFailed(MetaStoreException e) {
// Save it back to ensure all nodes exist
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, storeLedgersCb);
}));
}, Collections.emptyMap());
}, ledgerMetadata);
}

private void initializeCursors(final ManagedLedgerInitializeLedgerCallback callback) {
Expand Down Expand Up @@ -3013,13 +3015,19 @@ public static ManagedLedgerException createManagedLedgerException(Throwable t) {
* @param config
* @param digestType
* @param cb
* @param emptyMap
* @param metadata
*/
protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig config, DigestType digestType,
CreateCallback cb, Map<Object, Object> emptyMap) {
CreateCallback cb, Map<String, byte[]> metadata) {
AtomicBoolean ledgerCreated = new AtomicBoolean(false);
Map<String, byte[]> finalMetadata = new HashMap<>();
finalMetadata.putAll(ledgerMetadata);
finalMetadata.putAll(metadata);
if (log.isDebugEnabled()) {
log.debug("creating ledger, metadata: "+finalMetadata);
}
bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(),
digestType, config.getPassword(), cb, ledgerCreated, Collections.emptyMap());
digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
scheduledExecutor.schedule(() -> {
if (!ledgerCreated.get()) {
ledgerCreated.set(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@
import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -45,6 +45,7 @@
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down Expand Up @@ -210,7 +211,7 @@ private CompletableFuture<Long> putSchema(String schemaId, byte[] data, byte[] h
return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry -> {
if (optLocatorEntry.isPresent()) {
// Schema locator was already present
return addNewSchemaEntryToStore(optLocatorEntry.get().locator.getIndexList(), data)
return addNewSchemaEntryToStore(schemaId, optLocatorEntry.get().locator.getIndexList(), data)
.thenCompose(position -> updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash));
} else {
// No schema was defined yet
Expand Down Expand Up @@ -259,7 +260,7 @@ private CompletableFuture<Long> putSchemaIfAbsent(String schemaId, byte[] data,

return findSchemaEntryByHash(locator.getIndexList(), hash).thenCompose(version -> {
if (isNull(version)) {
return addNewSchemaEntryToStore(locator.getIndexList(), data).thenCompose(
return addNewSchemaEntryToStore(schemaId, locator.getIndexList(), data).thenCompose(
position -> updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash));
} else {
return completedFuture(version);
Expand Down Expand Up @@ -303,7 +304,7 @@ private CompletableFuture<Long> createNewSchema(String schemaId, byte[] data, by
.setLedgerId(-1L)
).build();

return addNewSchemaEntryToStore(Collections.singletonList(emptyIndex), data).thenCompose(position -> {
return addNewSchemaEntryToStore(schemaId, Collections.singletonList(emptyIndex), data).thenCompose(position -> {
// The schema was stored in the ledger, now update the z-node with the pointer to it
SchemaStorageFormat.IndexEntry info = SchemaStorageFormat.IndexEntry.newBuilder()
.setVersion(0)
Expand Down Expand Up @@ -338,11 +339,12 @@ private static String getSchemaPath(String schemaId) {

@NotNull
private CompletableFuture<SchemaStorageFormat.PositionInfo> addNewSchemaEntryToStore(
String schemaId,
List<SchemaStorageFormat.IndexEntry> index,
byte[] data
) {
SchemaStorageFormat.SchemaEntry schemaEntry = newSchemaEntry(index, data);
return createLedger().thenCompose(ledgerHandle ->
return createLedger(schemaId).thenCompose(ledgerHandle ->
addEntry(ledgerHandle, schemaEntry).thenApply(entryId ->
Functions.newPositionInfo(ledgerHandle.getId(), entryId)
)
Expand Down Expand Up @@ -497,7 +499,8 @@ private CompletableFuture<Long> addEntry(LedgerHandle ledgerHandle, SchemaStorag
}

@NotNull
private CompletableFuture<LedgerHandle> createLedger() {
private CompletableFuture<LedgerHandle> createLedger(String schemaId) {
Map<String, byte[]> metadata = LedgerMetadataUtils.buildMetadataForSchema(schemaId);
final CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
bookKeeper.asyncCreateLedger(
config.getManagedLedgerDefaultEnsembleSize(),
Expand All @@ -511,7 +514,7 @@ private CompletableFuture<LedgerHandle> createLedger() {
} else {
future.complete(handle);
}
}, null, Collections.emptyMap()
}, null, metadata
);
return future;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -129,7 +130,7 @@ private void phaseOneLoop(RawReader reader,
Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
if (keyAndSize != null) {
if(keyAndSize.getRight() > 0) {
latestForKey.put(keyAndSize.getLeft(), id);
latestForKey.put(keyAndSize.getLeft(), id);
} else {
deletedMessage = true;
latestForKey.remove(keyAndSize.getLeft());
Expand Down Expand Up @@ -165,8 +166,7 @@ private void scheduleTimeout(CompletableFuture<RawMessage> future) {

private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to, MessageId lastReadId,
Map<String, MessageId> latestForKey, BookKeeper bk) {
Map<String, byte[]> metadata = ImmutableMap.of("compactedTopic", reader.getTopic().getBytes(UTF_8),
"compactedTo", to.toByteArray());
Map<String, byte[]> metadata = LedgerMetadataUtils.buildMetadataForCompactedLedger(reader.getTopic(), to.toByteArray());
return createLedger(bk, metadata).thenCompose((ledger) -> {
log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}",
reader.getTopic(), from, to, latestForKey.size(), ledger.getId());
Expand Down Expand Up @@ -228,7 +228,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
MessageId msg;
if (keyAndSize == null) { // pass through messages without a key
messageToAdd = Optional.of(m);
} else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
} else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
&& msg.equals(id)) { // consider message only if present into latestForKey map
if (keyAndSize.getRight() <= 0) {
promise.completeExceptionally(new IllegalArgumentException(
Expand Down
20 changes: 20 additions & 0 deletions site2/docs/cookbooks-bookkeepermetadata.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
id: cookbooks-bookkeepermetadata
title: BookKeeper Ledger Metadata
---

Pulsar stores data on BookKeeper ledgers, you can understand the contents of a ledger by inspecting the metadata attached to the ledger.
Such metadata are stored on ZooKeeper and they are readable using BookKeeper APIs.

Description of current metadata:

| Scope | Metadata name | Metadata value |
| ------------- | ------------- | ------------- |
| All ledgers | application | 'pulsar' |
| All ledgers | component | 'managed-ledger', 'schema', 'compacted-topic' |
| Managed ledgers | pulsar/managed-ledger | name of the ledger |
| Cursor | pulsar/cursor | name of the cursor |
| Compacted topic | pulsar/compactedTopic | name of the original topic |
| Compacted topic | pulsar/compactedTo | id of the last compacted message |


3 changes: 2 additions & 1 deletion site2/website/sidebars.json
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@
"cookbooks-partitioned",
"cookbooks-retention-expiry",
"cookbooks-encryption",
"cookbooks-message-queue"
"cookbooks-message-queue",
"cookbooks-bookkeepermetadata"
],
"Development": [
"develop-tools",
Expand Down

0 comments on commit 4338052

Please sign in to comment.