Skip to content

Commit

Permalink
PIP 76: Streaming Offload(Part I) (apache#9096)
Browse files Browse the repository at this point in the history
This PR contains the new interface and implementation of the offloader in the below PIP

Unit test is still in progress

- [x] StreamingDataBlockHeaderImpl
- [x] StreamingBlobStoreBackedReadHandleImpl
- [x] BufferedOffloadStream
- [x] BlobStoreManagedLedgerOffloader
- [x] StreamingOffloadIndexBlock

PIP 76: https://github.com/apache/pulsar/wiki/PIP-76:-Streaming-Offload
  • Loading branch information
Renkai authored Jan 26, 2021
1 parent f89dbb6 commit 17f399e
Show file tree
Hide file tree
Showing 36 changed files with 3,351 additions and 193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,71 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import lombok.ToString;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.policies.data.OffloadPolicies;

/**
* Interface for offloading ledgers to long-term storage
* Interface for offloading ledgers to long-term storage.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface LedgerOffloader {

@ToString
class OffloadResult {
public final long beginLedger;
public final long beginEntry;
public final long endLedger;
public final long endEntry;

public OffloadResult(long beginLedger, long beginEntry, long endLedger, long endEntry) {
this.beginLedger = beginLedger;
this.beginEntry = beginEntry;
this.endLedger = endLedger;
this.endEntry = endEntry;
}
}

/**
* Used to store driver info, buffer entries, mark progress, etc.
* Create one per segment.
*/
interface OffloadHandle {
enum OfferEntryResult {
SUCCESS,
FAIL_BUFFER_FULL,
FAIL_SEGMENT_CLOSED,
FAIL_NOT_CONSECUTIVE
}

Position lastOffered();

CompletableFuture<Position> lastOfferedAsync();

/**
* The caller should manually release entry no matter what the offer result is.
*/
OfferEntryResult offerEntry(Entry entry);

CompletableFuture<OfferEntryResult> offerEntryAsync(Entry entry);

CompletableFuture<OffloadResult> getOffloadResultAsync();

/**
* Manually close current offloading segment
* @return true if the segment is not already closed
*/
boolean close();

default CompletableFuture<Boolean> AsyncClose() {
return CompletableFuture.completedFuture(close());
}
}

// TODO: improve the user metadata in subsequent changes
String METADATA_SOFTWARE_VERSION_KEY = "S3ManagedLedgerOffloaderSoftwareVersion";
String METADATA_SOFTWARE_GITSHA_KEY = "S3ManagedLedgerOffloaderSoftwareGitSha";
Expand Down Expand Up @@ -63,7 +115,7 @@ default Map<String, String> getOffloadDriverMetadata() {
* alongside the ledger data.
*
* When the returned future completes, the ledger has been persisted to the
* loadterm storage, so it is safe to delete the original copy in bookkeeper.
* longterm storage, so it is safe to delete the original copy in bookkeeper.
*
* The uid is used to identify an attempt to offload. The implementation should
* use this to deterministically generate a unique name for the offloaded object.
Expand All @@ -85,6 +137,35 @@ CompletableFuture<Void> offload(ReadHandle ledger,
UUID uid,
Map<String, String> extraMetadata);

/**
* Begin offload the passed in ledgers to longterm storage, it will finish
* when a segment reached it's size or time.
* Should only be called once for a LedgerOffloader instance.
* Metadata passed in is for inspection purposes only and should be stored
* alongside the segment data.
*
* When the returned OffloaderHandle.getOffloadResultAsync completes, the corresponding
* ledgers has been persisted to the
* longterm storage, so it is safe to delete the original copy in bookkeeper.
*
* The uid is used to identify an attempt to offload. The implementation should
* use this to deterministically generate a unique name for the offloaded object.
* This uid will be stored in the managed ledger metadata before attempting the
* call to streamingOffload(). If a subsequent or concurrent call to streamingOffload() finds
* a uid in the metadata, it will attempt to cleanup this attempt with a call
* to #deleteOffloaded(ReadHandle,UUID). Once the offload attempt completes,
* the managed ledger will update its metadata again, to record the completion,
* ensuring that subsequent calls will not attempt to offload the same ledger
* again.
*
* @return an OffloaderHandle, which when `completeFuture()` completed, denotes that the offload has been successful.
*/
default CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uid, long beginLedger,
long beginEntry,
Map<String, String> driverMetadata) {
throw new UnsupportedOperationException();
}

/**
* Create a ReadHandle which can be used to read a ledger back from longterm
* storage.
Expand Down Expand Up @@ -115,6 +196,16 @@ CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
Map<String, String> offloadDriverMetadata);

default CompletableFuture<ReadHandle> readOffloaded(long ledgerId, MLDataFormats.OffloadContext ledgerContext,
Map<String, String> offloadDriverMetadata) {
throw new UnsupportedOperationException();
}

default CompletableFuture<Void> deleteOffloaded(UUID uid,
Map<String, String> offloadDriverMetadata) {
throw new UnsupportedOperationException();
}

/**
* Get offload policies of this LedgerOffloader
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
package org.apache.bookkeeper.mledger;

import io.netty.buffer.ByteBuf;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
Expand All @@ -33,6 +31,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;

/**
Expand Down Expand Up @@ -595,4 +594,10 @@ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.Upd
* Get the ManagedLedgerInterceptor for ManagedLedger.
* */
ManagedLedgerInterceptor getManagedLedgerInterceptor();

/**
* Get basic ledger summary.
* will got null if corresponding ledger not exists.
*/
CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,8 @@ public interface Position {
* @return the position of the next logical entry
*/
Position getNext();

long getLedgerId();

long getEntryId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -64,7 +65,6 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import io.netty.util.ReferenceCountUtil;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
Expand Down Expand Up @@ -1683,6 +1683,14 @@ public CompletableFuture<String> getLedgerMetadata(long ledgerId) {
return getLedgerHandle(ledgerId).thenApply(rh -> rh.getLedgerMetadata().toSafeString());
}

@Override
public CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId) {
CompletableFuture<LedgerInfo> result = new CompletableFuture<>();
final LedgerInfo ledgerInfo = ledgers.get(ledgerId);
result.complete(ledgerInfo);
return result;
}

CompletableFuture<ReadHandle> getLedgerHandle(long ledgerId) {
CompletableFuture<ReadHandle> ledgerHandle = ledgerCache.get(ledgerId);
if (ledgerHandle != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;


import java.util.Map;
import java.util.UUID;
import lombok.ToString;
import org.apache.bookkeeper.mledger.LedgerOffloader;

@ToString
public class OffloadSegmentInfoImpl {
public OffloadSegmentInfoImpl(UUID uuid, long beginLedgerId, long beginEntryId, String driverName,
Map<String, String> driverMetadata) {
this.uuid = uuid;
this.beginLedgerId = beginLedgerId;
this.beginEntryId = beginEntryId;
this.driverName = driverName;
this.driverMetadata = driverMetadata;
}


public final UUID uuid;
public final long beginLedgerId;
public final long beginEntryId;
public final String driverName;
volatile private long endLedgerId;
volatile private long endEntryId;
volatile boolean closed = false;
public final Map<String, String> driverMetadata;

public boolean isClosed() {
return closed;
}

public void closeSegment(long endLedger, long endEntry) {
this.endLedgerId = endLedger;
this.endEntryId = endEntry;
this.closed = true;
}

public LedgerOffloader.OffloadResult result() {
return new LedgerOffloader.OffloadResult(beginLedgerId, beginEntryId, endLedgerId, endEntryId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.bookkeeper.mledger.impl;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.base.Objects;
import com.google.common.collect.ComparisonChain;
import org.apache.bookkeeper.mledger.Position;
Expand Down
11 changes: 11 additions & 0 deletions managed-ledger/src/main/proto/MLDataFormats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ message OffloadContext {
optional bool bookkeeperDeleted = 4;
optional int64 timestamp = 5;
optional OffloadDriverMetadata driverMetadata = 6;
repeated OffloadSegment offloadSegment = 7;
}

message OffloadSegment {
optional int64 uidMsb = 1;
optional int64 uidLsb = 2;
optional bool complete = 3;
optional int64 assignedTimestamp = 4; //timestamp in millisecond
optional int64 offloadedTimestamp = 5; //timestamp in millisecond
optional int64 endEntryId = 6;
optional OffloadDriverMetadata driverMetadata = 7;
}

message ManagedLedgerInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,21 @@ public void testOffloadRead() throws Exception {
assertEquals(new String(e.getData()), "entry-" + i++);
}
verify(offloader, times(1))
.readOffloaded(anyLong(), any(), anyMap());
.readOffloaded(anyLong(), (UUID) any(), anyMap());
verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID), anyMap());

for (Entry e : cursor.readEntries(10)) {
assertEquals(new String(e.getData()), "entry-" + i++);
}
verify(offloader, times(2))
.readOffloaded(anyLong(), any(), anyMap());
.readOffloaded(anyLong(), (UUID) any(), anyMap());
verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap());

for (Entry e : cursor.readEntries(5)) {
assertEquals(new String(e.getData()), "entry-" + i++);
}
verify(offloader, times(2))
.readOffloaded(anyLong(), any(), anyMap());
.readOffloaded(anyLong(), (UUID) any(), anyMap());
}

@Test
Expand Down Expand Up @@ -164,7 +164,7 @@ public void testBookkeeperFirstOffloadRead() throws Exception {
}
// For offloaded first and not deleted ledgers, they should be read from bookkeeper.
verify(offloader, never())
.readOffloaded(anyLong(), any(), anyMap());
.readOffloaded(anyLong(), (UUID) any(), anyMap());

// Delete offladed message from bookkeeper
assertEventuallyTrue(() -> bkc.getLedgers().contains(firstLedger.getLedgerId()));
Expand All @@ -186,7 +186,7 @@ public void testBookkeeperFirstOffloadRead() throws Exception {

// Ledgers deleted from bookkeeper, now should read from offloader
verify(offloader, atLeastOnce())
.readOffloaded(anyLong(), any(), anyMap());
.readOffloaded(anyLong(), (UUID) any(), anyMap());
verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap());

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;

/**
* The data block header in code storage for each data block.
* The data block header in tiered storage for each data block.
*
* <p>Currently, It is in format:
* [ magic_word -- int ][ block_len -- int ][ first_entry_id -- long][padding]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* The Index block abstraction used for offload a ledger to long term storage.
*/
@Unstable
public interface OffloadIndexBlock extends Closeable {
public interface OffloadIndexBlock extends Closeable, OffloadIndexBlockV2 {

/**
* Get the content of the index block as InputStream.
Expand Down Expand Up @@ -86,5 +86,18 @@ public long getStreamSize() {
return streamSize;
}
}

default OffloadIndexEntry getIndexEntryForEntry(long ledgerId, long messageEntryId) throws IOException {
return getIndexEntryForEntry(messageEntryId);
}

default long getStartEntryId(long ledgerId) {
return 0; //Offload index block v1 always start with 0;
}

default LedgerMetadata getLedgerMetadata(long ledgerId) {
return getLedgerMetadata();
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffloadIndexBlockBuilderImpl;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffloadIndexBlockV2BuilderImpl;

/**
* Interface for builder of index block used for offload a ledger to long term storage.
Expand Down Expand Up @@ -71,12 +71,12 @@ public interface OffloadIndexBlockBuilder {
/**
* Construct OffloadIndex from an InputStream.
*/
OffloadIndexBlock fromStream(InputStream is) throws IOException;
OffloadIndexBlockV2 fromStream(InputStream is) throws IOException;

/**
* create an OffloadIndexBlockBuilder.
*/
static OffloadIndexBlockBuilder create() {
return new OffloadIndexBlockBuilderImpl();
return new OffloadIndexBlockV2BuilderImpl();
}
}
Loading

0 comments on commit 17f399e

Please sign in to comment.