Skip to content

Commit

Permalink
[component/functions|component/go] Add access to the current message …
Browse files Browse the repository at this point in the history
…from the function context (apache#8290)

### Motivation

It may be helpful to access the current message in order to access its metadata (key, properties, etc).

### Modifications

- Implemented `FunctionContext#GetCurrentRecord` (similar to [Context#getCurrentRecord](https://github.com/apache/pulsar/blob/9c821cf940d422d2ea3d47687bd309fe698ab84e/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java#L46))
- Implemented `FunctionContext#SetCurrentRecord`
- Set current message before executing a message handler in `goInstance#handleMsg`

### Verifying this change

- [x] Make sure that the change passes the CI checks.
  • Loading branch information
vaihtovirta authored Oct 21, 2020
1 parent 3147e9e commit 7462324
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 19 deletions.
1 change: 1 addition & 0 deletions pulsar-function-go/examples/contextFunc/contextFunc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func contextFunc(ctx context.Context) {
if fc, ok := pf.FromContext(ctx); ok {
fmt.Printf("function ID is:%s, ", fc.GetFuncID())
fmt.Printf("function version is:%s\n", fc.GetFuncVersion())
fmt.Printf("Current message's ID :%s\n", string(fc.GetCurrentRecord().ID().Serialize()))
}
}

Expand Down
60 changes: 41 additions & 19 deletions pulsar-function-go/go.sum

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions pulsar-function-go/pf/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ import (
"context"
"encoding/json"
"time"

"github.com/apache/pulsar-client-go/pulsar"
)

type FunctionContext struct {
instanceConf *instanceConf
userConfigs map[string]interface{}
logAppender *LogAppender
record pulsar.Message
}

func NewFuncContext() *FunctionContext {
Expand Down Expand Up @@ -116,6 +119,17 @@ func (c *FunctionContext) GetUserConfMap() map[string]interface{} {
return c.userConfigs
}

// SetCurrentRecord sets the current message into the function context
// called for each message before executing a handler function
func (c *FunctionContext) SetCurrentRecord(record pulsar.Message) {
c.record = record
}

// GetCurrentRecord gets the current message from the function context
func (c *FunctionContext) GetCurrentRecord() pulsar.Message {
return c.record
}

// An unexported type to be used as the key for types in this package.
// This prevents collisions with keys defined in other packages.
type key struct{}
Expand Down
13 changes: 13 additions & 0 deletions pulsar-function-go/pf/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func TestContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fc := NewFuncContext()
fc.record = &MockMessage{
properties: map[string]string{"FOO": "BAR"},
messageID: &MockMessageID{},
}
ctx = NewContext(ctx, fc)

if resfc, ok := FromContext(ctx); ok {
Expand All @@ -54,5 +58,14 @@ func TestContext(t *testing.T) {
t,
map[string]interface{}{"word-of-the-day": "hapax legomenon"},
resfc.GetUserConfMap())
assert.IsType(t, &MockMessage{}, fc.GetCurrentRecord())
}
}

func TestFunctionContext_setCurrentRecord(t *testing.T) {
fc := NewFuncContext()

fc.SetCurrentRecord(&MockMessage{})

assert.IsType(t, &MockMessage{}, fc.record)
}
3 changes: 3 additions & 0 deletions pulsar-function-go/pf/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@ func (gi *goInstance) setupConsumer() (chan pulsar.ConsumerMessage, error) {
func (gi *goInstance) handlerMsg(input pulsar.Message) (output []byte, err error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

gi.context.SetCurrentRecord(input)

ctx = NewContext(ctx, gi.context)
msgInput := input.Payload()
return gi.function.process(ctx, msgInput)
Expand Down
23 changes: 23 additions & 0 deletions pulsar-function-go/pf/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package pf

import (
"context"
"fmt"
"strconv"
"testing"
Expand Down Expand Up @@ -92,3 +93,25 @@ func TestTime_EqualsThreeSecondsTimed(t *testing.T) {
assert.True(t, time.Duration(diff) > time.Second*3)
assert.True(t, time.Duration(diff) < time.Millisecond*3100)
}

type MockHandler struct{}

func (m *MockHandler) process(ctx context.Context, input []byte) ([]byte, error) {
return []byte(`output`), nil
}

func Test_goInstance_handlerMsg(t *testing.T) {
handler := &MockHandler{}
fc := NewFuncContext()
instance := &goInstance{
function: handler,
context: fc,
}
message := &MockMessage{payload: []byte(`{}`)}

output, err := instance.handlerMsg(message)

assert.Nil(t, err)
assert.Equal(t, "output", string(output))
assert.Equal(t, message, fc.record)
}
82 changes: 82 additions & 0 deletions pulsar-function-go/pf/mockMessage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

package pf

import (
"time"

"github.com/apache/pulsar-client-go/pulsar"
)

type MockMessage struct {
properties map[string]string
messageID *MockMessageID
payload []byte
}

func (m *MockMessage) Topic() string {
return ""
}

func (m *MockMessage) ProducerName() string {
return "mock-producer"
}

func (m *MockMessage) Properties() map[string]string {
return m.properties
}

func (m *MockMessage) Payload() []byte {
return m.payload
}

func (m *MockMessage) ID() pulsar.MessageID {
return m.messageID
}

func (m *MockMessage) PublishTime() time.Time {
return time.Now()
}

func (m *MockMessage) EventTime() time.Time {
return time.Now()
}

func (m *MockMessage) Key() string {
return "key"
}

func (m *MockMessage) RedeliveryCount() uint32 {
return 1
}

func (m *MockMessage) IsReplicated() bool {
return true
}

func (m *MockMessage) GetReplicatedFrom() string {
return "mock-cluster"
}

type MockMessageID struct{}

func (m *MockMessageID) Serialize() []byte {
return []byte(`message-id`)
}
8 changes: 8 additions & 0 deletions site2/docs/functions-develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,14 @@ func (c *FunctionContext) GetUserConfValue(key string) interface{} {
func (c *FunctionContext) GetUserConfMap() map[string]interface{} {
return c.userConfigs
}
func (c *FunctionContext) SetCurrentRecord(record pulsar.Message) {
c.record = record
}
func (c *FunctionContext) GetCurrentRecord() pulsar.Message {
return c.record
}
```

The following example uses several methods available via the `Context` object.
Expand Down

0 comments on commit 7462324

Please sign in to comment.