Fix panic on discard message for Go Functions (apache#8776)
Signed-off-by: xiaolong.ran <[email protected]>


Fixes apache#8769

### Motivation

When creating Go Functions with pulsar-admin or pulsarctl, we need to specify the input topic. In many cases we pass only the short topic name, for example `input-topic`, instead of the fully qualified name `persistent://public/default/input-topic`.

The Go Function instance keeps the following map from topic name to consumer:

```
consumers         map[string]pulsar.Consumer
```

When populating the map, we use the value passed in from pulsar-admin or pulsarctl directly as the key. When looking a consumer up, however, we use the fully qualified form reported on the message (e.g. `persistent://public/default/input-topic`), so the key is not found and the lookup results in the following panic:

```
2020/12/02 11:38:47.845  [info] Created producer cnx=127.0.0.1:54594 -> 127.0.0.1:6650 topic=persistent://public/default/out-topic producer_name=standalone-0-61 producerID=1
===============topic name================
topic name: in-topic
11:38:47.846 [pulsar-io-51-6] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:54594] Subscribing on topic persistent://public/default/in-topic / public/default/go_func
11:38:47.847 [pulsar-io-51-6] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/in-topic-public%2Fdefault%2Fgo_func] Rewind from 13:10 to 13:0
11:38:47.847 [pulsar-io-51-6] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/in-topic] There are no replicated subscriptions on the topic
11:38:47.847 [pulsar-io-51-6] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/in-topic][public/default/go_func] Created new subscription for 1
11:38:47.847 [pulsar-io-51-6] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:54594] Created subscription on topic persistent://public/default/in-topic / public/default/go_func
2020/12/02 11:38:47.847 asm_amd64.s:1373: [info] Connected consumer subscription=public/default/go_func consumerID=1 name=jrppw topic=persistent://public/default/in-topic
2020/12/02 11:38:47.847 asm_amd64.s:1373: [info] Created consumer name=jrppw topic=persistent://public/default/in-topic subscription=public/default/go_func consumerID=1
2020/12/02 11:38:47.847 log.go:46: [info] Serving InstanceCommunication on port 54013
2020/12/02 11:38:47.848 instance.go:402: [error] the logAppender is nil, if you want to use it, please specify `--log-topic` at startup.
2020/12/02 11:38:47.848 contextFunc.go:30: [info] attempting to discard input
++++++++++++++++++++
ack input msg topic: persistent://public/default/in-topic
++++++++++++++++++++++++
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x18 pc=0x46a1547]

goroutine 1 [running]:
github.com/apache/pulsar/pulsar-function-go/pf.(*goInstance).ackInputMessage(0xc000248000, 0x4a3d420, 0xc00040e000)
	/Users/wolf4j/github.com/apache/pulsar/pulsar-function-go/pf/instance.go:367 +0x1d7
github.com/apache/pulsar/pulsar-function-go/pf.(*goInstance).processResult(0xc000248000, 0x4a3d420, 0xc00040e000, 0x0, 0x0, 0x0)
	/Users/wolf4j/github.com/apache/pulsar/pulsar-function-go/pf/instance.go:355 +0x80
github.com/apache/pulsar/pulsar-function-go/pf.(*goInstance).startFunction(0xc000248000, 0x4a23740, 0xc00020e0c0, 0x0, 0x0)
	/Users/wolf4j/github.com/apache/pulsar/pulsar-function-go/pf/instance.go:177 +0x584
github.com/apache/pulsar/pulsar-function-go/pf.Start(0x4818b00, 0x4938eb0)
	/Users/wolf4j/github.com/apache/pulsar/pulsar-function-go/pf/function.go:171 +0x6b
main.main()
	/Users/wolf4j/github.com/apache/pulsar/pulsar-function-go/examples/contextFunc/contextFunc.go:35 +0x39
11:38:47.855 [pulsar-io-51-6] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:54594
```
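
The failure mode can be reproduced in isolation. The sketch below is not the Pulsar Functions code: `acker`, `fakeConsumer`, and `ackByTopic` are hypothetical stand-ins for `pulsar.Consumer` and `ackInputMessage`. It only illustrates that indexing a map of interface values with a missing key yields a nil interface, and that calling a method on it panics.

```go
package main

import "fmt"

// acker is a hypothetical stand-in for the Ack method of pulsar.Consumer.
type acker interface {
	Ack(msg string)
}

// fakeConsumer is a hypothetical consumer that only records what it acked.
type fakeConsumer struct{ acked []string }

func (c *fakeConsumer) Ack(msg string) { c.acked = append(c.acked, msg) }

// ackByTopic mirrors the shape of the unguarded lookup: it indexes the map
// and calls Ack without checking whether the key exists. The deferred
// recover turns the resulting panic into an error so the demo can continue.
func ackByTopic(consumers map[string]acker, topic, msg string) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic while acking on %q: %v", topic, r)
		}
	}()
	consumers[topic].Ack(msg) // nil interface value when topic is not a key
	return nil
}

func main() {
	// The map is keyed by the short name that was passed on the command line.
	consumers := map[string]acker{"in-topic": &fakeConsumer{}}

	// The message carries the fully qualified name, so the lookup misses.
	fmt.Println(ackByTopic(consumers, "persistent://public/default/in-topic", "m1"))
	// Looking up the exact key that was stored succeeds.
	fmt.Println(ackByTopic(consumers, "in-topic", "m1"))
}
```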

### Modifications

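In this pull request, we define a `TopicName` structure to parse the topic name passed in by `pulsar-admin` or `pulsarctl`. A short name is expanded to its fully qualified form (with the `persistent` domain and, for a bare topic name, the `public/default` namespace), so that the same key is used when populating the map and when looking a consumer up.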
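
As a rough illustration of the idea, the sketch below keys a map by a normalized name. `qualify` is a hypothetical, reduced helper written for this example; it is not the `ParseTopicName` function added by this commit and performs none of its validation.

```go
package main

import (
	"fmt"
	"strings"
)

// qualify is a hypothetical, reduced normalization step: a bare topic name
// gets the default domain, tenant, and namespace; a slash-separated short
// name gets only the default domain; a name that already contains "://" is
// returned unchanged. No validation is done here.
func qualify(topic string) string {
	if strings.Contains(topic, "://") {
		return topic
	}
	if !strings.Contains(topic, "/") {
		return "persistent://public/default/" + topic
	}
	return "persistent://" + topic
}

func main() {
	// Key the map by the qualified name at registration time.
	consumers := map[string]string{}
	consumers[qualify("in-topic")] = "consumer-1"

	// The lookup uses the fully qualified name carried by the message.
	name, ok := consumers["persistent://public/default/in-topic"]
	fmt.Println(name, ok)
}
```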
wolfstudy authored Dec 3, 2020
1 parent b78b01c commit f9ad058
Showing 4 changed files with 221 additions and 9 deletions.
1 change: 0 additions & 1 deletion pulsar-function-go/golangci.yml
@@ -25,7 +25,6 @@ linters:
   enable:
     - bodyclose
     - deadcode
-    - gocritic
     - goimports
     - golint
     - gosimple
23 changes: 15 additions & 8 deletions pulsar-function-go/pf/instance.go
@@ -255,16 +255,22 @@ func (gi *goInstance) setupConsumer() (chan pulsar.ConsumerMessage, error) {
 	channel := make(chan pulsar.ConsumerMessage)

 	var (
-		consumer pulsar.Consumer
-		err      error
+		consumer  pulsar.Consumer
+		topicName *TopicName
+		err       error
 	)

 	for topic, consumerConf := range funcDetails.Source.InputSpecs {
-		log.Debugf("Setting up consumer for topic: %s with subscription name: %s", topic, subscriptionName)
+		topicName, err = ParseTopicName(topic)
+		if err != nil {
+			return nil, err
+		}
+
+		log.Debugf("Setting up consumer for topic: %s with subscription name: %s", topicName.Name, subscriptionName)
 		if consumerConf.ReceiverQueueSize != nil {
 			if consumerConf.IsRegexPattern {
 				consumer, err = gi.client.Subscribe(pulsar.ConsumerOptions{
-					TopicsPattern: topic,
+					TopicsPattern: topicName.Name,
 					ReceiverQueueSize: int(consumerConf.ReceiverQueueSize.Value),
 					SubscriptionName: subscriptionName,
 					Properties: properties,
@@ -273,7 +279,7 @@ func (gi *goInstance) setupConsumer() (chan pulsar.ConsumerMessage, error) {
 				})
 			} else {
 				consumer, err = gi.client.Subscribe(pulsar.ConsumerOptions{
-					Topic: topic,
+					Topic: topicName.Name,
 					SubscriptionName: subscriptionName,
 					Properties: properties,
 					Type: subscriptionType,
@@ -284,15 +290,15 @@ func (gi *goInstance) setupConsumer() (chan pulsar.ConsumerMessage, error) {
 		} else {
 			if consumerConf.IsRegexPattern {
 				consumer, err = gi.client.Subscribe(pulsar.ConsumerOptions{
-					TopicsPattern: topic,
+					TopicsPattern: topicName.Name,
 					SubscriptionName: subscriptionName,
 					Properties: properties,
 					Type: subscriptionType,
 					MessageChannel: channel,
 				})
 			} else {
 				consumer, err = gi.client.Subscribe(pulsar.ConsumerOptions{
-					Topic: topic,
+					Topic: topicName.Name,
 					SubscriptionName: subscriptionName,
 					Properties: properties,
 					Type: subscriptionType,
@@ -307,7 +313,7 @@ func (gi *goInstance) setupConsumer() (chan pulsar.ConsumerMessage, error) {
 			gi.stats.incrTotalSysExceptions(err)
 			return nil, err
 		}
-		gi.consumers[topic] = consumer
+		gi.consumers[topicName.Name] = consumer
 	}
 	return channel, nil
 }
@@ -358,6 +364,7 @@ func (gi *goInstance) processResult(msgInput pulsar.Message, output []byte) {

 // ackInputMessage doesn't produce any result, or the user doesn't want the result.
 func (gi *goInstance) ackInputMessage(inputMessage pulsar.Message) {
+	log.Debugf("ack input message topic name is: %s", inputMessage.Topic())
 	gi.consumers[inputMessage.Topic()].Ack(inputMessage)
 }

123 changes: 123 additions & 0 deletions pulsar-function-go/pf/topicName.go
@@ -0,0 +1,123 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pf
+
+import (
+	"errors"
+	"fmt"
+	"strconv"
+	"strings"
+)
+
+// TopicName abstract a struct contained in a Topic
+type TopicName struct {
+	Domain    string
+	Namespace string
+	Name      string
+	Partition int
+}
+
+const (
+	publicTenant           = "public"
+	defaultNamespace       = "default"
+	partitionedTopicSuffix = "-partition-"
+)
+
+// ParseTopicName parse the given topic name and return TopicName.
+func ParseTopicName(topic string) (*TopicName, error) {
+	// The topic name can be in two different forms, one is fully qualified topic name,
+	// the other one is short topic name
+	if !strings.Contains(topic, "://") {
+		// The short topic name can be:
+		// - <topic>
+		// - <tenant>/<namespace>/<topic>
+		// - <tenant>/<cluster>/<namespace>/<topic>
+		parts := strings.Split(topic, "/")
+		if len(parts) == 3 || len(parts) == 4 {
+			topic = "persistent://" + topic
+		} else if len(parts) == 1 {
+			topic = "persistent://" + publicTenant + "/" + defaultNamespace + "/" + parts[0]
+		} else {
+			return nil, errors.New(
+				"Invalid short topic name '" + topic +
+					"', it should be in the format of <tenant>/<namespace>/<topic> or <topic>")
+		}
+	}
+
+	tn := &TopicName{}
+
+	// The fully qualified topic name can be in two different forms:
+	// new: persistent://tenant/namespace/topic
+	// legacy: persistent://tenant/cluster/namespace/topic
+	parts := strings.SplitN(topic, "://", 2)
+	domain := parts[0]
+	if domain != "persistent" && domain != "non-persistent" {
+		return nil, errors.New("Invalid topic domain: " + domain)
+	}
+	tn.Domain = domain
+
+	rest := parts[1]
+	var err error
+
+	// The rest of the name can be in different forms:
+	// new: tenant/namespace/<localName>
+	// legacy: tenant/cluster/namespace/<localName>
+	// Examples of localName:
+	// 1. some/name/xyz//
+	// 2. /xyz-123/feeder-2
+	parts = strings.SplitN(rest, "/", 4)
+	if len(parts) == 3 {
+		// New topic name without cluster name
+		tn.Namespace = parts[0] + "/" + parts[1]
+	} else if len(parts) == 4 {
+		// Legacy topic name that includes cluster name
+		tn.Namespace = fmt.Sprintf("%s/%s/%s", parts[0], parts[1], parts[2])
+	} else {
+		return nil, errors.New("Invalid topic name: " + topic)
+	}
+
+	tn.Name = topic
+	tn.Partition, err = getPartitionIndex(topic)
+	if err != nil {
+		return nil, err
+	}
+
+	return tn, nil
+}
+
+// NameWithoutPartition returns the topic name, sans the partition portion
+func (tn *TopicName) NameWithoutPartition() string {
+	if tn.Partition < 0 {
+		return tn.Name
+	}
+	idx := strings.LastIndex(tn.Name, partitionedTopicSuffix)
+	if idx > 0 {
+		return tn.Name[:idx]
+	}
+	return tn.Name
+}
+
+func getPartitionIndex(topic string) (int, error) {
+	if strings.Contains(topic, partitionedTopicSuffix) {
+		idx := strings.LastIndex(topic, "-") + 1
+		return strconv.Atoi(topic[idx:])
+	}
+	return -1, nil
+}
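
To see how the partition suffix is interpreted, the standalone sketch below copies `getPartitionIndex` and the `partitionedTopicSuffix` constant from the file above into a `main` package and runs them on a few names. The sample topic names and the `main` function are illustrative only.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

const partitionedTopicSuffix = "-partition-"

// getPartitionIndex is copied from topicName.go above: when the name
// contains "-partition-", everything after the last "-" is parsed as the
// partition index; otherwise -1 is returned to mean "not partitioned".
func getPartitionIndex(topic string) (int, error) {
	if strings.Contains(topic, partitionedTopicSuffix) {
		idx := strings.LastIndex(topic, "-") + 1
		return strconv.Atoi(topic[idx:])
	}
	return -1, nil
}

func main() {
	for _, topic := range []string{
		"persistent://public/default/my-topic",               // no suffix
		"persistent://public/default/my-topic-partition-5",   // numeric suffix
		"persistent://public/default/my-topic-partition-xyz", // non-numeric suffix
	} {
		idx, err := getPartitionIndex(topic)
		fmt.Printf("%s -> index=%d err=%v\n", topic, idx, err)
	}
}
```

A non-numeric suffix surfaces as a `strconv.Atoi` error, which `ParseTopicName` returns to its caller.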
83 changes: 83 additions & 0 deletions pulsar-function-go/pf/topicName_test.go
@@ -0,0 +1,83 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pf
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestParseTopicName(t *testing.T) {
+	testCases := []struct {
+		in        string
+		name      string
+		namespace string
+		partition int
+	}{
+		{in: "persistent://my-tenant/my-ns/my-topic",
+			name: "persistent://my-tenant/my-ns/my-topic",
+			namespace: "my-tenant/my-ns", partition: -1},
+		{in: "my-topic", name: "persistent://public/default/my-topic",
+			namespace: "public/default", partition: -1},
+		{in: "my-tenant/my-namespace/my-topic",
+			name: "persistent://my-tenant/my-namespace/my-topic",
+			namespace: "my-tenant/my-namespace", partition: -1},
+		{in: "non-persistent://my-tenant/my-namespace/my-topic",
+			name: "non-persistent://my-tenant/my-namespace/my-topic",
+			namespace: "my-tenant/my-namespace", partition: -1},
+		{in: "my-topic-partition-5",
+			name: "persistent://public/default/my-topic-partition-5",
+			namespace: "public/default", partition: 5},
+		// V1 topic name
+		{in: "persistent://my-tenant/my-cluster/my-ns/my-topic",
+			name: "persistent://my-tenant/my-cluster/my-ns/my-topic",
+			namespace: "my-tenant/my-cluster/my-ns", partition: -1},
+		{in: "my-tenant/my-cluster/my-ns/my-topic",
+			name: "persistent://my-tenant/my-cluster/my-ns/my-topic",
+			namespace: "my-tenant/my-cluster/my-ns", partition: -1},
+	}
+	for _, testCase := range testCases {
+		t.Run(testCase.in, func(t *testing.T) {
+			topic, err := ParseTopicName(testCase.in)
+			assert.Nil(t, err)
+			assert.Equal(t, testCase.name, topic.Name)
+			assert.Equal(t, testCase.namespace, topic.Namespace)
+			assert.Equal(t, testCase.partition, topic.Partition)
+		})
+	}
+}
+
+func TestParseTopicNameErrors(t *testing.T) {
+	testCases := []string{
+		"invalid://my-tenant/my-ns/my-topic",
+		"invalid://my-tenant/my-ns/my-topic-partition-xyz",
+		"my-tenant/my-ns/my-topic-partition-xyz/invalid",
+		"persistent://my-tenant",
+		"persistent://my-tenant/my-namespace",
+		"persistent://my-tenant/my-cluster/my-ns/my-topic-partition-xyz/invalid",
+	}
+	for _, testCase := range testCases {
+		t.Run(testCase, func(t *testing.T) {
+			_, err := ParseTopicName(testCase)
+			assert.NotNil(t, err)
+		})
+	}
+}
