Skip to content

Commit

Permalink
Extract common method in tiered storage to managed-ledger module (apa…
Browse files Browse the repository at this point in the history
…che#6533)

Move common ledger metadata SerDe methods to managed-ledger.
  • Loading branch information
yjshen authored Mar 15, 2020
1 parent 60e9adb commit f0880f2
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 269 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,27 @@
package org.apache.bookkeeper.mledger.offload;

import com.google.common.collect.Maps;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerMetadataBuilder;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.KeyValue;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadDriverMetadata;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.DataFormats;

@Slf4j
public final class OffloadUtils {

private OffloadUtils() {}
Expand Down Expand Up @@ -87,4 +102,81 @@ public static void setOffloadDriverMetadata(LedgerInfo.Builder infoBuilder,
.setValue(v)
.build()));
}

public static byte[] buildLedgerMetadataFormat(LedgerMetadata metadata) {
DataFormats.LedgerMetadataFormat.Builder builder = DataFormats.LedgerMetadataFormat.newBuilder();
builder.setQuorumSize(metadata.getWriteQuorumSize())
.setAckQuorumSize(metadata.getAckQuorumSize())
.setEnsembleSize(metadata.getEnsembleSize())
.setLength(metadata.getLength())
.setState(metadata.isClosed() ? DataFormats.LedgerMetadataFormat.State.CLOSED : DataFormats.LedgerMetadataFormat.State.OPEN)
.setLastEntryId(metadata.getLastEntryId())
.setCtime(metadata.getCtime())
.setDigestType(BookKeeper.DigestType.toProtoDigestType(
BookKeeper.DigestType.fromApiDigestType(metadata.getDigestType())));

for (Map.Entry<String, byte[]> e : metadata.getCustomMetadata().entrySet()) {
builder.addCustomMetadataBuilder()
.setKey(e.getKey()).setValue(ByteString.copyFrom(e.getValue()));
}

for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : metadata.getAllEnsembles().entrySet()) {
builder.addSegmentBuilder()
.setFirstEntryId(e.getKey())
.addAllEnsembleMember(e.getValue().stream().map(a -> a.toString()).collect(Collectors.toList()));
}

return builder.build().toByteArray();
}

