Skip to content

Commit

Permalink
Issue 5624: Adding API to fetch current head of Byte Stream (pravega#…
Browse files Browse the repository at this point in the history
…6372)

* Issue 5624: Adding API to fetch current head of Byte Stream

Signed-off-by: SrishT <[email protected]>
  • Loading branch information
SrishT authored Oct 20, 2021
1 parent 2675a6f commit 0a80ee3
Show file tree
Hide file tree
Showing 13 changed files with 75 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ static ByteStreamClientFactory withScope(String scope, ClientConfig config) {
}

/**
* Creates a new ByteStreamReader on the specified stream initialized to offset 0.
* Creates a new ByteStreamReader on the specified stream initialized with the last offset which was passed to
* ByteStreamWriter::truncateDataBefore(offset), or 0 if truncateDataBefore has not ever been called on this stream.
*
* The first byte read from the return value of this method will be the first available byte in the stream,
* considering any possible truncation.
*
* @param streamName the stream to read from.
* @return A new ByteStreamReader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ public abstract class ByteStreamReader extends InputStream implements Asynchrono
@Override
public abstract int available();

/**
* This makes a synchronous RPC call to the server to obtain the current head of the stream.
* @return The current head offset
*/
public abstract long fetchHeadOffset();

/**
* This make an RPC to the server to fetch the offset at which new bytes would be written. This
* is the same as the length of the segment (assuming no truncation). This offset can also be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ public abstract class ByteStreamWriter extends OutputStream {
*/
public abstract void closeAndSeal() throws IOException;

/**
* This makes a synchronous RPC call to the server to obtain the current head of the stream.
* @return The current head offset
*/
public abstract long fetchHeadOffset();

/**
* This makes a synchronous RPC call to the server to obtain the total number of bytes written
* to the segment in its history. This is the sum total of the bytes written in all calls to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ public void closeAndSeal() throws IOException {
out.closeAndSeal();
}

@Override
public long fetchHeadOffset() {
return out.fetchHeadOffset();
}

@Override
public long fetchTailOffset() {
return out.fetchTailOffset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ public void close() {
}
}

@Override
public long fetchHeadOffset() {
return Futures.getThrowingException(meta.fetchCurrentSegmentHeadOffset());
}


@Override
public long fetchTailOffset() {
return Futures.getThrowingException(meta.fetchCurrentSegmentLength());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public void closeAndSeal() throws IOException {
meta.close();
}

@Override
public long fetchHeadOffset() {
return Futures.getThrowingException(meta.fetchCurrentSegmentHeadOffset());
}

@Override
public long fetchTailOffset() {
return Futures.getThrowingException(meta.fetchCurrentSegmentLength());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ public interface SegmentMetadataClient extends AutoCloseable {
* @return a future containing the Metadata about the segment.
*/
abstract CompletableFuture<SegmentInfo> getSegmentInfo();

/**
* Returns the head of the current segment.
*
* @return a future containing the head of the current segment.
*/
abstract CompletableFuture<Long> fetchCurrentSegmentHeadOffset();

/**
* Returns the length of the current segment. i.e. the total length of all data written to the segment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,15 @@ private CompletableFuture<SegmentSealed> sealSegmentAsync(Segment segment, Deleg
.thenApply(r -> transformReply(r, SegmentSealed.class));
}

@Override
public CompletableFuture<Long> fetchCurrentSegmentHeadOffset() {
Exceptions.checkNotClosed(closed.get(), this);
val result = RETRY_SCHEDULE.retryingOn(ConnectionFailedException.class)
.throwingOn(NoSuchSegmentException.class)
.runAsync(this::getStreamSegmentInfo, executor());
return result.thenApply(info -> info.getStartOffset());
}

@Override
public CompletableFuture<Long> fetchCurrentSegmentLength() {
Exceptions.checkNotClosed(closed.get(), this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,22 @@ public void testReadWritten() throws Exception {
@Cleanup
ByteStreamWriter writer = clientFactory.createByteStreamWriter(STREAM);
byte[] value = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
int headOffset = 0;
writer.write(value);
writer.flush();
@Cleanup
ByteStreamReader reader = clientFactory.createByteStreamReader(STREAM);
for (int i = 0; i < 10; i++) {
assertEquals(i, reader.read());
}
assertEquals(headOffset, reader.fetchHeadOffset());
assertEquals(value.length, reader.fetchTailOffset());
headOffset = 3;
writer.truncateDataBefore(headOffset);
writer.write(value);
writer.flush();
assertEquals(headOffset, reader.fetchHeadOffset());
assertEquals(value.length * 2, reader.fetchTailOffset());
byte[] read = new byte[5];
assertEquals(5, reader.read(read));
assertArrayEquals(new byte[] { 0, 1, 2, 3, 4 }, read);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;

import lombok.Cleanup;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -74,13 +75,18 @@ public void teardown() {
public void testWrite() throws Exception {
@Cleanup
ByteStreamWriter writer = clientFactory.createByteStreamWriter(STREAM);
byte[] value = new byte[] { 1, 2, 3, 4, 5 };
byte[] value = new byte[] { 1, 2, 3, 4, 5, 6, 7 };
int headoffset = 0;
writer.write(value);
writer.flush();
assertEquals(headoffset, writer.fetchHeadOffset());
assertEquals(value.length, writer.fetchTailOffset());
writer.write(value);
writer.write(value);
headoffset = 5;
writer.truncateDataBefore(headoffset);
writer.flush();
assertEquals(headoffset, writer.fetchHeadOffset());
assertEquals(value.length * 3, writer.fetchTailOffset());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
return null;
}
}).when(connection).send(any(WireCommands.GetStreamSegmentInfo.class));
long head = client.fetchCurrentSegmentHeadOffset().join();
long length = client.fetchCurrentSegmentLength().join();
assertEquals(121, head);
assertEquals(123, length);
}

Expand Down Expand Up @@ -278,7 +280,6 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
return null;
}
}).when(connection).send(any(WireCommands.GetStreamSegmentInfo.class));

long length = client.fetchCurrentSegmentLength().join();
InOrder order = Mockito.inOrder(connection, cf);
order.verify(cf).establishConnection(eq(endpoint), any(ReplyProcessor.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ public long getOffset() {
return readOffset;
}

@Override
@Synchronized
public CompletableFuture<Long> fetchCurrentSegmentHeadOffset() {
return CompletableFuture.completedFuture(startingOffset);
}

@Override
@Synchronized
public CompletableFuture<Long> fetchCurrentSegmentLength() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,13 @@ public void readWriteTestTruncate() throws IOException {
//Truncate data before offset 5
writer.truncateDataBefore(5);

// seek to offset 4 and verify if truncation is successful.
reader.seekToOffset(4);
// seek to an invalid truncated offset and verify if truncation is successful.
reader.seekToOffset(reader.fetchHeadOffset() - 1);
assertThrows(SegmentTruncatedException.class, reader::read);

// seek to offset 5 and verify if we are able to read the data.
// seek to the new head and verify if we are able to read the data.
byte[] data = new byte[]{5, 6, 7, 8, 9};
reader.seekToOffset(5);
reader.seekToOffset(reader.fetchHeadOffset());
byte[] readBuffer1 = new byte[5];
int bytesRead = reader.read(readBuffer1);
assertEquals(5, bytesRead);
Expand Down

0 comments on commit 0a80ee3

Please sign in to comment.