Skip to content

Commit

Permalink
[issue apache#3767] support go function for pulsar (apache#3854)
Browse files Browse the repository at this point in the history
Master Issue: apache#3767 

### Motivation

At present, go function only supports the simplest function. Input and output only allow []byte. For details, refer to: [PIP32](https://github.com/apache/pulsar/wiki/PIP-32%3A-Go-Function-API%2C-Instance-and-LocalRun)
  • Loading branch information
wolfstudy authored and sijie committed Apr 9, 2019
1 parent 12de91f commit f229d3b
Show file tree
Hide file tree
Showing 26 changed files with 3,427 additions and 10 deletions.
Empty file added pulsar-function-go/README.md
Empty file.
109 changes: 109 additions & 0 deletions pulsar-function-go/conf/conf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

package conf

import (
"flag"
"io/ioutil"
"os"
"os/user"
"time"

"github.com/apache/pulsar/pulsar-function-go/log"
"gopkg.in/yaml.v2"
)

const ConfigPath = "github.com/apache/pulsar/pulsar-function-go/conf/conf.yaml"

type Conf struct {
PulsarServiceURL string `yaml:"pulsarServiceURL"`
InstanceID int `yaml:"instanceID"`
FuncID string `yaml:"funcID"`
FuncVersion string `yaml:"funcVersion"`
MaxBufTuples int `yaml:"maxBufTuples"`
Port int `yaml:"port"`
ClusterName string `yaml:"clusterName"`
KillAfterIdleMs time.Duration `yaml:"killAfterIdleMs"`
// function details config
Tenant string `yaml:"tenant"`
NameSpace string `yaml:"nameSpace"`
Name string `yaml:"name"`
LogTopic string `yaml:"logTopic"`
ProcessingGuarantees int32 `yaml:"processingGuarantees"`
SecretsMap string `yaml:"secretsMap"`
Runtime int32 `yaml:"runtime"`
AutoACK bool `yaml:"autoAck"`
Parallelism int32 `yaml:"parallelism"`
//source config
SubscriptionType int32 `yaml:"subscriptionType"`
TimeoutMs uint64 `yaml:"timeoutMs"`
SubscriptionName string `yaml:"subscriptionName"`
CleanupSubscription bool `yaml:"cleanupSubscription"`
//source input specs
SourceSpecTopic string `yaml:"sourceSpecsTopic"`
SourceSchemaType string `yaml:"sourceSchemaType"`
IsRegexPatternSubscription bool `yaml:"isRegexPatternSubscription"`
ReceiverQueueSize int32 `yaml:"receiverQueueSize"`
//sink spec config
SinkSpecTopic string `yaml:"sinkSpecsTopic"`
SinkSchemaType string `yaml:"sinkSchemaType"`
//resources config
Cpu float64 `yaml:"cpu"`
Ram int64 `yaml:"ram"`
Disk int64 `yaml:"disk"`
//retryDetails config
MaxMessageRetries int32 `yaml:"maxMessageRetries"`
DeadLetterTopic string `yaml:"deadLetterTopic"`
}

var opts string

func (c *Conf) GetConf() *Conf {
flag.Parse()

yamlFile, err := ioutil.ReadFile(opts)
if err != nil {
log.Errorf("not found conf file, err:%s", err.Error())
return nil
}
err = yaml.Unmarshal(yamlFile, c)
if err != nil {
log.Errorf("unmarshal yaml file error:%s", err.Error())
return nil
}
return c
}

func init() {
var homeDir string
usr, err := user.Current()
if err == nil {
homeDir = usr.HomeDir
}

// Fall back to standard HOME environment variable that works
// for most POSIX OSes if the directory from the Go standard
// lib failed.
if err != nil || homeDir == "" {
homeDir = os.Getenv("HOME")
}
defaultPath := homeDir + "/" + ConfigPath
flag.StringVar(&opts, "instance-conf", defaultPath, "config conf.yml filepath")
}
57 changes: 57 additions & 0 deletions pulsar-function-go/conf/conf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

pulsarServiceURL: "pulsar://localhost:6650"
instanceID: 101
funcID: "pulsar-function"
funcVersion: "1.0.0"
maxBufTuples: 10
port: 8091
clusterName: "pulsar-function-go"
killAfterIdleMs: 50000
# function details config
tenant: ""
nameSpace: ""
name: "go-function"
logTopic: ""
processingGuarantees: 0
secretsMap: ""
runtime: 0
autoAck: true
parallelism: 0
# source config
subscriptionType: 0
timeoutMs: 0
subscriptionName: ""
cleanupSubscription: false
# source input specs
sourceSpecsTopic: persistent://public/default/topic-01
sourceSchemaType: ""
isRegexPatternSubscription: false
receiverQueueSize: 10
# sink specs config
sinkSpecsTopic: persistent://public/default/topic-02
sinkSchemaType: ""
# resource config
cpu: 0
ram: 0
disk: 0
# retryDetails config
maxMessageRetries: 0
deadLetterTopic: ""
38 changes: 38 additions & 0 deletions pulsar-function-go/examples/contextFunc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

package main

import (
"context"
"fmt"

"github.com/apache/pulsar/pulsar-function-go/pf"
)

func contextFunc(ctx context.Context) {
if fc, ok := pf.FromContext(ctx); ok {
fmt.Printf("function ID is:%s, ", fc.GetFuncID())
fmt.Printf("function version is:%s\n", fc.GetFuncVersion())
}
}

func main() {
pf.Start(contextFunc)
}
34 changes: 34 additions & 0 deletions pulsar-function-go/examples/hello.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

package main

import (
"fmt"

"github.com/apache/pulsar/pulsar-function-go/pf"
)

func hello() {
fmt.Println("hello pulsar function")
}

func main() {
pf.Start(hello)
}
36 changes: 36 additions & 0 deletions pulsar-function-go/examples/inputFunc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

package main

import (
"context"
"fmt"

"github.com/apache/pulsar/pulsar-function-go/pf"
)

func HandleRequest(ctx context.Context, in []byte) error{
fmt.Println(string(in) + "!")
return nil
}

func main() {
pf.Start(HandleRequest)
}
35 changes: 35 additions & 0 deletions pulsar-function-go/examples/outputFunc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

package main

import (
"context"

"github.com/apache/pulsar/pulsar-function-go/pf"
)

func HandleResponse(ctx context.Context, in []byte) ([]byte, error) {
res := append(in, 110)
return res, nil
}

func main() {
pf.Start(HandleResponse)
}
61 changes: 61 additions & 0 deletions pulsar-function-go/examples/test/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

package main

import (
"context"
"fmt"
"log"

"github.com/apache/pulsar/pulsar-client-go/pulsar"
)

func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}

defer client.Close()

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "topic-02",
SubscriptionName: "my-subscription",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}

defer consumer.Close()

for {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}

fmt.Printf("Received message msgId: %s -- content: '%s'\n",
msg.ID(), string(msg.Payload()))

consumer.Ack(msg)
}
}

Loading

0 comments on commit f229d3b

Please sign in to comment.