Skip to content

Commit

Permalink
[Go functions] access to user config before function loop (apache#8467)
Browse files Browse the repository at this point in the history
The pulsar go functions SDK has no access to user config outside of the function context, which is only accessible from inside the function handler itself (i.e. once processing has started). This is a somewhat awkward access method when it comes to user configuration values which might be used to initialize a persistent connection to a database or other service, since it would require a largely-superfluous check for the initialization and related configuration values in every run of the function, when it's only required once at startup.

This PR is a re-hash of apache#8365, which introduced secrets access incompatible to the concept of a secretsprovider, which will need to be addressed as separate parity functionality.
  • Loading branch information
flowchartsman authored Nov 19, 2020
1 parent 58b5d32 commit 2dbb9fb
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 10 deletions.
51 changes: 41 additions & 10 deletions pulsar-function-go/pf/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (
"github.com/apache/pulsar-client-go/pulsar"
)

// FunctionContext provides contextual information to the executing function.
// Features like which message id we are handling, whats the topic name of the
// message, what are our operating constraints, etc can be accessed by the
// executing function
type FunctionContext struct {
instanceConf *instanceConf
userConfigs map[string]interface{}
Expand All @@ -35,6 +39,7 @@ type FunctionContext struct {
record pulsar.Message
}

// NewFuncContext returns a new Function context
func NewFuncContext() *FunctionContext {
instanceConf := newInstanceConf()
userConfigs := buildUserConfig(instanceConf.funcDetails.GetUserConfig())
Expand All @@ -46,10 +51,14 @@ func NewFuncContext() *FunctionContext {
return fc
}

//GetInstanceID returns the id of the instance that invokes the running pulsar
//function.
func (c *FunctionContext) GetInstanceID() int {
return c.instanceConf.instanceID
}

//GetInputTopics returns a list of all input topics the pulsar function has been
//invoked on
func (c *FunctionContext) GetInputTopics() []string {
inputMap := c.instanceConf.funcDetails.GetSource().InputSpecs
inputTopics := make([]string, len(inputMap))
Expand All @@ -61,73 +70,96 @@ func (c *FunctionContext) GetInputTopics() []string {
return inputTopics
}

//GetOutputTopic returns the output topic the pulsar function was invoked on
func (c *FunctionContext) GetOutputTopic() string {
return c.instanceConf.funcDetails.GetSink().Topic
}

//GetTenantAndNamespace returns the tenant and namespace the pulsar function
//belongs to in the format of `<tenant>/<namespace>`
func (c *FunctionContext) GetTenantAndNamespace() string {
return c.GetFuncTenant() + "/" + c.GetFuncNamespace()
}

//GetTenantAndNamespaceAndName returns the full name of the pulsar function in
//the format of `<tenant>/<namespace>/<function name>`
func (c *FunctionContext) GetTenantAndNamespaceAndName() string {
return c.GetFuncTenant() + "/" + c.GetFuncNamespace() + "/" + c.GetFuncName()
}

//GetFuncTenant returns the tenant the pulsar function belongs to
func (c *FunctionContext) GetFuncTenant() string {
return c.instanceConf.funcDetails.Tenant
}

//GetFuncName returns the name given to the pulsar function
func (c *FunctionContext) GetFuncName() string {
return c.instanceConf.funcDetails.Name
}

//GetFuncNamespace returns the namespace the pulsar function belongs to
func (c *FunctionContext) GetFuncNamespace() string {
return c.instanceConf.funcDetails.Namespace
}

//GetFuncID returns the id of the pulsar function
func (c *FunctionContext) GetFuncID() string {
return c.instanceConf.funcID
}

//GetPort returns the port the pulsar function communicates on
func (c *FunctionContext) GetPort() int {
return c.instanceConf.port
}

//GetClusterName returns the name of the cluster the pulsar function is running
//in
func (c *FunctionContext) GetClusterName() string {
return c.instanceConf.clusterName
}

//GetExpectedHealthCheckInterval returns the expected time between health checks
//in seconds
func (c *FunctionContext) GetExpectedHealthCheckInterval() int32 {
return c.instanceConf.expectedHealthCheckInterval
}

//GetExpectedHealthCheckIntervalAsDuration returns the expected time between
//health checks in seconds as a time.Duration
func (c *FunctionContext) GetExpectedHealthCheckIntervalAsDuration() time.Duration {
return time.Duration(c.instanceConf.expectedHealthCheckInterval)
}

//GetMaxIdleTime returns the amount of time the pulsar function has to respond
//to the most recent health check before it is considered to be failing.
func (c *FunctionContext) GetMaxIdleTime() int64 {
return int64(c.GetExpectedHealthCheckIntervalAsDuration() * 3 * time.Second)
}

//GetFuncVersion returns the version of the pulsar function
func (c *FunctionContext) GetFuncVersion() string {
return c.instanceConf.funcVersion
}

//GetUserConfValue returns the value of a key from the pulsar function's user
//configuration map
func (c *FunctionContext) GetUserConfValue(key string) interface{} {
return c.userConfigs[key]
}

//GetUserConfMap returns the pulsar function's user configuration map
func (c *FunctionContext) GetUserConfMap() map[string]interface{} {
return c.userConfigs
}

// NewOutputMessage send message to the topic
// @param topicName: The name of the topic for output message
// NewOutputMessage send message to the topic @param topicName: The name of the
// topic for output message
func (c *FunctionContext) NewOutputMessage(topicName string) pulsar.Producer {
return c.outputMessage(topicName)
}

// SetCurrentRecord sets the current message into the function context
// called for each message before executing a handler function
// SetCurrentRecord sets the current message into the function context called
// for each message before executing a handler function
func (c *FunctionContext) SetCurrentRecord(record pulsar.Message) {
c.record = record
}
Expand All @@ -138,14 +170,13 @@ func (c *FunctionContext) GetCurrentRecord() pulsar.Message {

}

// An unexported type to be used as the key for types in this package.
// This prevents collisions with keys defined in other packages.
// An unexported type to be used as the key for types in this package. This
// prevents collisions with keys defined in other packages.
type key struct{}

// contextKey is the key for FunctionContext values in context.Context.
// It is unexported;
// clients should use FunctionContext.NewContext and FunctionContext.FromContext
// instead of using this key directly.
// contextKey is the key for FunctionContext values in context.Context. It is
// unexported; clients should use FunctionContext.NewContext and
// FunctionContext.FromContext instead of using this key directly.
var contextKey = &key{}

// NewContext returns a new Context that carries value u.
Expand Down
12 changes: 12 additions & 0 deletions pulsar-function-go/pf/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,15 @@ func Start(funcName interface{}) {
panic("start function failed, please check.")
}
}

// GetUserConfMap provides a means to access the pulsar function's user config
// map before initializing the pulsar function
func GetUserConfMap() map[string]interface{} {
return NewFuncContext().userConfigs
}

// GetUserConfValue provides acces to a user configuration value before
// initializing the pulsar function
func GetUserConfValue(key string) interface{} {
return NewFuncContext().userConfigs[key]
}

0 comments on commit 2dbb9fb

Please sign in to comment.