Add async state manipulation methods in java functions (apache#7468)
Huanli-Meng authored Jul 14, 2020
1 parent cf6e28f commit b197962
Showing 9 changed files with 634 additions and 21 deletions.
68 changes: 65 additions & 3 deletions site2/docs/functions-develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,14 @@ public interface Context {
String getFunctionVersion();
Logger getLogger();
void incrCounter(String key, long amount);
CompletableFuture<Void> incrCounterAsync(String key, long amount);
long getCounter(String key);
CompletableFuture<Long> getCounterAsync(String key);
void putState(String key, ByteBuffer value);
CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
void deleteState(String key);
ByteBuffer getState(String key);
CompletableFuture<ByteBuffer> getStateAsync(String key);
Map<String, Object> getUserConfigMap();
Optional<Object> getUserConfigValue(String key);
Object getUserConfigValueOrDefault(String key, Object defaultValue);
Expand Down Expand Up @@ -789,7 +793,7 @@ Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper [table serv

States are key-value pairs, where the key is a string and the value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar Function, and shared between instances of that function.
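To illustrate the counter encoding mentioned above, here is a minimal sketch that uses only the JDK's `ByteBuffer` to turn a `long` into an 8-byte big-endian value and back. The `CounterCodec` class and its method names are made up for this illustration; how Pulsar performs the encoding internally is not shown in this document.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper, not part of the Pulsar API.
final class CounterCodec {
    private CounterCodec() {}

    // Encode a counter as 8 bytes, most significant byte first.
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES)
                .order(ByteOrder.BIG_ENDIAN) // ByteBuffer's default order, stated explicitly
                .putLong(value)
                .array();
    }

    // Decode 8 big-endian bytes back into a long.
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getLong();
    }

    public static void main(String[] args) {
        byte[] raw = encode(258L); // 258 = 0x0102, so the last two bytes are 1 and 2
        System.out.println(raw.length + " bytes, last two: " + raw[6] + ", " + raw[7]);
        System.out.println(decode(raw));
    }
}
```

A value written through the general key/value API would need a comparable decoding step on the reader's side, since state values are plain bytes.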

You can access states within Pulsar Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) options to `pulsar-admin functions`.
You can access states within Pulsar Java Functions using the `putState`, `putStateAsync`, `getState`, `getStateAsync`, `incrCounter`, `incrCounterAsync`, `getCounter`, `getCounterAsync` and `deleteState` calls on the context object. You can access states within Pulsar Python Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) options to `pulsar-admin functions`.

> Note
> State storage is not available in Go.
Expand All @@ -811,7 +815,22 @@ Currently Pulsar Functions expose the following APIs for mutating and accessing
void incrCounter(String key, long amount);
```

Application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
The application can use `incrCounter` to change the counter of a given `key` by the given `amount`.

#### incrCounterAsync

```java
/**
* Increment the builtin distributed counter referred to by key,
* but don't wait for the completion of the increment operation
*
* @param key The name of the key
* @param amount The amount to be incremented
*/
CompletableFuture<Void> incrCounterAsync(String key, long amount);
```

The application can use `incrCounterAsync` to asynchronously change the counter of a given `key` by the given `amount`.
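As a rough sketch of how the returned `CompletableFuture<Void>` might be used, the example below starts several increments without waiting on each one and combines them with `CompletableFuture.allOf`. The `AsyncCounter` interface is a hypothetical stand-in that copies only the `incrCounterAsync` signature shown above, and `InMemoryAsyncCounter` exists only so the sketch runs without Pulsar on the classpath. In a real function, the same call would be made on the `Context` object.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in copying only the incrCounterAsync signature shown above.
interface AsyncCounter {
    CompletableFuture<Void> incrCounterAsync(String key, long amount);
}

// In-memory implementation used only to make the sketch runnable.
final class InMemoryAsyncCounter implements AsyncCounter {
    final Map<String, Long> counters = new ConcurrentHashMap<>();

    @Override
    public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
        return CompletableFuture.runAsync(() -> counters.merge(key, amount, Long::sum));
    }
}

final class IncrCounterAsyncSketch {
    // Start one increment per word and return a future that completes when all are done.
    static CompletableFuture<Void> countWords(AsyncCounter ctx, String... words) {
        CompletableFuture<?>[] pending = new CompletableFuture<?>[words.length];
        for (int i = 0; i < words.length; i++) {
            pending[i] = ctx.incrCounterAsync(words[i], 1);
        }
        return CompletableFuture.allOf(pending);
    }

    public static void main(String[] args) {
        InMemoryAsyncCounter ctx = new InMemoryAsyncCounter();
        countWords(ctx, "a", "b", "a").join(); // block only here, for the demo
        System.out.println(ctx.counters);
    }
}
```

Returning the combined future instead of calling `join()` inside `countWords` leaves the decision about when to wait to the caller.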

#### getCounter

Expand All @@ -825,11 +844,26 @@ Application can use `incrCounter` to change the counter of a given `key` by the
long getCounter(String key);
```

Application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
The application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.

Besides the `counter` API, Pulsar also exposes a general key/value API for functions to store
general key/value state.

#### getCounterAsync

```java
/**
* Retrieve the counter value for the key, but don't wait
* for the operation to be completed
*
* @param key name of the key
* @return the amount of the counter value for this key
*/
CompletableFuture<Long> getCounterAsync(String key);
```

The application can use `getCounterAsync` to asynchronously retrieve the counter of a given `key` mutated by `incrCounterAsync`.
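One way to read a counter right after changing it, without blocking in between, is to chain the two futures with `thenCompose`. The sketch below assumes a hypothetical `AsyncCounters` interface that copies the two async counter signatures from this section, backed by an in-memory map. Treating a missing counter as 0 is an assumption of the sketch, not a documented Pulsar behavior.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in copying the two async counter signatures from this section.
interface AsyncCounters {
    CompletableFuture<Void> incrCounterAsync(String key, long amount);
    CompletableFuture<Long> getCounterAsync(String key);
}

// In-memory implementation used only to make the sketch runnable.
final class InMemoryAsyncCounters implements AsyncCounters {
    private final Map<String, Long> counters = new ConcurrentHashMap<>();

    @Override
    public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
        return CompletableFuture.runAsync(() -> counters.merge(key, amount, Long::sum));
    }

    @Override
    public CompletableFuture<Long> getCounterAsync(String key) {
        // Assumption of this sketch: a missing counter reads as 0.
        return CompletableFuture.supplyAsync(() -> counters.getOrDefault(key, 0L));
    }
}

final class GetCounterAsyncSketch {
    // Increment first, then read; thenCompose orders the two operations without blocking.
    static CompletableFuture<Long> incrementAndGet(AsyncCounters ctx, String key, long amount) {
        return ctx.incrCounterAsync(key, amount)
                .thenCompose(ignored -> ctx.getCounterAsync(key));
    }

    public static void main(String[] args) {
        AsyncCounters ctx = new InMemoryAsyncCounters();
        incrementAndGet(ctx, "hello", 2).join();
        System.out.println(incrementAndGet(ctx, "hello", 3).join());
    }
}
```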

#### putState

```java
Expand All @@ -842,6 +876,20 @@ general key/value state.
void putState(String key, ByteBuffer value);
```

#### putStateAsync

```java
/**
* Update the state value for the key, but don't wait for the operation to be completed
*
* @param key name of the key
* @param value state value of the key
*/
CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
```

The application can use `putStateAsync` to asynchronously update the state of a given `key`.
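The following sketch shows one possible way to write a UTF-8 string with `putStateAsync` and turn the outcome into a success flag using `handle`. `AsyncStateWriter` is a hypothetical stand-in that copies only the `putStateAsync` signature shown above, and `InMemoryStateWriter` is an in-memory substitute for the real state store, so the names and the storage behavior are assumptions of this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in copying only the putStateAsync signature shown above.
interface AsyncStateWriter {
    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
}

// In-memory implementation used only to make the sketch runnable.
final class InMemoryStateWriter implements AsyncStateWriter {
    final Map<String, ByteBuffer> store = new ConcurrentHashMap<>();

    @Override
    public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
        // Keep an independent read-only view (own position and limit) of the caller's buffer.
        ByteBuffer view = value.asReadOnlyBuffer();
        return CompletableFuture.runAsync(() -> store.put(key, view));
    }
}

final class PutStateAsyncSketch {
    // Encode the text as UTF-8, write it, and map the outcome to true (ok) or false (failed).
    static CompletableFuture<Boolean> saveText(AsyncStateWriter ctx, String key, String text) {
        ByteBuffer value = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
        return ctx.putStateAsync(key, value)
                .handle((ignored, error) -> error == null);
    }

    public static void main(String[] args) {
        InMemoryStateWriter ctx = new InMemoryStateWriter();
        System.out.println(saveText(ctx, "greeting", "hello").join());
    }
}
```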

#### getState

```java
Expand All @@ -854,6 +902,20 @@ general key/value state.
ByteBuffer getState(String key);
```

#### getStateAsync

```java
/**
* Retrieve the state value for the key, but don't wait for the operation to be completed
*
* @param key name of the key
* @return the state value for the key.
*/
CompletableFuture<ByteBuffer> getStateAsync(String key);
```

The application can use `getStateAsync` to asynchronously retrieve the state of a given `key`.
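A minimal sketch of consuming the `CompletableFuture<ByteBuffer>` is shown below: the buffer is decoded as UTF-8 text once the read completes. `AsyncStateReader` is a hypothetical stand-in that copies only the `getStateAsync` signature shown above. Completing the future with `null` for a missing key is an assumption of this in-memory substitute, so check the behavior of the real API before relying on it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in copying only the getStateAsync signature shown above.
interface AsyncStateReader {
    CompletableFuture<ByteBuffer> getStateAsync(String key);
}

// In-memory implementation used only to make the sketch runnable.
final class InMemoryStateReader implements AsyncStateReader {
    final Map<String, byte[]> store = new ConcurrentHashMap<>();

    @Override
    public CompletableFuture<ByteBuffer> getStateAsync(String key) {
        // Assumption of this sketch: a missing key completes the future with null.
        return CompletableFuture.supplyAsync(() -> {
            byte[] bytes = store.get(key);
            return bytes == null ? null : ByteBuffer.wrap(bytes);
        });
    }
}

final class GetStateAsyncSketch {
    // Read the value and decode it as UTF-8 text; the result is empty if the key is absent.
    static CompletableFuture<Optional<String>> loadText(AsyncStateReader ctx, String key) {
        return ctx.getStateAsync(key).thenApply(buffer ->
                Optional.ofNullable(buffer)
                        .map(b -> StandardCharsets.UTF_8.decode(b).toString()));
    }

    public static void main(String[] args) {
        InMemoryStateReader ctx = new InMemoryStateReader();
        ctx.store.put("greeting", "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(loadText(ctx, "greeting").join());
        System.out.println(loadText(ctx, "missing").join());
    }
}
```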

#### deleteState

```java
Expand Down
4 changes: 4 additions & 0 deletions site2/website/versioned_docs/version-2.4.0/functions-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,13 @@ public interface Context {
String getFunctionVersion();
Logger getLogger();
void incrCounter(String key, long amount);
CompletableFuture<Void> incrCounterAsync(String key, long amount);
long getCounter(String key);
CompletableFuture<Long> getCounterAsync(String key);
void putState(String key, ByteBuffer value);
CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
ByteBuffer getState(String key);
CompletableFuture<ByteBuffer> getStateAsync(String key);
Map<String, Object> getUserConfigMap();
Optional<Object> getUserConfigValue(String key);
Object getUserConfigValueOrDefault(String key, Object defaultValue);
Expand Down
177 changes: 177 additions & 0 deletions site2/website/versioned_docs/version-2.4.0/functions-state.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
---
id: version-2.4.0-functions-state
title: Pulsar Functions State Storage (Developer Preview)
sidebar_label: State Storage
original_id: functions-state
---

Since the Pulsar 2.1.0 release, Pulsar has integrated with Apache BookKeeper [table service](https://docs.google.com/document/d/155xAwWv5IdOitHh1NVMEwCMGgB28M3FyMiQSxEpjE-Y/edit#heading=h.56rbh52koe3f)
for storing the `State` for functions. For example, a `WordCount` function can store its `counters` state into BookKeeper's table service via Pulsar Functions [State API](#api).

## API

### Java API

Currently, Pulsar Functions expose the following APIs for mutating and accessing state. These APIs are available in the [Context](functions-api.md#context) object when
you are using [Java SDK](functions-api.md#java-sdk-functions) functions.

#### incrCounter

```java
/**
* Increment the builtin distributed counter referred to by key
* @param key The name of the key
* @param amount The amount to be incremented
*/
void incrCounter(String key, long amount);
```

The application can use `incrCounter` to change the counter of a given `key` by the given `amount`.

#### incrCounterAsync

```java
/**
* Increment the builtin distributed counter referred to by key,
* but don't wait for the completion of the increment operation
*
* @param key The name of the key
* @param amount The amount to be incremented
*/
CompletableFuture<Void> incrCounterAsync(String key, long amount);
```

The application can use `incrCounterAsync` to asynchronously change the counter of a given `key` by the given `amount`.

#### getCounter

```java
/**
* Retrieve the counter value for the key.
*
* @param key name of the key
* @return the amount of the counter value for this key
*/
long getCounter(String key);
```

The application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.

Besides the `counter` API, Pulsar also exposes a general key/value API for functions to store
general key/value state.

#### getCounterAsync

```java
/**
* Retrieve the counter value for the key, but don't wait
* for the operation to be completed
*
* @param key name of the key
* @return the amount of the counter value for this key
*/
CompletableFuture<Long> getCounterAsync(String key);
```

The application can use `getCounterAsync` to asynchronously retrieve the counter of a given `key` mutated by `incrCounterAsync`.

#### putState

```java
/**
* Update the state value for the key.
*
* @param key name of the key
* @param value state value of the key
*/
void putState(String key, ByteBuffer value);
```

#### putStateAsync

```java
/**
* Update the state value for the key, but don't wait for the operation to be completed
*
* @param key name of the key
* @param value state value of the key
*/
CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
```

The application can use `putStateAsync` to asynchronously update the state of a given `key`.

#### getState

```java
/**
* Retrieve the state value for the key.
*
* @param key name of the key
* @return the state value for the key.
*/
ByteBuffer getState(String key);
```

#### getStateAsync

```java
/**
* Retrieve the state value for the key, but don't wait for the operation to be completed
*
* @param key name of the key
* @return the state value for the key.
*/
CompletableFuture<ByteBuffer> getStateAsync(String key);
```

The application can use `getStateAsync` to asynchronously retrieve the state of a given `key`.
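The two async state calls can also be chained into a single non-blocking round trip. The sketch below is one possible way to do so with `thenCompose` and `thenApply`. `AsyncState` is a hypothetical stand-in that copies the `putStateAsync` and `getStateAsync` signatures from this page, and the in-memory implementation assumes the key exists by the time it is read.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in copying the two async state signatures from this page.
interface AsyncState {
    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
    CompletableFuture<ByteBuffer> getStateAsync(String key);
}

// In-memory implementation used only to make the sketch runnable.
final class InMemoryAsyncState implements AsyncState {
    private final Map<String, byte[]> store = new ConcurrentHashMap<>();

    @Override
    public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
        // Copy the remaining bytes without moving the caller's position.
        byte[] bytes = new byte[value.remaining()];
        value.duplicate().get(bytes);
        return CompletableFuture.runAsync(() -> store.put(key, bytes));
    }

    @Override
    public CompletableFuture<ByteBuffer> getStateAsync(String key) {
        // Assumption of this sketch: the key exists when it is read.
        return CompletableFuture.supplyAsync(() -> ByteBuffer.wrap(store.get(key)));
    }
}

final class StateRoundTripSketch {
    // Write the text, then read it back once the write has completed.
    static CompletableFuture<String> roundTrip(AsyncState ctx, String key, String text) {
        ByteBuffer value = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
        return ctx.putStateAsync(key, value)
                .thenCompose(ignored -> ctx.getStateAsync(key))
                .thenApply(buffer -> StandardCharsets.UTF_8.decode(buffer).toString());
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new InMemoryAsyncState(), "greeting", "hello").join());
    }
}
```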

### Python API

State is currently not supported in the [Python SDK](functions-api.md#python-sdk-functions).

## Query State

A Pulsar Function can use the [State API](#api) to store state in Pulsar's state storage
and to retrieve it again. Additionally, Pulsar provides
CLI commands for querying this state.

```shell
$ bin/pulsar-admin functions querystate \
--tenant <tenant> \
--namespace <namespace> \
--name <function-name> \
--state-storage-url <bookkeeper-service-url> \
--key <state-key> \
[--watch]
```

If `--watch` is specified, the CLI will watch the value of the provided `state-key`.

## Example

### Java Example

{@inject: github:`WordCountFunction`:/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WordCountFunction.java} is a good example
of how an application can store `state` in Pulsar Functions.

```java
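import java.util.Arrays;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class WordCountFunction implements Function<String, Void> {
    @Override
    public Void process(String input, Context context) throws Exception {
        // Split the input on "." and increment the counter of each token by 1.
        Arrays.asList(input.split("\\.")).forEach(word -> context.incrCounter(word, 1));
        return null;
    }
}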
```

The logic of this `WordCount` function is pretty simple and straightforward:

1. The function first splits the received `String` into multiple words using regex `\\.`.
2. For each `word`, the function increments the corresponding `counter` by 1 (via `incrCounter(key, amount)`).
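The two steps above can be tried out without Pulsar. The sketch below uses a plain `Map` as a hypothetical stand-in for `context.incrCounter`, so only the splitting and counting logic is illustrated. Note that the regex `\\.` matches a literal period, so tokens are separated by `.` and not by whitespace.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

final class WordCountSketch {
    // Hypothetical stand-in for context.incrCounter: a local map instead of Pulsar state.
    static Map<String, Long> count(String input) {
        Map<String, Long> counters = new TreeMap<>();
        // Step 1: split on literal periods, so "a.b.a" yields [a, b, a].
        // Step 2: add 1 to the counter of each token.
        Arrays.asList(input.split("\\.")).forEach(word -> counters.merge(word, 1L, Long::sum));
        return counters;
    }

    public static void main(String[] args) {
        System.out.println(count("hello.world.hello")); // prints {hello=2, world=1}
    }
}
```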

### Python Example

State is currently not supported in the [Python SDK](functions-api.md#python-sdk-functions).
