Skip to content

Commit

Permalink
Rewind ByteBuffers Read/Written to Function's state store (apache#7929)
Browse files Browse the repository at this point in the history
Co-authored-by: Jerry Peng <[email protected]>
  • Loading branch information
jerrypeng and Jerry Peng authored Aug 30, 2020
1 parent 1443c6b commit b06ea5e
Showing 1 changed file with 10 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,17 @@
*/
package org.apache.pulsar.functions.instance.state;

import static java.nio.charset.StandardCharsets.UTF_8;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.api.kv.options.DeleteOption;
import org.apache.bookkeeper.api.kv.options.Option;
import org.apache.bookkeeper.api.kv.options.Options;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* This class accumulates the state updates from one function.
*
Expand All @@ -55,6 +53,10 @@ public CompletableFuture<Void> incrCounter(String key, long amount) {
@Override
public CompletableFuture<Void> put(String key, ByteBuffer value) {
if(value != null) {
// Set position to off the buffer to the beginning.
// If a user used an operation like ByteBuffer.allocate(4).putInt(count) to create a ByteBuffer to store to the state store
// the position of the buffer will be at the end and nothing will be written to table service
value.position(0);
return table.put(
Unpooled.wrappedBuffer(key.getBytes(UTF_8)),
Unpooled.wrappedBuffer(value));
Expand All @@ -81,6 +83,10 @@ public CompletableFuture<ByteBuffer> get(String key) {
if (data != null) {
ByteBuffer result = ByteBuffer.allocate(data.readableBytes());
data.readBytes(result);
// Set position to off the buffer to the beginning, since the position after the read is going to be end of the buffer
// If we do not rewind to the begining here, users will have to explicitly do this in their function code
// in order to use any of the ByteBuffer operations
result.position(0);
return result;
}
return null;
Expand Down

0 comments on commit b06ea5e

Please sign in to comment.