States are key-value pairs, where a key is a string and its value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual function and shared between instances of that function.
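As an illustration of the counter layout mentioned above, here is a minimal sketch of encoding and decoding a counter as a 64-bit big-endian value. `CounterCodec` is a hypothetical helper written for this example and is not part of Pulsar.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper (not a Pulsar class): shows the 64-bit big-endian counter layout.
final class CounterCodec {
    private CounterCodec() {}

    /** Encodes a counter as 8 bytes, most significant byte first. */
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES)
                .order(ByteOrder.BIG_ENDIAN) // already ByteBuffer's default; stated for clarity
                .putLong(value)
                .array();
    }

    /** Decodes an 8-byte big-endian value back into a long. */
    static long decode(byte[] bytes) {
        if (bytes.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getLong();
    }
}
```

With this layout, the value 258 (0x0102) is stored as six zero bytes followed by 0x01 and 0x02.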
Pulsar Functions use `StateStoreProvider` to initialize a `StateStore` that manages state, so they can support multiple state storage backends, such as:
- `BKStateStoreProviderImpl`: uses Apache BookKeeper as the backend
- `PulsarMetadataStateStoreProviderImpl`: uses Pulsar Metadata as the backend

Users can also implement their own `StateStoreProvider` to support other state storage backends.
The Broker also exposes two endpoints to put and query a state key of a function:
- GET /{tenant}/{namespace}/{functionName}/state/{key}
- POST /{tenant}/{namespace}/{functionName}/state/{key}
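For illustration, the GET endpoint could be called with a request built as in the sketch below. The base URL (`http://localhost:8080/admin/v3/functions`) is an assumption about a typical deployment and is not stated in this proposal. `StateEndpointSketch` is a hypothetical helper. The request is only constructed, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper (not a Pulsar class): builds a GET request for a function's state key.
final class StateEndpointSketch {
    // Assumed admin base URL; adjust it to the actual deployment.
    static final String ASSUMED_BASE = "http://localhost:8080/admin/v3/functions";

    private StateEndpointSketch() {}

    static HttpRequest getStateRequest(String tenant, String namespace,
                                       String functionName, String key) {
        // Assumes every path segment is already URL-safe; no escaping is applied here.
        URI uri = URI.create(String.join("/",
                ASSUMED_BASE, tenant, namespace, functionName, "state", key));
        return HttpRequest.newBuilder(uri).GET().build();
    }
}
```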
Although Pulsar Functions support multiple state storage backends, these two endpoints still use BookKeeper's `StorageAdminClient` directly to put and query state, which tightly couples the Pulsar Functions state store to Apache BookKeeper.
See: code
This proposal aims to decouple the Pulsar Functions state store from Apache BookKeeper, so that it can support other state storage backends.
- Pulsar Functions can use state storage backends other than Apache BookKeeper.
None
- Replace the `StorageAdminClient` in `ComponentImpl` with `StateStoreProvider` to manage state.
- Add a `cleanUp` method to the `StateStoreProvider` interface.
- In the `ComponentImpl#getFunctionState` and `ComponentImpl#queryState` methods, replace the `StorageAdminClient` with `StateStoreProvider`:

  ```java
  String tableNs = getStateNamespace(tenant, namespace);
  String tableName = functionName;

  String stateStorageServiceUrl = worker().getWorkerConfig().getStateStorageServiceUrl();

  if (storageClient.get() == null) {
      storageClient.compareAndSet(null, StorageClientBuilder.newBuilder()
              .withSettings(StorageClientSettings.newBuilder()
                      .serviceUri(stateStorageServiceUrl)
                      .clientName("functions-admin")
                      .build())
              .withNamespace(tableNs)
              .build());
  }
  ...
  ```

  This is replaced with:

  ```java
  DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, name);
  ```
- Add a `cleanUp` method to the `StateStoreProvider` interface:

  ```java
  default void cleanUp(String tenant, String namespace, String name) throws Exception {}
  ```

  When a function is deleted, its state store should be deleted as well. Currently, this is also done through BookKeeper's `StorageAdminClient`, which deletes the state store table:

  ```java
  deleteStatestoreTableAsync(getStateNamespace(tenant, namespace), componentName);

  private void deleteStatestoreTableAsync(String namespace, String table) {
      StorageAdminClient adminClient = worker().getStateStoreAdminClient();
      if (adminClient != null) {
          adminClient.deleteStream(namespace, table).whenComplete((res, throwable) -> {
              if ((throwable == null && res)
                      || ((throwable instanceof NamespaceNotFoundException
                      || throwable instanceof StreamNotFoundException))) {
                  log.info("{}/{} table deleted successfully", namespace, table);
              } else {
                  if (throwable != null) {
                      log.error("{}/{} table deletion failed {} but moving on", namespace, table, throwable);
                  } else {
                      log.error("{}/{} table deletion failed but moving on", namespace, table);
                  }
              }
          });
      }
  }
  ```

  This proposal therefore adds a `cleanUp` method to `StateStoreProvider` and calls it after a function is deleted:

  ```java
  worker().getStateStoreProvider().cleanUp(tenant, namespace, hashName);
  ```
- Add a new `init` method to the `StateStoreProvider` interface:

  The current `init` method requires a `FunctionDetails` parameter, but `FunctionDetails` is not available in the `ComponentImpl` class, and neither `BKStateStoreProviderImpl` nor `PulsarMetadataStateStoreProviderImpl` uses the parameter. For backward compatibility, this proposal leaves the existing `init` method unchanged and adds a new `init` method without the `FunctionDetails` parameter:

  ```java
  default void init(Map<String, Object> config) throws Exception {}
  ```
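The two interface additions above can be sketched together as follows. This is a simplified illustration under stated assumptions, not the actual Pulsar code: `SketchStateStoreProvider`, `SketchFunctionDetails`, `InMemoryStateStoreProvider`, and `FunctionDeletionSketch` are hypothetical names, the store is modelled as a plain `Map`, and the real `StateStoreProvider` interface has a different shape.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Placeholder for the real FunctionDetails type (assumption for this sketch).
final class SketchFunctionDetails {}

// Simplified stand-in for StateStoreProvider; the real interface differs.
interface SketchStateStoreProvider {
    // Existing overload, left untouched so current implementations keep compiling.
    default void init(Map<String, Object> config, SketchFunctionDetails details) throws Exception {}

    // New overload without FunctionDetails; a no-op unless an implementation overrides it.
    default void init(Map<String, Object> config) throws Exception {}

    Map<String, byte[]> getStateStore(String tenant, String namespace, String name);

    // New cleanup hook; a no-op unless an implementation overrides it.
    default void cleanUp(String tenant, String namespace, String name) throws Exception {}
}

// Hypothetical in-memory backend that opts in to both new methods.
final class InMemoryStateStoreProvider implements SketchStateStoreProvider {
    private final Map<String, Map<String, byte[]>> stores = new ConcurrentHashMap<>();
    private volatile boolean initialized;

    private static String storeKey(String tenant, String namespace, String name) {
        return tenant + "/" + namespace + "/" + name;
    }

    @Override
    public void init(Map<String, Object> config) {
        initialized = true;
    }

    @Override
    public Map<String, byte[]> getStateStore(String tenant, String namespace, String name) {
        return stores.computeIfAbsent(storeKey(tenant, namespace, name),
                k -> new ConcurrentHashMap<>());
    }

    @Override
    public void cleanUp(String tenant, String namespace, String name) {
        stores.remove(storeKey(tenant, namespace, name));
    }

    boolean isInitialized() {
        return initialized;
    }

    boolean hasStore(String tenant, String namespace, String name) {
        return stores.containsKey(storeKey(tenant, namespace, name));
    }
}

final class FunctionDeletionSketch {
    private FunctionDeletionSketch() {}

    // Mirrors the existing "log and move on" behaviour: a cleanup failure is reported, not rethrown.
    static boolean deleteFunctionState(SketchStateStoreProvider provider,
                                       String tenant, String namespace, String name) {
        try {
            provider.cleanUp(tenant, namespace, name);
            return true;
        } catch (Exception e) {
            System.err.printf("%s/%s/%s state cleanup failed but moving on: %s%n",
                    tenant, namespace, name, e);
            return false;
        }
    }
}
```

Because both new methods are `default` no-ops in this sketch, a provider written against the old interface still compiles. It only gains cleanup or the new initialization path once it overrides them.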
None
- Nothing needs to be done if users use Apache BookKeeper as the state storage backend.
- If users use another state storage backend, they need to change it back to BookKeeper.
Nothing needs to be done.
- Mailing List discussion thread: https://lists.apache.org/thread/0rz29wotonmdck76pdscwbqo19t3rbds
- Mailing List voting thread: https://lists.apache.org/thread/t8vmyxovrrb5xl8jvrp1om50l6nprdjt