Skip to content

Commit

Permalink
[Issue 3926][functions] Add delete counter in python functions state_…
Browse files Browse the repository at this point in the history
…context (apache#3930)

* add delete_counter in python functions state_context

* Update contextimpl.py

Matched the name of the stub method to the one in ContextImpl
  • Loading branch information
laurent-chriqui authored and merlimat committed Mar 29, 2019
1 parent 07030e3 commit 08c97f7
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 2 deletions.
5 changes: 5 additions & 0 deletions pulsar-client-cpp/python/pulsar/functions/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ def get_counter(self, key):
"""get the counter of a given key in the managed state"""
pass

@abstractmethod
def del_counter(self, key):
"""delete the counter of a given key in the managed state"""
pass

@abstractmethod
def put_state(self, key, value):
"""update the value of a given key in the managed state"""
Expand Down
3 changes: 3 additions & 0 deletions pulsar-functions/instance/src/main/python/contextimpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ def incr_counter(self, key, amount):
def get_counter(self, key):
return self.state_context.get_amount(key)

def del_counter(self, key):
return self.state_context.delete(key)

def put_state(self, key, value):
return self.state_context.put(key, value)

Expand Down
7 changes: 5 additions & 2 deletions pulsar-functions/instance/src/main/python/state_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ class BKManagedStateContext(StateContext):

def __init__(self, state_storage_serviceurl, table_ns, table_name):
client_settings = StorageClientSettings(
service_uri=state_storage_serviceurl)
service_uri=state_storage_serviceurl)
admin_client = admin.client.Client(
storage_client_settings=client_settings)
storage_client_settings=client_settings)
# create namespace and table if needed
ns = admin_client.namespace(table_ns)
try:
Expand Down Expand Up @@ -154,3 +154,6 @@ def get_value(self, key):

def put(self, key, value):
return self.__table__.put_str(key, value)

def delete_key(self, key):
return self.__table__.delete_str(key)

0 comments on commit 08c97f7

Please sign in to comment.