[Tiered Storage] Fix merge conflicts introduced by PR apache#6335 (apache#8630)

# Motivation

PR apache#6335 accidentally dropped changes that had been merged by several earlier PRs; the affected PRs are listed below.

1. PR 4196 (2019/5/29 Merli)
Configure static PulsarByteBufAllocator to handle OOM errors (apache#4196); see the allocator sketch after the changed-files summary below.

2. PR 5356 (2019/10/30 Kelly)
[TIEREDSTORAGE] Only seek when reading unexpected entry (apache#5356)

3. PR 4433 (2019/6/4 Higham)
[tiered-storage] Add support for AWS instance and role creds (apache#4433)
gaoran10 authored Nov 21, 2020
1 parent 19767c7 commit 68759ff
Showing 10 changed files with 83 additions and 44 deletions.
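
The allocator change from PR 4196 recurs throughout the diffs below: each `PooledByteBufAllocator.DEFAULT` call site becomes `PulsarByteBufAllocator.DEFAULT`, which exposes the same Netty `ByteBufAllocator` interface but routes allocation failures through Pulsar's configurable out-of-memory policy. A minimal usage sketch, assuming only this drop-in-replacement behavior (the class name is illustrative, not part of this PR):

```java
import io.netty.buffer.ByteBuf;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;

public class AllocatorSwapSketch {
    public static void main(String[] args) {
        int bufferSize = 1024;
        // Before: PooledByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize)
        ByteBuf buffer = PulsarByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize);
        try {
            buffer.writeLong(42L);
            System.out.println("readable bytes: " + buffer.readableBytes());
        } finally {
            buffer.release(); // pooled buffers must be released explicitly
        }
    }
}
```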
BlobStoreBackedInputStreamImpl.java
@@ -19,11 +19,11 @@
 package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import java.io.IOException;
 import java.io.InputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.domain.Blob;
 import org.jclouds.blobstore.options.GetOptions;
@@ -52,7 +52,7 @@ public BlobStoreBackedInputStreamImpl(BlobStore blobStore, String bucket, String
         this.bucket = bucket;
         this.key = key;
         this.versionCheck = versionCheck;
-        this.buffer = PooledByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize);
+        this.buffer = PulsarByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize);
         this.objectLen = objectLen;
         this.bufferSize = bufferSize;
         this.cursor = 0;
@@ -116,7 +116,8 @@ public int read(byte[] b, int off, int len) throws IOException {
 
     @Override
     public void seek(long position) {
-        log.debug("Seeking to {} on {}/{}, current position {}", position, bucket, key, cursor);
+        log.debug("Seeking to {} on {}/{}, current position {} (bufStart:{}, bufEnd:{})",
+                position, bucket, key, cursor, bufferOffsetStart, bufferOffsetEnd);
         if (position >= bufferOffsetStart && position <= bufferOffsetEnd) {
             long newIndex = position - bufferOffsetStart;
             buffer.readerIndex((int) newIndex);
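
The widened debug line above spells out the invariant the seek fast path depends on: a seek stays in memory only when the target offset falls inside [bufferOffsetStart, bufferOffsetEnd]. A self-contained sketch of that decision; the slow-path refill here is an assumption, as the real class refetches from the blob store on the next read:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class BufferedSeekSketch {
    private long bufferOffsetStart;
    private long bufferOffsetEnd;
    private final ByteBuf buffer;

    BufferedSeekSketch(long start, int size) {
        this.bufferOffsetStart = start;
        this.bufferOffsetEnd = start + size - 1;
        this.buffer = Unpooled.buffer(size, size);
        this.buffer.writeZero(size); // pretend 'size' bytes are already fetched
    }

    void seek(long position) {
        if (position >= bufferOffsetStart && position <= bufferOffsetEnd) {
            // Fast path: target is already buffered, just move the reader index.
            buffer.readerIndex((int) (position - bufferOffsetStart));
        } else {
            // Slow path (assumed): invalidate, so the next read refetches.
            buffer.clear();
            bufferOffsetStart = position;
            bufferOffsetEnd = position - 1; // empty window until refilled
        }
    }

    public static void main(String[] args) {
        BufferedSeekSketch s = new BufferedSeekSketch(100, 50);
        s.seek(120); // in-buffer: no network round trip
        System.out.println(s.buffer.readerIndex()); // 20
    }
}
```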
BlobStoreBackedReadHandleImpl.java
@@ -19,7 +19,6 @@
 package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -38,8 +37,8 @@
 import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
-import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
 import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.domain.Blob;
 import org.slf4j.Logger;
@@ -104,26 +103,31 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
             List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
             long nextExpectedId = firstEntry;
             try {
-                OffloadIndexEntry entry = index.getIndexEntryForEntry(firstEntry);
-                inputStream.seek(entry.getDataOffset());
-
                 while (entriesToRead > 0) {
                     int length = dataStream.readInt();
                     if (length < 0) { // hit padding or new block
-                        inputStream.seekForward(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                        length = dataStream.readInt();
+                        inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+                        continue;
                     }
                     long entryId = dataStream.readLong();
 
                     if (entryId == nextExpectedId) {
-                        ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length);
+                        ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
                         entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
                         int toWrite = length;
                         while (toWrite > 0) {
                             toWrite -= buf.writeBytes(dataStream, toWrite);
                         }
                         entriesToRead--;
                         nextExpectedId++;
+                    } else if (entryId > nextExpectedId) {
+                        inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+                        continue;
+                    } else if (entryId < nextExpectedId
+                            && !index.getIndexEntryForEntry(nextExpectedId).equals(
+                                    index.getIndexEntryForEntry(entryId))) {
+                        inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+                        continue;
                     } else if (entryId > lastEntry) {
                         log.info("Expected to read {}, but read {}, which is greater than last entry {}",
                                 nextExpectedId, entryId, lastEntry);
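
The restored PR 5356 behavior above trusts sequential scanning and consults the offload index only when the stream yields something unexpected: a negative length (padding or a block header), an entry id beyond the expected one, or a stale entry whose index block differs from the expected entry's block. A condensed sketch of that decision order, with a stand-in for the `OffloadIndexBlock.getIndexEntryForEntry` lookup:

```java
import java.util.function.LongUnaryOperator;

public class SeekDecisionSketch {
    enum Action { READ_PAYLOAD, SEEK_VIA_INDEX, SKIP_PAYLOAD, FAIL }

    // blockOffsetOf maps an entry id to its block's data offset (the index lookup).
    static Action decide(int length, long entryId, long nextExpectedId, long lastEntry,
                         LongUnaryOperator blockOffsetOf) {
        if (length < 0) {
            return Action.SEEK_VIA_INDEX; // hit padding or a new block header
        }
        if (entryId == nextExpectedId) {
            return Action.READ_PAYLOAD;   // the common sequential case: no seek at all
        }
        if (entryId > nextExpectedId) {
            return Action.SEEK_VIA_INDEX; // overshot: jump back to the indexed offset
        }
        if (blockOffsetOf.applyAsLong(entryId) != blockOffsetOf.applyAsLong(nextExpectedId)) {
            return Action.SEEK_VIA_INDEX; // stale entry from a different block
        }
        if (entryId > lastEntry) {
            return Action.FAIL;           // ran past the requested range
        }
        return Action.SKIP_PAYLOAD;       // earlier entry in the same block: scan past it
    }

    public static void main(String[] args) {
        LongUnaryOperator oneBlock = id -> 0L; // single-block index for the demo
        System.out.println(decide(-1, 0, 5, 9, oneBlock)); // SEEK_VIA_INDEX
        System.out.println(decide(64, 5, 5, 9, oneBlock)); // READ_PAYLOAD
        System.out.println(decide(64, 3, 5, 9, oneBlock)); // SKIP_PAYLOAD
    }
}
```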
BlobStoreManagedLedgerOffloader.java
@@ -67,7 +67,6 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
 
     private final OrderedScheduler scheduler;
     private final TieredStorageConfiguration config;
-    // private final BlobStore writeBlobStore;
     private final Location writeLocation;
 
     // metadata to be stored as part of the offloaded ledger metadata
@@ -116,11 +115,6 @@ public Map<String, String> getOffloadDriverMetadata() {
         return config.getOffloadDriverMetadata();
     }
 
-    // @VisibleForTesting
-    // public ConcurrentMap<BlobStoreLocation, BlobStore> getBlobStores() {
-    //     return blobStores;
-    // }
-
     /**
     * Upload the DataBlocks associated with the given ReadHandle using MultiPartUpload,
     * Creating indexBlocks for each corresponding DataBlock that is uploaded.
@@ -303,7 +297,6 @@ public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
 
     @Override
     public OffloadPolicies getOffloadPolicies() {
-        // TODO Auto-generated method stub
         Properties properties = new Properties();
         properties.putAll(config.getConfigProperties());
         return OffloadPolicies.create(properties);
BlockAwareSegmentInputStreamImpl.java
@@ -23,7 +23,6 @@
 import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Iterator;
@@ -33,6 +32,7 @@
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -126,8 +126,8 @@ private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntrie
                 int entryLength = buf.readableBytes();
                 long entryId = entry.getEntryId();
 
-                CompositeByteBuf entryBuf = PooledByteBufAllocator.DEFAULT.compositeBuffer(2);
-                ByteBuf entryHeaderBuf = PooledByteBufAllocator.DEFAULT.buffer(ENTRY_HEADER_SIZE, ENTRY_HEADER_SIZE);
+                CompositeByteBuf entryBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(2);
+                ByteBuf entryHeaderBuf = PulsarByteBufAllocator.DEFAULT.buffer(ENTRY_HEADER_SIZE, ENTRY_HEADER_SIZE);
 
                 entryHeaderBuf.writeInt(entryLength).writeLong(entryId);
                 entryBuf.addComponents(true, entryHeaderBuf, buf);
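
The hunk above also documents the offloaded entry framing: a 12-byte header (4-byte length plus 8-byte entry id) is prepended to each payload through a composite buffer, so the payload bytes are never copied. A runnable sketch of the same framing, using unpooled buffers for brevity:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

public class EntryFramingSketch {
    static final int ENTRY_HEADER_SIZE = 4 + 8; // length:int + entryId:long

    static ByteBuf frame(long entryId, ByteBuf payload) {
        ByteBuf header = Unpooled.buffer(ENTRY_HEADER_SIZE, ENTRY_HEADER_SIZE);
        header.writeInt(payload.readableBytes()).writeLong(entryId);
        CompositeByteBuf framed = Unpooled.compositeBuffer(2);
        framed.addComponents(true, header, payload); // true = advance the writer index
        return framed;
    }

    public static void main(String[] args) {
        ByteBuf framed = frame(7L, Unpooled.wrappedBuffer("hello".getBytes()));
        System.out.println(framed.readInt());  // 5 (payload length)
        System.out.println(framed.readLong()); // 7 (entry id)
        framed.release();
    }
}
```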
DataBlockHeaderImpl.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 
 /**
  * The data block header in code storage for each data block.
@@ -110,7 +111,7 @@ public DataBlockHeaderImpl(long headerLength, long blockLength, long firstEntryI
      */
     @Override
     public InputStream toStream() {
-        ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(HEADER_MAX_SIZE, HEADER_MAX_SIZE);
+        ByteBuf out = PulsarByteBufAllocator.DEFAULT.buffer(HEADER_MAX_SIZE, HEADER_MAX_SIZE);
         out.writeInt(MAGIC_WORD)
                 .writeLong(headerLength)
                 .writeLong(blockLength)
@@ -120,5 +121,11 @@ public InputStream toStream() {
         // true means the input stream will release the ByteBuf on close
         return new ByteBufInputStream(out, true);
     }
+
+    @Override
+    public String toString() {
+        return String.format("DataBlockHeader(len:%d,hlen:%d,firstEntry:%d)",
+                blockLength, headerLength, firstEntryId);
+    }
 }
 
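For orientation, `toStream()` above serializes the block header as a 4-byte magic word followed by three longs into a fixed-size buffer whose ownership passes to the returned stream. A runnable sketch with stand-in constants; the real MAGIC_WORD, HEADER_MAX_SIZE, and padding scheme live in the class itself:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

public class HeaderFramingSketch {
    static final int MAGIC_WORD = 0xCAFEBABE; // stand-in, not the real constant
    static final int HEADER_MAX_SIZE = 128;   // stand-in, not the real constant

    static InputStream toStream(long headerLength, long blockLength, long firstEntryId) {
        ByteBuf out = Unpooled.buffer(HEADER_MAX_SIZE, HEADER_MAX_SIZE);
        out.writeInt(MAGIC_WORD)
                .writeLong(headerLength)
                .writeLong(blockLength)
                .writeLong(firstEntryId)
                .writeZero(HEADER_MAX_SIZE - out.writerIndex()); // pad to the fixed size
        return new ByteBufInputStream(out, true); // stream releases the buffer on close
    }

    public static void main(String[] args) throws IOException {
        try (DataInputStream in = new DataInputStream(toStream(28, 4096, 0))) {
            System.out.printf("magic=%x headerLen=%d blockLen=%d firstEntry=%d%n",
                    in.readInt(), in.readLong(), in.readLong(), in.readLong());
        }
    }
}
```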
OffloadIndexBlockImpl.java
@@ -22,7 +22,6 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 
@@ -42,6 +41,7 @@
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.DataFormats;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -156,7 +156,7 @@ public OffloadIndexBlock.IndexInputStream toStream() throws IOException {
                 + segmentMetadataLength
                 + indexEntryCount * (8 + 4 + 8); /* messageEntryId + blockPartId + blockOffset */
 
-        ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(indexBlockLength, indexBlockLength);
+        ByteBuf out = PulsarByteBufAllocator.DEFAULT.buffer(indexBlockLength, indexBlockLength);
 
         out.writeInt(INDEX_MAGIC_WORD)
                 .writeInt(indexBlockLength)
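
The sizing arithmetic above allocates the index block exactly: each index entry costs 8 + 4 + 8 = 20 bytes (messageEntryId, blockPartId, blockOffset) on top of the fixed header fields and the serialized segment metadata. A tiny worked example; the fixed-header composition shown is an assumption for illustration:

```java
public class IndexBlockSizingSketch {
    public static void main(String[] args) {
        int fixedHeader = 4 + 4 + 4 + 4;   // assumed: magic + block length + metadata length + entry count
        int segmentMetadataLength = 300;   // illustrative value
        int indexEntryCount = 1000;
        int indexBlockLength = fixedHeader + segmentMetadataLength + indexEntryCount * (8 + 4 + 8);
        System.out.println("index block bytes: " + indexBlockLength); // 20316
    }
}
```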
OffloadIndexEntryImpl.java
@@ -56,5 +56,11 @@ private OffloadIndexEntryImpl(long entryId, int partId, long offset, long blockH
         this.offset = offset;
         this.blockHeaderSize = blockHeaderSize;
     }
+
+    @Override
+    public String toString() {
+        return String.format("[eid:%d, part:%d, offset:%d, doffset:%d]",
+                entryId, partId, offset, getDataOffset());
+    }
 }
 
JCloudBlobStoreProvider.java
@@ -18,8 +18,14 @@
  */
 package org.apache.bookkeeper.mledger.offload.jcloud.provider;
 
+import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.GCS_ACCOUNT_KEY_FILE_FIELD;
+import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ROLE_FIELD;
+import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ROLE_SESSION_NAME_FIELD;
+
 import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSSessionCredentials;
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
 import com.google.common.base.Strings;
 import com.google.common.io.Files;
 
@@ -36,8 +42,8 @@
 import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.CredentialBuilder;
 
 import org.apache.commons.lang3.StringUtils;
-import org.jclouds.Constants;
 import org.jclouds.ContextBuilder;
+import org.jclouds.aws.domain.SessionCredentials;
 import org.jclouds.aws.s3.AWSS3ProviderMetadata;
 import org.jclouds.azureblob.AzureBlobProviderMetadata;
 import org.jclouds.blobstore.BlobStore;
@@ -106,9 +112,8 @@ public void buildCredentials(TieredStorageConfiguration config) {
         if (config.getCredentials() == null) {
             try {
                 String gcsKeyContent = Files.toString(
-                        new File(config.getConfigProperty("gcsManagedLedgerOffloadServiceAccountKeyFile")),
-                        Charset.defaultCharset());
-                config.setProviderCredentials(new GoogleCredentialsFromJson(gcsKeyContent).get());
+                        new File(config.getConfigProperty(GCS_ACCOUNT_KEY_FILE_FIELD)), Charset.defaultCharset());
+                config.setProviderCredentials(() -> new GoogleCredentialsFromJson(gcsKeyContent).get());
             } catch (IOException ioe) {
                 log.error("Cannot read GCS service account credentials file: {}",
                         config.getConfigProperty("gcsManagedLedgerOffloadServiceAccountKeyFile"));
@@ -139,7 +144,7 @@ public void buildCredentials(TieredStorageConfiguration config) {
             if (StringUtils.isEmpty(accountKey)) {
                 throw new IllegalArgumentException("Couldn't get the azure storage access key.");
             }
-            config.setProviderCredentials(new Credentials(accountName, accountKey));
+            config.setProviderCredentials(() -> new Credentials(accountName, accountKey));
         }
     },
 
@@ -246,8 +251,7 @@ public ProviderMetadata getProviderMetadata() {
 
         if (config.getProviderCredentials() != null) {
             return contextBuilder
-                    .credentials(config.getProviderCredentials().identity,
-                            config.getProviderCredentials().credential)
+                    .credentialsSupplier(config.getCredentials())
                     .buildView(BlobStoreContext.class)
                     .getBlobStore();
         } else {
@@ -262,17 +266,35 @@
         if (config.getCredentials() == null) {
             AWSCredentials awsCredentials = null;
             try {
-                DefaultAWSCredentialsProviderChain creds = DefaultAWSCredentialsProviderChain.getInstance();
-                awsCredentials = creds.getCredentials();
+                if (Strings.isNullOrEmpty(config.getConfigProperty(S3_ROLE_FIELD))) {
+                    awsCredentials = DefaultAWSCredentialsProviderChain.getInstance().getCredentials();
+                } else {
+                    awsCredentials =
+                            new STSAssumeRoleSessionCredentialsProvider.Builder(
+                                    config.getConfigProperty(S3_ROLE_FIELD),
+                                    config.getConfigProperty(S3_ROLE_SESSION_NAME_FIELD)
+                            ).build().getCredentials();
+                }
+
+                if (awsCredentials instanceof AWSSessionCredentials) {
+                    // if we have session credentials, we need to send the session token
+                    // this allows us to support EC2 metadata credentials
+                    SessionCredentials sessionCredentials = SessionCredentials.builder()
+                            .accessKeyId(awsCredentials.getAWSAccessKeyId())
+                            .secretAccessKey(awsCredentials.getAWSSecretKey())
+                            .sessionToken(((AWSSessionCredentials) awsCredentials).getSessionToken())
+                            .build();
+                    config.setProviderCredentials(() -> sessionCredentials);
+                } else {
+                    Credentials credentials = new Credentials(
+                            awsCredentials.getAWSAccessKeyId(), awsCredentials.getAWSSecretKey());
+                    config.setProviderCredentials(() -> credentials);
+                }
+
             } catch (Exception e) {
                 // allowed, some mock s3 service do not need credential
                 log.warn("Exception when get credentials for s3 ", e);
             }
-            if (awsCredentials != null) {
-                config.setProviderCredentials(
-                        new Credentials(awsCredentials.getAWSAccessKeyId(),
-                                awsCredentials.getAWSSecretKey()));
-            }
         }
     };
 
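The PR 4433 path restored above obtains credentials by assuming the configured role through STS and forwards the resulting session token to jclouds; without a role it falls back to the default provider chain. An isolated sketch of the role branch; the role ARN and session name are placeholders, and running it performs a real STS call against valid AWS credentials:

```java
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSSessionCredentials;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;

public class RoleCredentialsSketch {
    public static void main(String[] args) {
        AWSCredentials creds = new STSAssumeRoleSessionCredentialsProvider.Builder(
                "arn:aws:iam::123456789012:role/pulsar-offload", // placeholder role ARN
                "pulsar-offload-session")                        // placeholder session name
                .build()
                .getCredentials();

        // Role-assumed credentials are session credentials, so the temporary
        // session token must be forwarded to the blob store client as well.
        if (creds instanceof AWSSessionCredentials) {
            System.out.println("session token present: "
                    + (((AWSSessionCredentials) creds).getSessionToken() != null));
        }
    }
}
```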
TieredStorageConfiguration.java
@@ -35,6 +35,7 @@
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier;
 import org.jclouds.Constants;
 import org.jclouds.aws.s3.AWSS3ProviderMetadata;
 import org.jclouds.blobstore.BlobStore;
@@ -69,6 +70,10 @@ public class TieredStorageConfiguration implements Serializable, Cloneable {
 
     protected static final int MB = 1024 * 1024;
 
+    public static final String GCS_ACCOUNT_KEY_FILE_FIELD = "gcsManagedLedgerOffloadServiceAccountKeyFile";
+    public static final String S3_ROLE_FIELD = "s3ManagedLedgerOffloadRole";
+    public static final String S3_ROLE_SESSION_NAME_FIELD = "s3ManagedLedgerOffloadRoleSessionName";
+
     public static TieredStorageConfiguration create(Properties props) throws IOException {
         Map<String, String> map = new HashMap<String, String>();
         map.putAll(props.entrySet()
@@ -86,7 +91,7 @@ public static TieredStorageConfiguration create(Map<String, String> props) {
     @Getter
     private final Map<String, String> configProperties;
     @Getter
-    private Credentials credentials;
+    private Supplier<Credentials> credentials;
     private JCloudBlobStoreProvider provider;
 
     public TieredStorageConfiguration(Map<String, String> configProperties) {
@@ -221,14 +226,14 @@ public Integer getReadBufferSizeInBytes() {
         return new Integer(MB);
     }
 
-    public Credentials getProviderCredentials() {
+    public Supplier<Credentials> getProviderCredentials() {
         if (credentials == null) {
             getProvider().buildCredentials(this);
         }
         return credentials;
     }
 
-    public void setProviderCredentials(Credentials credentials) {
+    public void setProviderCredentials(Supplier<Credentials> credentials) {
         this.credentials = credentials;
     }
 
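
With the new constants and the `Supplier<Credentials>` accessors above, role-based offload is plain configuration. A hedged sketch of the relevant properties; the driver and bucket keys are assumptions taken from the broker's offload settings, not from this diff:

```java
import java.util.HashMap;
import java.util.Map;

public class OffloadConfigSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("managedLedgerOffloadDriver", "aws-s3");               // assumed key
        props.put("s3ManagedLedgerOffloadBucket", "my-offload-bucket");  // assumed key
        props.put("s3ManagedLedgerOffloadRole",
                "arn:aws:iam::123456789012:role/pulsar-offload");        // S3_ROLE_FIELD
        props.put("s3ManagedLedgerOffloadRoleSessionName", "pulsar");    // S3_ROLE_SESSION_NAME_FIELD
        // TieredStorageConfiguration.create(props) would then build credentials
        // lazily via getProviderCredentials(), now returning Supplier<Credentials>.
        System.out.println(props);
    }
}
```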
Test base class (file name not shown in this view)
@@ -38,6 +38,7 @@
 import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.data.ACL;
@@ -99,14 +100,14 @@ protected static JCloudBlobStoreProvider getBlobStoreProvider() {
     * Get the credentials to use for the JCloud provider
     * based on the System properties.
     */
-    protected static Credentials getBlobStoreCredentials() {
+    protected static Supplier<Credentials> getBlobStoreCredentials() {
        if (Boolean.parseBoolean(System.getProperty("testRealAWS", "false"))) {
            /* To use this, must config credentials using "aws_access_key_id" as S3ID,
             * and "aws_secret_access_key" as S3Key. And bucket should exist in default region. e.g.
             * props.setProperty("S3ID", "AXXXXXXQ");
             * props.setProperty("S3Key", "HXXXXXß");
             */
-            return new Credentials(System.getProperty("S3ID"), System.getProperty("S3Key"));
+            return () -> new Credentials(System.getProperty("S3ID"), System.getProperty("S3Key"));
 
        } else if (Boolean.parseBoolean(System.getProperty("testRealGCS", "false"))) {
            /*
@@ -115,7 +116,7 @@ protected static Credentials getBlobStoreCredentials() {
             * props.setProperty("GCSID", "[email protected]");
             * props.setProperty("GCSKey", "XXXXXX");
             */
-            return new Credentials(System.getProperty("GCSID"), System.getProperty("GCSKey"));
+            return () -> new Credentials(System.getProperty("GCSID"), System.getProperty("GCSKey"));
        } else {
            return null;
        }
