Skip to content

Commit

Permalink
[Issue 5618] [docs]Correct python example (apache#5625)
Browse files Browse the repository at this point in the history
* correct python example

* update two link issues

* remove the default constructor

* update content in version 2.4.1
  • Loading branch information
Jennifer88huang-zz authored and merlimat committed Nov 13, 2019
1 parent 3e7cb68 commit 9b9bff4
Show file tree
Hide file tree
Showing 2 changed files with 282 additions and 23 deletions.
23 changes: 10 additions & 13 deletions site2/docs/functions-develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ For complete code, see [here](https://github.com/apache/pulsar/blob/master/pulsa
>
> If you're running Pulsar Functions on an Ubuntu system that only supports python3, you might fail to
> start the functions. In this case, you can create a symlink. Your system will fail if
> you subsequently install any other package that depends on Python 2.x. A solution is under development in
> [Issue 5518](https://github.com/apache/pulsar/issues/5518).
> you subsequently install any other package that depends on Python 2.x. A solution is under development in [Issue 5518](https://github.com/apache/pulsar/issues/5518).
>
> ```bash
> sudo update-alternatives --install /usr/bin/python python /usr/bin/python3 10
Expand Down Expand Up @@ -235,20 +234,18 @@ In order to use this class in Pulsar Functions, you have two options:
2. You can create your own SerDe class. The following is an example.

```python
from pulsar import SerDe
from pulsar import SerDe

class TweetSerDe(SerDe):
def __init__(self, tweet):
self.tweet = tweet
class TweetSerDe(SerDe):

def serialize(self, input):
return bytes("{0}|{1}".format(self.tweet.username, self.tweet.tweet_content))
def serialize(self, input):
return bytes("{0}|{1}".format(input.username, input.tweet_content))

def deserialize(self, input_bytes):
tweet_components = str(input_bytes).split('|')
return Tweet(tweet_components[0], tweet_componentsp[1])
def deserialize(self, input_bytes):
tweet_components = str(input_bytes).split('|')
return Tweet(tweet_components[0], tweet_componentsp[1])
```

For complete code, see [here](https://github.com/apache/pulsar/blob/master/pulsar-functions/python-examples/custom_object_function.py).

<!--END_DOCUSAURUS_CODE_TABS-->

Expand Down Expand Up @@ -717,7 +714,7 @@ Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper [table serv

States are key-value pairs, where the key is a string and the value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar Function, and shared between instances of that function.

You can access states within Pulsar Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](pulsar-admin.md#querystate) and [putstate](pulsar-admin.md#putstate) options to `pulsar-admin functions`.
You can access states within Pulsar Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) options to `pulsar-admin functions`.

### API

Expand Down
282 changes: 272 additions & 10 deletions site2/website/versioned_docs/version-2.4.1/functions-develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ def process(input):
```
For complete code, see [here](https://github.com/apache/pulsar/blob/master/pulsar-functions/python-examples/native_exclamation_function.py).

> Note
> You can write Pulsar Functions in python2 or python3. However, Pulsar only looks for `python` as the interpreter.
>
> If you're running Pulsar Functions on an Ubuntu system that only supports python3, you might fail to start the functions. In this case, you can create a symlink. Your system will fail if you subsequently install any other package that depends on Python 2.x. A solution is under development in [Issue 5518](https://github.com/apache/pulsar/issues/5518).
>
> ```bash
> sudo update-alternatives --install /usr/bin/python python /usr/bin/python3 10
> ```
<!--END_DOCUSAURUS_CODE_TABS-->
The following example uses Pulsar Functions SDK.
Expand Down Expand Up @@ -224,20 +233,18 @@ In order to use this class in Pulsar Functions, you have two options:
2. You can create your own SerDe class. The following is an example.

```python
from pulsar import SerDe
from pulsar import SerDe

class TweetSerDe(SerDe):
def __init__(self, tweet):
self.tweet = tweet
class TweetSerDe(SerDe):

def serialize(self, input):
return bytes("{0}|{1}".format(self.tweet.username, self.tweet.tweet_content))
def serialize(self, input):
return bytes("{0}|{1}".format(input.username, input.tweet_content))

def deserialize(self, input_bytes):
tweet_components = str(input_bytes).split('|')
return Tweet(tweet_components[0], tweet_componentsp[1])
def deserialize(self, input_bytes):
tweet_components = str(input_bytes).split('|')
return Tweet(tweet_components[0], tweet_componentsp[1])
```

For complete code, see [here](https://github.com/apache/pulsar/blob/master/pulsar-functions/python-examples/custom_object_function.py).

<!--END_DOCUSAURUS_CODE_TABS-->

Expand All @@ -248,6 +255,7 @@ Java, Python and Go SDKs provide access to a **context object** that can be used

* The name and ID of a Pulsar Function.
* The message ID of each message. Each Pulsar message is automatically assigned with an ID.
* The key, event time, properties and partition key of each message.
* The name of the topic to which the message is sent.
* The names of all input topics as well as the output topic associated with the function.
* The name of the class used for [SerDe](#serde).
Expand All @@ -258,6 +266,8 @@ Java, Python and Go SDKs provide access to a **context object** that can be used
* Access to arbitrary [user configuration](#user-config) values supplied via the CLI.
* An interface for recording [metrics](#metrics).
* An interface for storing and retrieving state in [state storage](#state-storage).
* A function to publish new messages onto arbitrary topics.
* A function to ack the message being processed (if auto-ack is disabled).

<!--DOCUSAURUS_CODE_TABS-->
<!--Java-->
Expand All @@ -279,6 +289,7 @@ public interface Context {
void incrCounter(String key, long amount);
long getCounter(String key);
void putState(String key, ByteBuffer value);
void deleteState(String key);
ByteBuffer getState(String key);
Map<String, Object> getUserConfigMap();
Optional<Object> getUserConfigValue(String key);
Expand Down Expand Up @@ -318,6 +329,70 @@ public class ContextFunction implements Function<String, Void> {
}
```

<!--Python-->
```
class ContextImpl(pulsar.Context):
def get_message_id(self):
...
def get_message_key(self):
...
def get_message_eventtime(self):
...
def get_message_properties(self):
...
def get_current_message_topic_name(self):
...
def get_partition_key(self):
...
def get_function_name(self):
...
def get_function_tenant(self):
...
def get_function_namespace(self):
...
def get_function_id(self):
...
def get_instance_id(self):
...
def get_function_version(self):
...
def get_logger(self):
...
def get_user_config_value(self, key):
...
def get_user_config_map(self):
...
def record_metric(self, metric_name, metric_value):
...
def get_input_topics(self):
...
def get_output_topic(self):
...
def get_output_serde_class_name(self):
...
def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe",
properties=None, compression_type=None, callback=None, message_conf=None):
...
def ack(self, msgid, topic):
...
def get_and_reset_metrics(self):
...
def reset_metrics(self):
...
def get_metrics(self):
...
def incr_counter(self, key, amount):
...
def get_counter(self, key):
...
def del_counter(self, key):
...
def put_state(self, key, value):
...
def get_state(self, key):
...
```

<!--Go-->
```
func (c *FunctionContext) GetInstanceID() int {
Expand Down Expand Up @@ -635,3 +710,190 @@ To access metrics created by Pulsar Functions, refer to [Monitoring](deploy-moni
Pulsar Functions use [Apache BookKeeper](https://bookkeeper.apache.org) as a state storage interface. Pulsar installation, including the local standalone installation, includes deployment of BookKeeper bookies.

Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper [table service](https://docs.google.com/document/d/155xAwWv5IdOitHh1NVMEwCMGgB28M3FyMiQSxEpjE-Y/edit#heading=h.56rbh52koe3f) to store the `State` for functions. For example, a `WordCount` function can store its `counters` state into BookKeeper table service via Pulsar Functions State API.

States are key-value pairs, where the key is a string and the value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar Function, and shared between instances of that function.

You can access states within Pulsar Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) options to `pulsar-admin functions`.

### API

<!--DOCUSAURUS_CODE_TABS-->
<!--Java-->
Currently Pulsar Functions expose the following APIs for mutating and accessing State. These APIs are available in the [Context](functions-develop.md#context) object when you are using Java SDK functions.

#### incrCounter

```java
/**
* Increment the builtin distributed counter refered by key
* @param key The name of the key
* @param amount The amount to be incremented
*/
void incrCounter(String key, long amount);
```

Application can use `incrCounter` to change the counter of a given `key` by the given `amount`.

#### getCounter

```java
/**
* Retrieve the counter value for the key.
*
* @param key name of the key
* @return the amount of the counter value for this key
*/
long getCounter(String key);
```

Application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.

Except the `counter` API, Pulsar also exposes a general key/value API for functions to store
general key/value state.

#### putState

```java
/**
* Update the state value for the key.
*
* @param key name of the key
* @param value state value of the key
*/
void putState(String key, ByteBuffer value);
```

#### getState

```java
/**
* Retrieve the state value for the key.
*
* @param key name of the key
* @return the state value for the key.
*/
ByteBuffer getState(String key);
```

#### deleteState

```java
/**
* Delete the state value for the key.
*
* @param key name of the key
*/
```

Counters and binary values share the same keyspace, so this deletes either type.

<!--Python-->
Currently Pulsar Functions expose the following APIs for mutating and accessing State. These APIs are available in the [Context](#context) object when you are using Python SDK functions.

#### incr_counter

```python
def incr_counter(self, key, amount):
"""incr the counter of a given key in the managed state"""
```

Application can use `incr_counter` to change the counter of a given `key` by the given `amount`.
If the `key` does not exist, a new key is created.

#### get_counter

```python
def get_counter(self, key):
"""get the counter of a given key in the managed state"""
```

Application can use `get_counter` to retrieve the counter of a given `key` mutated by `incrCounter`.

Except the `counter` API, Pulsar also exposes a general key/value API for functions to store
general key/value state.

#### put_state

```python
def put_state(self, key, value):
"""update the value of a given key in the managed state"""
```

The key is a string, and the value is arbitrary binary data.

#### get_state

```python
def get_state(self, key):
"""get the value of a given key in the managed state"""
```

#### del_counter

```python
def del_counter(self, key):
"""delete the counter of a given key in the managed state"""
```

Counters and binary values share the same keyspace, so this deletes either type.

<!--END_DOCUSAURUS_CODE_TABS-->

### Query State

A Pulsar Function can use the [State API](#api) for storing state into Pulsar's state storage
and retrieving state back from Pulsar's state storage. Additionally Pulsar also provides
CLI commands for querying its state.

```shell
$ bin/pulsar-admin functions querystate \
--tenant <tenant> \
--namespace <namespace> \
--name <function-name> \
--state-storage-url <bookkeeper-service-url> \
--key <state-key> \
[---watch]
```

If `--watch` is specified, the CLI will watch the value of the provided `state-key`.

### Example

<!--DOCUSAURUS_CODE_TABS-->
<!--Java-->

{@inject: github:`WordCountFunction`:/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WordCountFunction.java} is a very good example
demonstrating on how Application can easily store `state` in Pulsar Functions.

```java
public class WordCountFunction implements Function<String, Void> {
@Override
public Void process(String input, Context context) throws Exception {
Arrays.asList(input.split("\\.")).forEach(word -> context.incrCounter(word, 1));
return null;
}
}
```

The logic of this `WordCount` function is pretty simple and straightforward:

1. The function first splits the received `String` into multiple words using regex `\\.`.
2. For each `word`, the function increments the corresponding `counter` by 1 (via `incrCounter(key, amount)`).

<!--Python-->

```python
from pulsar import Function

class WordCount(Function):
def process(self, item, context):
for word in item.split():
context.incr_counter(word, 1)
```

The logic of this `WordCount` function is pretty simple and straightforward:

1. The function first splits the received string into multiple words on space.
2. For each `word`, the function increments the corresponding `counter` by 1 (via `incr_counter(key, amount)`).

<!--END_DOCUSAURUS_CODE_TABS-->

0 comments on commit 9b9bff4

Please sign in to comment.