Fix the read performance issue in the offload readAsync (apache#12443)
---

*Motivation*

In apache#12123, I added a seek operation to the readAsync method. It
makes sure the data stream always seeks to the position of the first
requested entry before reading, so it does not introduce an EOF exception.
However, the offload index groups a set of entries into a range, and the
seek operation moves the position to the first entry of that range. This
introduces a performance issue, because every read operation has to read
from the first entry of the range until it finds the entry actually
requested.
But if we simply remove the seek operation, the readAsync method will
throw an EOF exception. This PR limits when the seek operation is applied.
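As a rough, hypothetical sketch of why the unconditional seek is costly (the names below only model the idea and are not Pulsar's actual index classes): the offload index maps a whole range of entries to a single data offset, so looking up any entry in a range yields the offset of the range's first entry.

```java
import java.util.TreeMap;

// Toy model of a range-based offload index. All names here are illustrative.
public class RangeIndexSketch {
    // key = first entry id of a range, value = data offset where that range starts
    private final TreeMap<Long, Long> rangeStartOffsets = new TreeMap<>();

    void addRange(long firstEntryOfRange, long dataOffset) {
        rangeStartOffsets.put(firstEntryOfRange, dataOffset);
    }

    // Any entry in a range resolves to the offset of the range's first entry, so an
    // unconditional seek here forces re-reading every earlier entry of the range.
    long offsetForEntry(long entryId) {
        return rangeStartOffsets.floorEntry(entryId).getValue();
    }

    public static void main(String[] args) {
        RangeIndexSketch index = new RangeIndexSketch();
        index.addRange(0L, 0L);         // entries 0-999 start at offset 0
        index.addRange(1000L, 64_000L); // entries 1000-1999 start at offset 64000
        // A read that starts at entry 500 still seeks to offset 0 and must skip 500 entries.
        System.out.println(index.offsetForEntry(500L)); // prints 0
    }
}
```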

*Modifications*

Add an available method to the BackedInputStream so we can know how many
bytes are left to read from the stream.
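A minimal sketch of the guard this change introduces, paraphrasing the diff below (the constant and helper names are made up for illustration): before reading the next entry header, check that at least 12 bytes (4-byte length + 8-byte entry id) are available, and only seek back to the index offset of the first requested entry when they are not.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Toy model of the available()-based guard; names are illustrative, not Pulsar's API.
public class AvailableGuardSketch {
    private static final int ENTRY_HEADER_SIZE = 12; // 4-byte length + 8-byte entry id

    interface SeekableSource {
        void seek(long position) throws IOException;
    }

    // Seek only when the stream cannot supply another entry header; otherwise keep the
    // current position and avoid re-reading the index range from its first entry.
    static void maybeSeek(DataInputStream dataStream, SeekableSource source,
                          long firstEntryDataOffset) throws IOException {
        if (dataStream.available() < ENTRY_HEADER_SIZE) {
            source.seek(firstEntryDataOffset);
        }
    }

    public static void main(String[] args) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(new byte[64]));
        maybeSeek(in, position -> System.out.println("seek to " + position), 0L);
        System.out.println("available after guard: " + in.available()); // 64, no seek happened
    }
}
```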
zymap authored Oct 22, 2021
1 parent d310e79 commit b4d05ac
Showing 4 changed files with 63 additions and 10 deletions.
@@ -141,4 +141,9 @@ public void seekForward(long position) throws IOException {
public void close() {
buffer.release();
}

@Override
public int available() throws IOException {
return (int)(objectLen - cursor) + buffer.readableBytes();
}
}
@@ -105,6 +105,7 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
executor.submit(() -> {
List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
boolean seeked = false;
try {
if (firstEntry > lastEntry
|| firstEntry < 0
@@ -115,14 +116,13 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
long entriesToRead = (lastEntry - firstEntry) + 1;
long nextExpectedId = firstEntry;

// seek to the position of the first entry, otherwise the first read would return an unexpected entry ID
// that is outside the range between firstEntry and lastEntry.
// For example, if we first read entries 1-10 and the next request is for entries 2-9, the following code
// would read an entry id from the stream that is not the correct one, so it seeks to the correct
// position and then reads the stream as normal. But the entry id may exceed the last entry id, which
// makes it hard to know the edge of the requested range.
inputStream.seek(index.getIndexEntryForEntry(firstEntry).getDataOffset());
// check that the data stream has enough data to read, to avoid an EOF exception while reading.
// The next 12 bytes hold the length and the entry ID of the entry to read.
if (dataStream.available() < 12) {
log.warn("There is not enough data to read, only {} bytes are available,"
+ " seeking to the first entry {} to avoid an EOF exception", inputStream.available(), firstEntry);
inputStream.seek(index.getIndexEntryForEntry(firstEntry).getDataOffset());
}

while (entriesToRead > 0) {
if (state == State.Closed) {
@@ -149,14 +149,20 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
log.warn("The read entry {} is not the expected entry {} but in the range of {} - {},"
+ " seeking to the right position", entryId, nextExpectedId, nextExpectedId, lastEntry);
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
continue;
} else if (entryId < nextExpectedId
&& !index.getIndexEntryForEntry(nextExpectedId).equals(index.getIndexEntryForEntry(entryId))) {
log.warn("Read an unexpected entry id {} which is smaller than the next expected entry id {}"
+ ", seeking to the right position", entries, nextExpectedId);
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
continue;
} else if (entryId > lastEntry) {
// In the normal case, the entry id increments in order. But if the read method is accessed
// randomly, we allow one seek to the right position, after which the entry id should
// never go past the last entry again.
if (!seeked) {
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
seeked = true;
continue;
}
log.info("Expected to read {}, but read {}, which is greater than last entry {}",
nextExpectedId, entryId, lastEntry);
throw new BKException.BKUnexpectedConditionException();
@@ -260,4 +260,27 @@ public void testSeekForward() throws Exception {
toTest.seekForward(after);
assertStreamsMatch(toTest, toCompare);
}

@Test
public void testAvailable() throws IOException {
String objectKey = "testAvailable";
int objectSize = 2048;
RandomInputStream toWrite = new RandomInputStream(0, objectSize);
Payload payload = Payloads.newInputStreamPayload(toWrite);
payload.getContentMetadata().setContentLength((long)objectSize);
Blob blob = blobStore.blobBuilder(objectKey)
.payload(payload)
.contentLength(objectSize)
.build();
String ret = blobStore.putBlob(BUCKET, blob);
BackedInputStream bis = new BlobStoreBackedInputStreamImpl(
blobStore, BUCKET, objectKey, (k, md) -> {}, objectSize, 512);
Assert.assertEquals(bis.available(), objectSize);
bis.seek(500);
Assert.assertEquals(bis.available(), objectSize - 500);
bis.seek(1024);
Assert.assertEquals(bis.available(), 1024);
bis.seek(2048);
Assert.assertEquals(bis.available(), 0);
}
}
@@ -24,6 +24,7 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;

import java.io.IOException;
import java.util.Collections;
@@ -477,4 +478,22 @@ public void testReadUnknownIndexVersion() throws Exception {
Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version"));
}
}

@Test
public void testReadEOFException() throws Throwable {
ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
LedgerOffloader offloader = getOffloader();
UUID uuid = UUID.randomUUID();
offloader.offload(toWrite, uuid, new HashMap<>()).get();

ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get();
Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed());
toTest.readAsync(0, toTest.getLastAddConfirmed()).get();

try {
toTest.readAsync(0, 0).get();
} catch (Exception e) {
fail("Get unexpected exception when reading entries", e);
}
}
}
