diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java index 386196ce2e7aa..6a204d56de951 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java @@ -19,11 +19,11 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl; import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import java.io.IOException; import java.io.InputStream; import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream; import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.options.GetOptions; @@ -52,7 +52,7 @@ public BlobStoreBackedInputStreamImpl(BlobStore blobStore, String bucket, String this.bucket = bucket; this.key = key; this.versionCheck = versionCheck; - this.buffer = PooledByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize); + this.buffer = PulsarByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize); this.objectLen = objectLen; this.bufferSize = bufferSize; this.cursor = 0; @@ -116,7 +116,8 @@ public int read(byte[] b, int off, int len) throws IOException { @Override public void seek(long position) { - log.debug("Seeking to {} on {}/{}, current position {}", position, bucket, key, cursor); + log.debug("Seeking to {} on {}/{}, current position {} (bufStart:{}, bufEnd:{})", + position, bucket, key, cursor, bufferOffsetStart, bufferOffsetEnd); if (position >= bufferOffsetStart && position <= bufferOffsetEnd) { long newIndex = position - bufferOffsetStart; buffer.readerIndex((int) newIndex); diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java index 92f25143843b8..b48751e830945 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java @@ -19,7 +19,6 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl; import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import java.io.DataInputStream; import java.io.IOException; import java.util.ArrayList; @@ -38,8 +37,8 @@ import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder; -import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry; import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.domain.Blob; import org.slf4j.Logger; @@ -104,19 +103,16 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr List entries = new ArrayList(); long nextExpectedId = firstEntry; try { - OffloadIndexEntry entry = index.getIndexEntryForEntry(firstEntry); - inputStream.seek(entry.getDataOffset()); - while (entriesToRead > 0) { int length = dataStream.readInt(); if (length < 0) { // hit padding or new block - inputStream.seekForward(index.getIndexEntryForEntry(nextExpectedId).getDataOffset()); - length = dataStream.readInt(); + inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset()); + continue; } long entryId = dataStream.readLong(); if (entryId == nextExpectedId) { - ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length); + ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length); entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf)); int toWrite = length; while (toWrite > 0) { @@ -124,6 +120,14 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr } entriesToRead--; nextExpectedId++; + } else if (entryId > nextExpectedId) { + inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset()); + continue; + } else if (entryId < nextExpectedId + && !index.getIndexEntryForEntry(nextExpectedId).equals( + index.getIndexEntryForEntry(entryId))) { + inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset()); + continue; } else if (entryId > lastEntry) { log.info("Expected to read {}, but read {}, which is greater than last entry {}", nextExpectedId, entryId, lastEntry); diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java index 6643a838c31fa..4f517134e946e 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java @@ -67,7 +67,6 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { private final OrderedScheduler scheduler; private final TieredStorageConfiguration config; -// private final BlobStore writeBlobStore; private final Location writeLocation; // metadata to be stored as part of the offloaded ledger metadata @@ -116,11 +115,6 @@ public Map getOffloadDriverMetadata() { return config.getOffloadDriverMetadata(); } -// @VisibleForTesting -// public ConcurrentMap getBlobStores() { -// return blobStores; -// } - /** * Upload the DataBlocks associated with the given ReadHandle using MultiPartUpload, * Creating indexBlocks for each corresponding DataBlock that is uploaded. @@ -303,7 +297,6 @@ public CompletableFuture deleteOffloaded(long ledgerId, UUID uid, @Override public OffloadPolicies getOffloadPolicies() { - // TODO Auto-generated method stub Properties properties = new Properties(); properties.putAll(config.getConfigProperties()); return OffloadPolicies.create(properties); diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java index dc7a68a94412b..dcc693a80c9fc 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java @@ -23,7 +23,6 @@ import com.google.common.collect.Lists; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import java.io.IOException; import java.io.InputStream; import java.util.Iterator; @@ -33,6 +32,7 @@ import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,8 +126,8 @@ private List readNextEntriesFromLedger(long start, long maxNumberEntrie int entryLength = buf.readableBytes(); long entryId = entry.getEntryId(); - CompositeByteBuf entryBuf = PooledByteBufAllocator.DEFAULT.compositeBuffer(2); - ByteBuf entryHeaderBuf = PooledByteBufAllocator.DEFAULT.buffer(ENTRY_HEADER_SIZE, ENTRY_HEADER_SIZE); + CompositeByteBuf entryBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(2); + ByteBuf entryHeaderBuf = PulsarByteBufAllocator.DEFAULT.buffer(ENTRY_HEADER_SIZE, ENTRY_HEADER_SIZE); entryHeaderBuf.writeInt(entryLength).writeLong(entryId); entryBuf.addComponents(true, entryHeaderBuf, buf); diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java index 9e3fe905c0400..9239ec2fff272 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.io.InputStream; import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; /** * The data block header in code storage for each data block. @@ -110,7 +111,7 @@ public DataBlockHeaderImpl(long headerLength, long blockLength, long firstEntryI */ @Override public InputStream toStream() { - ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(HEADER_MAX_SIZE, HEADER_MAX_SIZE); + ByteBuf out = PulsarByteBufAllocator.DEFAULT.buffer(HEADER_MAX_SIZE, HEADER_MAX_SIZE); out.writeInt(MAGIC_WORD) .writeLong(headerLength) .writeLong(blockLength) @@ -120,5 +121,11 @@ public InputStream toStream() { // true means the input stream will release the ByteBuf on close return new ByteBufInputStream(out, true); } + + @Override + public String toString() { + return String.format("DataBlockHeader(len:%d,hlen:%d,firstEntry:%d)", + blockLength, headerLength, firstEntryId); + } } diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java index 71948898e2b60..80fd62f35e1da 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java @@ -22,7 +22,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; @@ -42,6 +41,7 @@ import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.DataFormats; import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -156,7 +156,7 @@ public OffloadIndexBlock.IndexInputStream toStream() throws IOException { + segmentMetadataLength + indexEntryCount * (8 + 4 + 8); /* messageEntryId + blockPartId + blockOffset */ - ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(indexBlockLength, indexBlockLength); + ByteBuf out = PulsarByteBufAllocator.DEFAULT.buffer(indexBlockLength, indexBlockLength); out.writeInt(INDEX_MAGIC_WORD) .writeInt(indexBlockLength) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java index 5dce79d0e73f8..10408fef44e36 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java @@ -56,5 +56,11 @@ private OffloadIndexEntryImpl(long entryId, int partId, long offset, long blockH this.offset = offset; this.blockHeaderSize = blockHeaderSize; } + + @Override + public String toString() { + return String.format("[eid:%d, part:%d, offset:%d, doffset:%d]", + entryId, partId, offset, getDataOffset()); + } } diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java index ba03f64d81d4b..df4b0d311c351 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java @@ -18,8 +18,14 @@ */ package org.apache.bookkeeper.mledger.offload.jcloud.provider; +import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.GCS_ACCOUNT_KEY_FILE_FIELD; +import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ROLE_FIELD; +import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ROLE_SESSION_NAME_FIELD; + import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSSessionCredentials; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; import com.google.common.base.Strings; import com.google.common.io.Files; @@ -36,8 +42,8 @@ import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.CredentialBuilder; import org.apache.commons.lang3.StringUtils; -import org.jclouds.Constants; import org.jclouds.ContextBuilder; +import org.jclouds.aws.domain.SessionCredentials; import org.jclouds.aws.s3.AWSS3ProviderMetadata; import org.jclouds.azureblob.AzureBlobProviderMetadata; import org.jclouds.blobstore.BlobStore; @@ -106,9 +112,8 @@ public void buildCredentials(TieredStorageConfiguration config) { if (config.getCredentials() == null) { try { String gcsKeyContent = Files.toString( - new File(config.getConfigProperty("gcsManagedLedgerOffloadServiceAccountKeyFile")), - Charset.defaultCharset()); - config.setProviderCredentials(new GoogleCredentialsFromJson(gcsKeyContent).get()); + new File(config.getConfigProperty(GCS_ACCOUNT_KEY_FILE_FIELD)), Charset.defaultCharset()); + config.setProviderCredentials(() -> new GoogleCredentialsFromJson(gcsKeyContent).get()); } catch (IOException ioe) { log.error("Cannot read GCS service account credentials file: {}", config.getConfigProperty("gcsManagedLedgerOffloadServiceAccountKeyFile")); @@ -139,7 +144,7 @@ public void buildCredentials(TieredStorageConfiguration config) { if (StringUtils.isEmpty(accountKey)) { throw new IllegalArgumentException("Couldn't get the azure storage access key."); } - config.setProviderCredentials(new Credentials(accountName, accountKey)); + config.setProviderCredentials(() -> new Credentials(accountName, accountKey)); } }, @@ -246,8 +251,7 @@ public ProviderMetadata getProviderMetadata() { if (config.getProviderCredentials() != null) { return contextBuilder - .credentials(config.getProviderCredentials().identity, - config.getProviderCredentials().credential) + .credentialsSupplier(config.getCredentials()) .buildView(BlobStoreContext.class) .getBlobStore(); } else { @@ -262,17 +266,35 @@ public ProviderMetadata getProviderMetadata() { if (config.getCredentials() == null) { AWSCredentials awsCredentials = null; try { - DefaultAWSCredentialsProviderChain creds = DefaultAWSCredentialsProviderChain.getInstance(); - awsCredentials = creds.getCredentials(); + if (Strings.isNullOrEmpty(config.getConfigProperty(S3_ROLE_FIELD))) { + awsCredentials = DefaultAWSCredentialsProviderChain.getInstance().getCredentials(); + } else { + awsCredentials = + new STSAssumeRoleSessionCredentialsProvider.Builder( + config.getConfigProperty(S3_ROLE_FIELD), + config.getConfigProperty(S3_ROLE_SESSION_NAME_FIELD) + ).build().getCredentials(); + } + + if (awsCredentials instanceof AWSSessionCredentials) { + // if we have session credentials, we need to send the session token + // this allows us to support EC2 metadata credentials + SessionCredentials sessionCredentials = SessionCredentials.builder() + .accessKeyId(awsCredentials.getAWSAccessKeyId()) + .secretAccessKey(awsCredentials.getAWSSecretKey()) + .sessionToken(((AWSSessionCredentials) awsCredentials).getSessionToken()) + .build(); + config.setProviderCredentials(() -> sessionCredentials); + } else { + Credentials credentials = new Credentials( + awsCredentials.getAWSAccessKeyId(), awsCredentials.getAWSSecretKey()); + config.setProviderCredentials(() -> credentials); + } + } catch (Exception e) { // allowed, some mock s3 service do not need credential log.warn("Exception when get credentials for s3 ", e); } - if (awsCredentials != null) { - config.setProviderCredentials( - new Credentials(awsCredentials.getAWSAccessKeyId(), - awsCredentials.getAWSSecretKey())); - } } }; } \ No newline at end of file diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java index 5abd8670ef2e0..ac2bed9e01ccf 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java @@ -35,6 +35,7 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier; import org.jclouds.Constants; import org.jclouds.aws.s3.AWSS3ProviderMetadata; import org.jclouds.blobstore.BlobStore; @@ -69,6 +70,10 @@ public class TieredStorageConfiguration implements Serializable, Cloneable { protected static final int MB = 1024 * 1024; + public static final String GCS_ACCOUNT_KEY_FILE_FIELD = "gcsManagedLedgerOffloadServiceAccountKeyFile"; + public static final String S3_ROLE_FIELD = "s3ManagedLedgerOffloadRole"; + public static final String S3_ROLE_SESSION_NAME_FIELD = "s3ManagedLedgerOffloadRoleSessionName"; + public static TieredStorageConfiguration create(Properties props) throws IOException { Map map = new HashMap(); map.putAll(props.entrySet() @@ -86,7 +91,7 @@ public static TieredStorageConfiguration create(Map props) { @Getter private final Map configProperties; @Getter - private Credentials credentials; + private Supplier credentials; private JCloudBlobStoreProvider provider; public TieredStorageConfiguration(Map configProperties) { @@ -221,14 +226,14 @@ public Integer getReadBufferSizeInBytes() { return new Integer(MB); } - public Credentials getProviderCredentials() { + public Supplier getProviderCredentials() { if (credentials == null) { getProvider().buildCredentials(this); } return credentials; } - public void setProviderCredentials(Credentials credentials) { + public void setProviderCredentials(Supplier credentials) { this.credentials = credentials; } diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java index ea613e701bfe6..a2944f575de33 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java @@ -38,6 +38,7 @@ import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration; import org.apache.bookkeeper.util.ZkUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.data.ACL; @@ -99,14 +100,14 @@ protected static JCloudBlobStoreProvider getBlobStoreProvider() { * Get the credentials to use for the JCloud provider * based on the System properties. */ - protected static Credentials getBlobStoreCredentials() { + protected static Supplier getBlobStoreCredentials() { if (Boolean.parseBoolean(System.getProperty("testRealAWS", "false"))) { /* To use this, must config credentials using "aws_access_key_id" as S3ID, * and "aws_secret_access_key" as S3Key. And bucket should exist in default region. e.g. * props.setProperty("S3ID", "AXXXXXXQ"); * props.setProperty("S3Key", "HXXXXXß"); */ - return new Credentials(System.getProperty("S3ID"), System.getProperty("S3Key")); + return () -> new Credentials(System.getProperty("S3ID"), System.getProperty("S3Key")); } else if (Boolean.parseBoolean(System.getProperty("testRealGCS", "false"))) { /* @@ -115,7 +116,7 @@ protected static Credentials getBlobStoreCredentials() { * props.setProperty("GCSID", "5XXXXXXXXXX6-compute@developer.gserviceaccount.com"); * props.setProperty("GCSKey", "XXXXXX"); */ - return new Credentials(System.getProperty("GCSID"), System.getProperty("GCSKey")); + return () -> new Credentials(System.getProperty("GCSID"), System.getProperty("GCSKey")); } else { return null; }