Skip to content

Commit

Permalink
fix go server standalone connector unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
horoc committed Dec 12, 2022
1 parent 0a7c8c1 commit 206a5b0
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 33 deletions.
34 changes: 23 additions & 11 deletions eventmesh-server-go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@ module github.com/apache/incubator-eventmesh/eventmesh-server-go
go 1.18

require (
github.com/apache/rocketmq-client-go/v2 v2.1.1
github.com/cloudevents/sdk-go/v2 v2.11.0
github.com/deckarep/golang-set/v2 v2.1.0
github.com/gin-contrib/pprof v1.4.0
github.com/gin-gonic/gin v1.8.1
github.com/gogf/gf v1.16.9
github.com/golang/mock v1.6.0
github.com/google/uuid v1.1.2
github.com/hashicorp/go-multierror v1.1.1
github.com/json-iterator/go v1.1.12
github.com/lestrrat-go/strftime v1.0.6
Expand All @@ -28,25 +34,26 @@ require (
github.com/panjf2000/ants/v2 v2.6.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.13.0
go.uber.org/zap v1.22.0
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
google.golang.org/grpc v1.36.1
gopkg.in/yaml.v3 v3.0.1
github.com/apache/rocketmq-client-go/v2 v2.1.1
github.com/deckarep/golang-set/v2 v2.1.0
github.com/gin-contrib/pprof v1.4.0
github.com/gin-gonic/gin v1.8.1
github.com/golang/mock v1.6.0
github.com/google/uuid v1.1.2
github.com/stretchr/testify v1.8.0
github.com/unrolled/secure v1.12.0
go.uber.org/atomic v1.7.0
go.uber.org/fx v1.18.1
go.uber.org/zap v1.22.0
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
google.golang.org/grpc v1.36.1
google.golang.org/protobuf v1.28.1
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emirpasic/gods v1.12.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-errors/errors v1.0.1 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-playground/validator/v10 v10.10.0 // indirect
Expand All @@ -57,8 +64,10 @@ require (
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
Expand All @@ -75,8 +84,11 @@ require (
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987 // indirect
gopkg.in/ini.v1 v1.42.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
13 changes: 3 additions & 10 deletions eventmesh-server-go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/toml v1.2.0 h1:Rt8g24XnyGTyglgET/PRUNlrUeu9F5L+7FilkXfZgs0=
github.com/BurntSushi/toml v1.2.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down Expand Up @@ -324,7 +323,6 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/ugorji/go v1.2.7 h1:qYhyWUUd6WbiM+C6JZAUkIJt/1WrjzNHY9+KCIjVqTo=
github.com/ugorji/go v1.2.7/go.mod h1:nF9osbDWLy6bDVv/Rtoh6QgnvNDpmCalQV5urGCCS6M=
github.com/ugorji/go/codec v1.2.7 h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0=
github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY=
Expand Down Expand Up @@ -434,8 +432,6 @@ golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand All @@ -453,8 +449,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -501,8 +497,6 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956 h1:XeJjHH1KiLpKGb6lvMiksZ9l0fVUh+AmGcm0nOMEBOY=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down Expand Up @@ -612,7 +606,6 @@ google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940/go.mod h1:55QSHmfG
google.golang.org/genproto v0.0.0-20200430143042-b979b6f78d84/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200511104702-f5ebc3bea380/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200515170657-fc4c6c6a6587/go.mod h1:YsZOwe1myG/8QRHRsmBRE1LrgQY60beZKjly0O1fX9U=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA=
google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package standalone

import (
"context"
"fmt"
"sync"
"testing"
"time"
Expand All @@ -35,18 +36,29 @@ const (
pluginName = "standalone"
)

// MockDecoder standalone connector properties mock decoder
type MockDecoder struct {
}

// Decode mock decoder, no-op
func (m *MockDecoder) Decode(cfg interface{}) error {
return nil
}

func TestProducer_Publish(t *testing.T) {
factory := plugin.Get(connector.PluginType, pluginName).(connector.Factory)
factory.Setup(pluginName, &MockDecoder{})
producer, _ := factory.GetProducer()
producer.Start()
defer producer.Shutdown()

var publishSuccess bool
var callBackErr error
topic := fmt.Sprintf("%s_publish", topicName)
callback := connector.SendCallback{
OnSuccess: func(result *connector.SendResult) {
publishSuccess = true
assert.Equal(t, topicName, result.Topic)
assert.Equal(t, topic, result.Topic)
assert.Equal(t, "1", result.MessageId)
assert.Nil(t, result.Err)
},
Expand All @@ -55,18 +67,20 @@ func TestProducer_Publish(t *testing.T) {
},
}

err := producer.Publish(context.Background(), getTestEvent(), &callback)
err := producer.Publish(context.Background(), getTestEvent(topic), &callback)
assert.Nil(t, err)
assert.True(t, publishSuccess)
assert.Nil(t, callBackErr)

exist, err := producer.CheckTopicExist(topicName)
exist, err := producer.CheckTopicExist(topic)
assert.True(t, exist)
assert.Nil(t, err)

}
func TestConsumer_Subscribe(t *testing.T) {
sum := atomic.NewInt64(0)
topic := fmt.Sprintf("%s_subscribe", topicName)

var wg sync.WaitGroup
wg.Add(50)

Expand All @@ -83,17 +97,18 @@ func TestConsumer_Subscribe(t *testing.T) {
}

factory := plugin.Get(connector.PluginType, pluginName).(connector.Factory)
factory.Setup(pluginName, &MockDecoder{})
consumer, _ := factory.GetConsumer()
consumer.Start()
consumer.RegisterEventListener(&listener)
consumer.Subscribe(topicName)
consumer.Subscribe(topic)
defer consumer.Shutdown()

producer, _ := factory.GetProducer()
producer.Start()
defer producer.Shutdown()
for i := 1; i <= 50; i++ {
err := producer.Publish(context.Background(), getTestEventOfData(map[string]interface{}{
err := producer.Publish(context.Background(), getTestEventOfData(topic, map[string]interface{}{
"val": i,
}), getEmptyPublishCallback())

Expand All @@ -109,6 +124,8 @@ func TestConsumer_Subscribe(t *testing.T) {

func TestConsumer_ManualAck(t *testing.T) {
sum := atomic.NewInt64(0)
topic := fmt.Sprintf("%s_ack", topicName)

var wg sync.WaitGroup
wg.Add(50)

Expand All @@ -126,17 +143,18 @@ func TestConsumer_ManualAck(t *testing.T) {
}

factory := plugin.Get(connector.PluginType, pluginName).(connector.Factory)
factory.Setup(pluginName, &MockDecoder{})
consumer, _ := factory.GetConsumer()
consumer.Start()
consumer.RegisterEventListener(&listener)
consumer.Subscribe(topicName)
consumer.Subscribe(topic)
defer consumer.Shutdown()

producer, _ := factory.GetProducer()
producer.Start()
defer producer.Shutdown()
for i := 1; i <= 50; i++ {
err := producer.Publish(context.Background(), getTestEventOfData(map[string]interface{}{
err := producer.Publish(context.Background(), getTestEventOfData(topic, map[string]interface{}{
"val": i,
}), getEmptyPublishCallback())

Expand All @@ -151,6 +169,7 @@ func TestConsumer_ManualAck(t *testing.T) {

func TestConsumer_UpdateOffset(t *testing.T) {
sum := atomic.NewInt64(0)
topic := fmt.Sprintf("%s_offset", topicName)
ch := make(chan struct{})
listener := connector.EventListener{
Consume: func(event *ce.Event, commitFunc connector.CommitFunc) error {
Expand All @@ -164,20 +183,21 @@ func TestConsumer_UpdateOffset(t *testing.T) {
}

factory := plugin.Get(connector.PluginType, pluginName).(connector.Factory)
factory.Setup(pluginName, &MockDecoder{})
consumer, _ := factory.GetConsumer()
consumer.Start()
defer consumer.Shutdown()
consumer.RegisterEventListener(&listener)
event := getTestEvent()
event := getTestEvent(topic)
event.SetExtension("offset", "49")
consumer.Subscribe(topicName)
consumer.Subscribe(topic)
consumer.UpdateOffset(context.Background(), []*ce.Event{event})

producer, _ := factory.GetProducer()
producer.Start()
defer producer.Shutdown()
for i := 1; i <= 50; i++ {
err := producer.Publish(context.Background(), getTestEventOfData(map[string]interface{}{
err := producer.Publish(context.Background(), getTestEventOfData(topic, map[string]interface{}{
"val": i,
}), getEmptyPublishCallback())

Expand All @@ -196,14 +216,14 @@ func TestConsumer_UpdateOffset(t *testing.T) {
}
}

func getTestEvent() *ce.Event {
func getTestEvent(topicName string) *ce.Event {
event := ce.NewEvent()
event.SetID(uuid.New().String())
event.SetSubject(topicName)
return &event
}

func getTestEventOfData(data map[string]interface{}) *ce.Event {
func getTestEventOfData(topicName string, data map[string]interface{}) *ce.Event {
event := ce.NewEvent()
event.SetID(uuid.New().String())
event.SetSubject(topicName)
Expand Down

0 comments on commit 206a5b0

Please sign in to comment.