Skip to content

Commit

Permalink
Add tenant and namespace getters to Python context (apache#1677)
Browse files Browse the repository at this point in the history
* add tenant and namespace getters on Python context

* switch to getInputTopicName

* re-name current message methods
  • Loading branch information
lucperkins authored and merlimat committed Apr 30, 2018
1 parent 4def373 commit d5780ef
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 11 deletions.
17 changes: 13 additions & 4 deletions pulsar-client-cpp/python/functions/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,18 @@ def get_message_id(self):
pass

@abstractmethod
def get_topic_name(self):
def get_current_message_topic_name(self):
"""Returns the topic name of the message that we are processing"""
pass

@abstractmethod
def get_function_tenant(self):
"""Returns the tenant of the message that's being processed"""
pass

@abstractmethod
def get_function_namespace(self):
"""Returns the namespace of the message that's being processed"""

@abstractmethod
def get_function_name(self):
Expand Down Expand Up @@ -106,15 +115,15 @@ def publish(self, topic_name, message):

@abstractmethod
def get_output_topic(self):
'''Returns the output topic of function'''
"""Returns the output topic of function"""
pass

@abstractmethod
def get_output_serde_class_name(self):
'''return output Serde class'''
"""return output Serde class"""
pass

@abstractmethod
def ack(self, msgid, topic):
'''ack this message id'''
"""ack this message id"""
pass
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ public interface Context {
byte[] getMessageId();

/**
* The topic that this message belongs to
* @return The topic name
* The input topic that the message currently being processed belongs to
* @return The input topic name
*/
String getTopicName();
String getCurrentMessageTopicName();

/**
* Get a list of all input topics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public byte[] getMessageId() {
}

@Override
public String getTopicName() {
public String getCurrentMessageTopicName() {
return currentTopicName;
}

Expand Down
12 changes: 9 additions & 3 deletions pulsar-functions/instance/src/main/python/contextimpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,30 @@ def __init__(self, instance_config, logger, pulsar_client, user_code, consumers)
self.publish_producers = {}
self.publish_serializers = {}
self.current_message_id = None
self.current_topic_name = None
self.current_input_topic_name = None
self.current_start_time = None

# Called on a per message basis to set the context for the current message
def set_current_message_context(self, msgid, topic):
self.current_message_id = msgid
self.current_topic_name = topic
self.current_input_topic_name = topic
self.current_start_time = time.time()

def get_message_id(self):
return self.current_message_id

def get_topic_name(self):
def get_current_message_topic_name(self):
return self.current_topic_name

def get_function_name(self):
return self.instance_config.function_details.name

def get_function_tenant(self):
return self.instance_config.function_details.tenant

def get_function_namespace(self):
return self.instance_config.function_details.namespace

def get_function_id(self):
return self.instance_config.function_id

Expand Down

0 comments on commit d5780ef

Please sign in to comment.