Skip to content

Commit

Permalink
Refactored JCloud Tiered Storage (apache#6335)
Browse files Browse the repository at this point in the history
Motivation

In order to facilitate the support of additional JClouds-supported providers, we first needed to clean up the existing code, as there were a lot of if/then/else constructs throughout the code that were based on the assumption that we either supported AWS S3 or Google Cloud Storage. I didn't want to keep adding else if's to these code blocks for every new provider we add, so I decided to refactor the code to make it a bit cleaner

Modifications

in addition to being home for most of the aforementioned if/then/else blocks, the BlobStoreManagedLedgerOffloader class had multiple responsibilities in addition to providing an implementation for the LedgerOffloader interface. My goal was to simplify this class such that its only responsibility was to implement the LedgerOffloader interface.

The other major change was the addition of the JCloudBlobStoreProvider enum, which implements 3 interfaces that allow for it to handle the provider specific logic for things such as acquiring the credentials, validating the configuration, and creating a provider-specific instance of BlobStore.

Result

After this change, we will be able to easily add support for additional JClouds-supported providers by simply adding new elements to the JCloudBlobStoreProvider Enums since the other logic has been isolated and is not vendor specific.

See apache#2865 for more details

* Refactored JCloud Tiered Storage

* Refactored JCloud Tiered Storage

* Added missing import statements

* Refactored JCloud Tiered Storage

* Refactored JCloud Tiered Storage

* Added missing import statements

* Refactored JCloud Tiered Storage

* Refactored JCloud Tiered Storage

* Added missing import statements

* fix test

* add test logs

* fix logs

* fix test

* fix

* fix

* fix broker log

* fix configuration

* fix

* fix

* fix

* fix get BlobStore

* repair test presto query tiered storage data

* fix test

* fix test

* fix test TestPrestoQueryTieredStorage

* fix

* fix

Co-authored-by: Sijie Guo <[email protected]>
Co-authored-by: gaoran10 <[email protected]>
Co-authored-by: xiaolong.ran <[email protected]>
  • Loading branch information
4 people authored Oct 19, 2020
1 parent 9c821cf commit d3a5233
Show file tree
Hide file tree
Showing 37 changed files with 1,930 additions and 870 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,4 +249,62 @@ public String toString() {
.toString();
}

public static final String METADATA_FIELD_BUCKET = "bucket";
public static final String METADATA_FIELD_REGION = "region";
public static final String METADATA_FIELD_ENDPOINT = "serviceEndpoint";
public static final String METADATA_FIELD_MAX_BLOCK_SIZE = "maxBlockSizeInBytes";
public static final String METADATA_FIELD_READ_BUFFER_SIZE = "readBufferSizeInBytes";
public static final String OFFLOADER_PROPERTY_PREFIX = "managedLedgerOffload.";

public static Properties toProperties(OffloadPolicies offloadPolicies) {
Properties properties = new Properties();

setProperty(properties, "managedLedgerOffloadDriver", offloadPolicies.getManagedLedgerOffloadDriver());
setProperty(properties, "managedLedgerOffloadMaxThreads",
offloadPolicies.getManagedLedgerOffloadMaxThreads());
setProperty(properties, "managedLedgerOffloadPrefetchRounds",
offloadPolicies.getManagedLedgerOffloadPrefetchRounds());
setProperty(properties, "managedLedgerOffloadThresholdInBytes",
offloadPolicies.getManagedLedgerOffloadThresholdInBytes());
setProperty(properties, "managedLedgerOffloadDeletionLagInMillis",
offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis());

if (offloadPolicies.isS3Driver()) {
setProperty(properties, "s3ManagedLedgerOffloadRegion",
offloadPolicies.getS3ManagedLedgerOffloadRegion());
setProperty(properties, "s3ManagedLedgerOffloadBucket",
offloadPolicies.getS3ManagedLedgerOffloadBucket());
setProperty(properties, "s3ManagedLedgerOffloadServiceEndpoint",
offloadPolicies.getS3ManagedLedgerOffloadServiceEndpoint());
setProperty(properties, "s3ManagedLedgerOffloadMaxBlockSizeInBytes",
offloadPolicies.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());
setProperty(properties, "s3ManagedLedgerOffloadRole",
offloadPolicies.getS3ManagedLedgerOffloadRole());
setProperty(properties, "s3ManagedLedgerOffloadRoleSessionName",
offloadPolicies.getS3ManagedLedgerOffloadRoleSessionName());
} else if (offloadPolicies.isGcsDriver()) {
setProperty(properties, "gcsManagedLedgerOffloadRegion",
offloadPolicies.getGcsManagedLedgerOffloadRegion());
setProperty(properties, "gcsManagedLedgerOffloadBucket",
offloadPolicies.getGcsManagedLedgerOffloadBucket());
setProperty(properties, "gcsManagedLedgerOffloadMaxBlockSizeInBytes",
offloadPolicies.getGcsManagedLedgerOffloadMaxBlockSizeInBytes());
setProperty(properties, "gcsManagedLedgerOffloadReadBufferSizeInBytes",
offloadPolicies.getGcsManagedLedgerOffloadReadBufferSizeInBytes());
setProperty(properties, "gcsManagedLedgerOffloadServiceAccountKeyFile",
offloadPolicies.getGcsManagedLedgerOffloadServiceAccountKeyFile());

} else if (offloadPolicies.isFileSystemDriver()) {
setProperty(properties, "fileSystemProfilePath", offloadPolicies.getFileSystemProfilePath());
setProperty(properties, "fileSystemURI", offloadPolicies.getFileSystemURI());
}
return properties;
}

private static void setProperty(Properties properties, String key, Object value) {
if (value != null) {
properties.setProperty(key, "" + value);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ protected void configure() {

protected void beforeStart() {}

protected void afterStart() {}

@Override
public void start() {
if (httpPort > 0 && servicePort < 0) {
Expand All @@ -146,6 +148,7 @@ public void start() {

beforeStart();
super.start();
afterStart();
log.info("Start pulsar service {} at container {}", serviceName, containerName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,18 @@ public class TestS3Offload extends TestBaseOffload {

private S3Container s3Container;

@BeforeClass
public void setupS3() {
@Override
protected void beforeStartCluster() throws Exception {
super.beforeStartCluster();

log.info("s3 container init");
s3Container = new S3Container(
pulsarCluster.getClusterName(),
S3Container.NAME)
.withNetwork(pulsarCluster.getNetwork())
.withNetworkAliases(S3Container.NAME);
s3Container.start();
log.info("s3 container start finish.");
}

@AfterClass
Expand Down Expand Up @@ -71,7 +75,7 @@ protected Map<String, String> getEnv() {
Map<String, String> result = new HashMap<>();
result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(ENTRIES_PER_LEDGER));
result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
result.put("managedLedgerOffloadDriver", "s3");
result.put("managedLedgerOffloadDriver", "aws-s3");
result.put("s3ManagedLedgerOffloadBucket", "pulsar-integtest");
result.put("s3ManagedLedgerOffloadServiceEndpoint", "http://" + S3Container.NAME + ":9090");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.sql.Connection;
import java.sql.DriverManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
import org.testng.ITest;
Expand Down Expand Up @@ -50,16 +49,7 @@ public void setupCluster() throws Exception {
.clusterName(clusterName)
.build();

log.info("Setting up cluster {} with {} bookies, {} brokers",
spec.clusterName(), spec.numBookies(), spec.numBrokers());
pulsarCluster = PulsarCluster.forSpec(spec);
for(BrokerContainer brokerContainer : pulsarCluster.getBrokers()){
getEnv().forEach(brokerContainer::withEnv);
}

pulsarCluster.start();

log.info("Cluster {} is setup", spec.clusterName());
setupCluster(spec);
}

@AfterSuite
Expand All @@ -74,4 +64,12 @@ public String getTestName() {
}

protected abstract Map<String, String> getEnv();

@Override
protected void beforeStartCluster() throws Exception {
super.beforeStartCluster();
for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) {
getEnv().forEach(brokerContainer::withEnv);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
*/
package org.apache.bookkeeper.mledger.offload.jcloud;

import java.io.InputStream;
import java.io.IOException;
import java.io.InputStream;

/**
* Abstract input stream class.
*/
public abstract class BackedInputStream extends InputStream {
public abstract void seek(long position);
public abstract void seekForward(long position) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public abstract class BlockAwareSegmentInputStream extends InputStream {
public abstract int getBlockSize();

/**
* Get entry count that read out from this InputStream
* Get entry count that read out from this InputStream.
*
* @return the block entry count
*/
Expand All @@ -62,7 +62,7 @@ public abstract class BlockAwareSegmentInputStream extends InputStream {
public abstract long getEndEntryId();

/**
* Get sum of entries data size read from the this InputStream
* Get sum of entries data size read from the this InputStream.
*
* @return the block entry bytes count
*/
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;

/**
* The data block header in code storage for each data block
* The data block header in code storage for each data block.
*
* Currently, It is in format:
* <p>Currently, It is in format:
* [ magic_word -- int ][ block_len -- int ][ first_entry_id -- long][padding]
*
* with the size: 4 + 4 + 8 + padding = 128 Bytes
* with the size: 4 + 4 + 8 + padding = 128 Bytes</p>
*/
@Unstable
public interface DataBlockHeader {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
import org.apache.pulsar.common.policies.data.OffloadPolicies;

/**
Expand All @@ -38,13 +40,15 @@ public static JCloudLedgerOffloaderFactory of() {

@Override
public boolean isDriverSupported(String driverName) {
return BlobStoreManagedLedgerOffloader.driverSupported(driverName);
return JCloudBlobStoreProvider.driverSupported(driverName);
}

@Override
public BlobStoreManagedLedgerOffloader create(OffloadPolicies offloadPolicies,
Map<String, String> userMetadata,
OrderedScheduler scheduler) throws IOException {
return BlobStoreManagedLedgerOffloader.create(offloadPolicies, userMetadata, scheduler);
public BlobStoreManagedLedgerOffloader create(OffloadPolicies offloadPolicies, Map<String, String> userMetadata,
OrderedScheduler scheduler) throws IOException {

TieredStorageConfiguration config =
TieredStorageConfiguration.create(OffloadPolicies.toProperties(offloadPolicies));
return BlobStoreManagedLedgerOffloader.create(config, userMetadata, scheduler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@

import java.io.Closeable;
import java.io.FilterInputStream;
import java.io.InputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;

/**
*
* The Index block abstraction used for offload a ledger to long term storage.
*
*/
@Unstable
public interface OffloadIndexBlock extends Closeable {
Expand Down Expand Up @@ -73,7 +71,7 @@ public interface OffloadIndexBlock extends Closeable {
/**
* An input stream which knows the size of the stream upfront.
*/
public static class IndexInputStream extends FilterInputStream {
class IndexInputStream extends FilterInputStream {
final long streamSize;

public IndexInputStream(InputStream in, long streamSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,19 @@ public interface OffloadIndexBlockBuilder {
OffloadIndexBlockBuilder withDataBlockHeaderLength(long dataHeaderLength);

/**
* Finalize the immutable OffloadIndexBlock
* Finalize the immutable OffloadIndexBlock.
*/
OffloadIndexBlock build();

/**
* Construct OffloadIndex from an InputStream
* Construct OffloadIndex from an InputStream.
*/
OffloadIndexBlock fromStream(InputStream is) throws IOException;

/**
* create an OffloadIndexBlockBuilder
* create an OffloadIndexBlockBuilder.
*/
static OffloadIndexBlockBuilder create() {
return new OffloadIndexBlockBuilderImpl();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;

/**
*
* The Index Entry in OffloadIndexBlock.
* It consists of the message entry id, the code storage block part id for this message entry,
* and the offset in code storage block for this message id.
*
*/
@Unstable
@LimitedPrivate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
package org.apache.bookkeeper.mledger.offload.jcloud.impl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.io.IOException;
import java.io.InputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.VersionCheck;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.options.GetOptions;
Expand Down Expand Up @@ -52,7 +52,7 @@ public BlobStoreBackedInputStreamImpl(BlobStore blobStore, String bucket, String
this.bucket = bucket;
this.key = key;
this.versionCheck = versionCheck;
this.buffer = PulsarByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize);
this.buffer = PooledByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize);
this.objectLen = objectLen;
this.bufferSize = bufferSize;
this.cursor = 0;
Expand Down Expand Up @@ -116,10 +116,10 @@ public int read(byte[] b, int off, int len) throws IOException {

@Override
public void seek(long position) {
log.debug("Seeking to {} on {}/{}, current position {} (bufStart:{}, bufEnd:{})", position, bucket, key, cursor, bufferOffsetStart, bufferOffsetEnd);
log.debug("Seeking to {} on {}/{}, current position {}", position, bucket, key, cursor);
if (position >= bufferOffsetStart && position <= bufferOffsetEnd) {
long newIndex = position - bufferOffsetStart;
buffer.readerIndex((int)newIndex);
buffer.readerIndex((int) newIndex);
} else {
this.cursor = position;
buffer.clear();
Expand Down
Loading

0 comments on commit d3a5233

Please sign in to comment.