Skip to content

Commit

Permalink
[TIEREDSTORAGE] Only seek when reading unexpected entry (apache#5356)
Browse files Browse the repository at this point in the history
* [TIEREDSTORAGE] Only seek when reading unexpected entry

The normal pattern from reading from an offloaded ledger is that the
reader will read the ledger sequentially from start to end. This means
that once a user reads an entry, we should expect that the next entry
they read will be the next entry in the ledger.

The initial implementation of the BlobStoreBackedReadHandleImpl (and
the S3 variant that preceeded it) didn't take this into
account. Instead it did a lookup in the index each time, to find the
block that contained the entry, and then read forward in the block
until it found the entry requested. This is fine for the first few
entries in the block, not so much for the last.

This PR changes the read behaviour to only seek if entryId read
from the block is either:
- greater than the entry we were expecting to read, in which case we
  need to seek backwards in the block.
- less than the entry expected, but also belonging to a different
  block to the expected entry, in which case we need to seek to the
  correct block.

This change improves read performance significantly. Adhoc benchmarks
shows that we can read from offloaded topics at ~160MB/s whereas
previously we could only manage <10MB/s.

* Revert it back to debug
  • Loading branch information
ivankelly authored and wolfstudy committed Oct 30, 2019
1 parent ebaf97c commit 43bc790
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public int read(byte[] b, int off, int len) throws IOException {

@Override
public void seek(long position) {
log.debug("Seeking to {} on {}/{}, current position {}", position, bucket, key, cursor);
log.debug("Seeking to {} on {}/{}, current position {} (bufStart:{}, bufEnd:{})", position, bucket, key, cursor, bufferOffsetStart, bufferOffsetEnd);
if (position >= bufferOffsetStart && position <= bufferOffsetEnd) {
long newIndex = position - bufferOffsetStart;
buffer.readerIndex((int)newIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,11 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
long nextExpectedId = firstEntry;
try {
OffloadIndexEntry entry = index.getIndexEntryForEntry(firstEntry);
inputStream.seek(entry.getDataOffset());

while (entriesToRead > 0) {
int length = dataStream.readInt();
if (length < 0) { // hit padding or new block
inputStream.seekForward(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
length = dataStream.readInt();
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
continue;
}
long entryId = dataStream.readLong();

Expand All @@ -126,6 +123,14 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
}
entriesToRead--;
nextExpectedId++;
} else if (entryId > nextExpectedId) {
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
continue;
} else if (entryId < nextExpectedId
&& !index.getIndexEntryForEntry(nextExpectedId).equals(
index.getIndexEntryForEntry(entryId))) {
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
continue;
} else if (entryId > lastEntry) {
log.info("Expected to read {}, but read {}, which is greater than last entry {}",
nextExpectedId, entryId, lastEntry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,11 @@ public InputStream toStream() {
// true means the input stream will release the ByteBuf on close
return new ByteBufInputStream(out, true);
}

@Override
public String toString() {
return String.format("DataBlockHeader(len:%d,hlen:%d,firstEntry:%d)",
blockLength, headerLength, firstEntryId);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,11 @@ private OffloadIndexEntryImpl(long entryId, int partId, long offset, long blockH
this.offset = offset;
this.blockHeaderSize = blockHeaderSize;
}

@Override
public String toString() {
return String.format("[eid:%d, part:%d, offset:%d, doffset:%d]",
entryId, partId, offset, getDataOffset());
}
}

0 comments on commit 43bc790

Please sign in to comment.