public static LedgerMetadata parseLedgerMetadata(byte[] bytes) throws IOException {
DataFormats.LedgerMetadataFormat ledgerMetadataFormat = DataFormats.LedgerMetadataFormat.newBuilder().mergeFrom(bytes).build();
LedgerMetadataBuilder builder = LedgerMetadataBuilder.create()
.withLastEntryId(ledgerMetadataFormat.getLastEntryId())
.withPassword(ledgerMetadataFormat.getPassword().toByteArray())
.withClosedState()
.withMetadataFormatVersion(2)
.withLength(ledgerMetadataFormat.getLength())
.withAckQuorumSize(ledgerMetadataFormat.getAckQuorumSize())
.withCreationTime(ledgerMetadataFormat.getCtime())
.withWriteQuorumSize(ledgerMetadataFormat.getQuorumSize())
.withEnsembleSize(ledgerMetadataFormat.getEnsembleSize());
ledgerMetadataFormat.getSegmentList().forEach(segment -> {
ArrayList<BookieSocketAddress> addressArrayList = new ArrayList<>();
segment.getEnsembleMemberList().forEach(address -> {
try {
addressArrayList.add(new BookieSocketAddress(address));
} catch (IOException e) {
log.error("Exception when create BookieSocketAddress. ", e);
}
});
builder.newEnsembleEntry(segment.getFirstEntryId(), addressArrayList);
});

if (ledgerMetadataFormat.getCustomMetadataCount() > 0) {
Map<String, byte[]> customMetadata = Maps.newHashMap();
ledgerMetadataFormat.getCustomMetadataList().forEach(
entry -> customMetadata.put(entry.getKey(), entry.getValue().toByteArray()));
builder.withCustomMetadata(customMetadata);
}

switch (ledgerMetadataFormat.getDigestType()) {
case HMAC:
builder.withDigestType(DigestType.MAC);
break;
case CRC32:
builder.withDigestType(DigestType.CRC32);
break;
case CRC32C:
builder.withDigestType(DigestType.CRC32C);
break;
case DUMMY:
builder.withDigestType(DigestType.DUMMY);
break;
default:
throw new IllegalArgumentException("Unable to convert digest type " + ledgerMetadataFormat.getDigestType());
}

return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@
*/
package org.apache.bookkeeper.mledger.offload.filesystem.impl;

import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerMetadataBuilder;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
Expand All @@ -32,8 +29,6 @@
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;

import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.DataFormats;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
Expand All @@ -43,11 +38,12 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

import static org.apache.bookkeeper.mledger.offload.OffloadUtils.parseLedgerMetadata;

public class FileStoreBackedReadHandleImpl implements ReadHandle {
private static final Logger log = LoggerFactory.getLogger(FileStoreBackedReadHandleImpl.class);
private final ExecutorService executor;
Expand Down Expand Up @@ -184,56 +180,4 @@ public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsyn
public static ReadHandle open(ScheduledExecutorService executor, MapFile.Reader reader, long ledgerId) throws IOException {
return new FileStoreBackedReadHandleImpl(executor, reader, ledgerId);
}

private static LedgerMetadata parseLedgerMetadata(byte[] bytes) throws IOException {
DataFormats.LedgerMetadataFormat ledgerMetadataFormat = DataFormats.LedgerMetadataFormat.newBuilder().mergeFrom(bytes).build();
LedgerMetadataBuilder builder = LedgerMetadataBuilder.create()
.withLastEntryId(ledgerMetadataFormat.getLastEntryId())
.withPassword(ledgerMetadataFormat.getPassword().toByteArray())
.withClosedState()
.withMetadataFormatVersion(2)
.withLength(ledgerMetadataFormat.getLength())
.withAckQuorumSize(ledgerMetadataFormat.getAckQuorumSize())
.withCreationTime(ledgerMetadataFormat.getCtime())
.withWriteQuorumSize(ledgerMetadataFormat.getQuorumSize())
.withEnsembleSize(ledgerMetadataFormat.getEnsembleSize());
ledgerMetadataFormat.getSegmentList().forEach(segment -> {
ArrayList<BookieSocketAddress> addressArrayList = new ArrayList<>();
segment.getEnsembleMemberList().forEach(address -> {
try {
addressArrayList.add(new BookieSocketAddress(address));
} catch (IOException e) {
log.error("Exception when create BookieSocketAddress. ", e);
}
});
builder.newEnsembleEntry(segment.getFirstEntryId(), addressArrayList);
});

if (ledgerMetadataFormat.getCustomMetadataCount() > 0) {
Map<String, byte[]> customMetadata = Maps.newHashMap();
ledgerMetadataFormat.getCustomMetadataList().forEach(
entry -> customMetadata.put(entry.getKey(), entry.getValue().toByteArray()));
builder.withCustomMetadata(customMetadata);
}

switch (ledgerMetadataFormat.getDigestType()) {
case HMAC:
builder.withDigestType(DigestType.MAC);
break;
case CRC32:
builder.withDigestType(DigestType.CRC32);
break;
case CRC32C:
builder.withDigestType(DigestType.CRC32C);
break;
case DUMMY:
builder.withDigestType(DigestType.DUMMY);
break;
default:
throw new IllegalArgumentException("Unable to convert digest type " + ledgerMetadataFormat.getDigestType());
}

return builder.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,12 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.offload.filesystem.FileSystemLedgerOffloaderFactory;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.DataFormats;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand All @@ -44,16 +39,15 @@

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static org.apache.bookkeeper.mledger.offload.OffloadUtils.buildLedgerMetadataFormat;

public class FileSystemManagedLedgerOffloader implements LedgerOffloader {

Expand Down Expand Up @@ -189,7 +183,7 @@ public void run() {
//store the ledgerMetadata in -1 index
key.set(METADATA_KEY_INDEX);
byte[] ledgerMetadata = buildLedgerMetadataFormat(readHandle.getLedgerMetadata());
value.set(buildLedgerMetadataFormat(readHandle.getLedgerMetadata()), 0, ledgerMetadata.length);
value.set(ledgerMetadata, 0, ledgerMetadata.length);
dataWriter.append(key, value);
AtomicLong haveOffloadEntryNumber = new AtomicLong(0);
long needToOffloadFirstEntryNumber = 0;
Expand Down Expand Up @@ -307,32 +301,6 @@ public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid, Map<Stri
return promise;
}

private static byte[] buildLedgerMetadataFormat(LedgerMetadata metadata) {
DataFormats.LedgerMetadataFormat.Builder builder = DataFormats.LedgerMetadataFormat.newBuilder();
builder.setQuorumSize(metadata.getWriteQuorumSize())
.setAckQuorumSize(metadata.getAckQuorumSize())
.setEnsembleSize(metadata.getEnsembleSize())
.setLength(metadata.getLength())
.setState(metadata.isClosed() ? DataFormats.LedgerMetadataFormat.State.CLOSED : DataFormats.LedgerMetadataFormat.State.OPEN)
.setLastEntryId(metadata.getLastEntryId())
.setCtime(metadata.getCtime())
.setDigestType(BookKeeper.DigestType.toProtoDigestType(
BookKeeper.DigestType.fromApiDigestType(metadata.getDigestType())));

for (Map.Entry<String, byte[]> e : metadata.getCustomMetadata().entrySet()) {
builder.addCustomMetadataBuilder()
.setKey(e.getKey()).setValue(ByteString.copyFrom(e.getValue()));
}

for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : metadata.getAllEnsembles().entrySet()) {
builder.addSegmentBuilder()
.setFirstEntryId(e.getKey())
.addAllEnsembleMember(e.getValue().stream().map(a -> a.toString()).collect(Collectors.toList()));
}

return builder.build().toByteArray();
}

@Override
public OffloadPolicies getOffloadPolicies() {
return offloadPolicies;
Expand Down
Loading

0 comments on commit f0880f2

Please sign in to comment.