Skip to content

Commit

Permalink
fix: fix reference
Browse files Browse the repository at this point in the history
  • Loading branch information
huskar-t committed Jan 12, 2023
1 parent f46683e commit dc50d4f
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 55 deletions.
32 changes: 19 additions & 13 deletions README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ v2 与 v3 版本不兼容,与 TDengine 版本对应如下:
| v3.0.0 | 3.0.0.0+ |
| v3.0.1 | 3.0.0.0+ |
| v3.0.3 | 3.0.1.5+ |
| v3.0.4 | 3.0.2.2+ |
| v3.1.0 | 3.0.2.2+ |

## 安装

Expand Down Expand Up @@ -128,31 +130,31 @@ func main() {
创建消费:

```go
func NewConsumer(conf *Config) (*Consumer, error)
func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
```

订阅
订阅单个主题

```go
func (c *Consumer) Subscribe(topics []string) error
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
```

轮询消息
订阅

```go
func (c *Consumer) Poll(timeout time.Duration) (*Result, error)
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
```

提交消息
轮询消息

```go
func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
func (c *Consumer) Poll(timeoutMs int) tmq.Event
```

释放消息
提交消息

```go
func (c *Consumer) FreeMessage(message unsafe.Pointer)
func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
```

取消订阅:
Expand Down Expand Up @@ -511,19 +513,23 @@ DSN 格式为:

### 订阅相关 API

- `func NewConsumer(config *Config) (*Consumer, error)`
- `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`

创建消费者。

- `func (c *Consumer) Subscribe(topic []string) error`
- `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`

订阅单个主题。

- `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`

订阅主题。

- `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)`
- `func (c *Consumer) Poll(timeoutMs int) tmq.Event`

轮询消息。

- `func (c *Consumer) Commit(messageID uint64) error`
- `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`

提交消息。

Expand Down
32 changes: 19 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ v2 is not compatible with v3 version and corresponds to the TDengine version as
| v3.0.0 | 3.0.0.0+ |
| v3.0.1 | 3.0.0.0+ |
| v3.0.3 | 3.0.1.5+ |
| v3.0.4 | 3.0.2.2+ |
| v3.1.0 | 3.0.2.2+ |

## Install

Expand Down Expand Up @@ -125,31 +127,31 @@ APIs that are worthy to have a check:
Create consumer:

````go
func NewConsumer(conf *Config) (*Consumer, error)
func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
````

Subscribe:
Subscribe single topic:

````go
func (c *Consumer) Subscribe(topics []string) error
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
````

Poll message:
Subscribe topics:

````go
func (c *Consumer) Poll(timeout time.Duration) (*Result, error)
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
````

Commit message:
Poll message:

````go
func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
func (c *Consumer) Poll(timeoutMs int) tmq.Event
````

Free message:
Commit message:

````go
func (c *Consumer) FreeMessage(message unsafe.Pointer)
func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
````

Unsubscribe:
Expand Down Expand Up @@ -512,19 +514,23 @@ Use tmq over websocket. The server needs to start taoAdapter.

### Subscription related API

- `func NewConsumer(config *Config) (*Consumer, error)`
- `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`

Create a consumer.

- `func (c *Consumer) Subscribe(topic []string) error`
- `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`

Subscribe a topic.

- `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`

Subscribe to topics.

- `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)`
- `func (c *Consumer) Poll(timeoutMs int) tmq.Event`

Poll messages.

- `func (c *Consumer) Commit(messageID uint64) error`
- `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`

Commit message.

Expand Down
50 changes: 25 additions & 25 deletions examples/tmqoverws/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func main() {
panic(err)
}
defer db.Close()
//prepareEnv(db)
prepareEnv(db)
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"ws.url": "ws://127.0.0.1:6041/rest/tmq",
"ws.message.channelLen": uint(0),
Expand All @@ -35,30 +35,30 @@ func main() {
if err != nil {
panic(err)
}
//go func() {
// _, err := db.Exec("create table example_ws_tmq.t_all(ts timestamp," +
// "c1 bool," +
// "c2 tinyint," +
// "c3 smallint," +
// "c4 int," +
// "c5 bigint," +
// "c6 tinyint unsigned," +
// "c7 smallint unsigned," +
// "c8 int unsigned," +
// "c9 bigint unsigned," +
// "c10 float," +
// "c11 double," +
// "c12 binary(20)," +
// "c13 nchar(20)" +
// ")")
// if err != nil {
// panic(err)
// }
// _, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')")
// if err != nil {
// panic(err)
// }
//}()
go func() {
_, err := db.Exec("create table example_ws_tmq.t_all(ts timestamp," +
"c1 bool," +
"c2 tinyint," +
"c3 smallint," +
"c4 int," +
"c5 bigint," +
"c6 tinyint unsigned," +
"c7 smallint unsigned," +
"c8 int unsigned," +
"c9 bigint unsigned," +
"c10 float," +
"c11 double," +
"c12 binary(20)," +
"c13 nchar(20)" +
")")
if err != nil {
panic(err)
}
_, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')")
if err != nil {
panic(err)
}
}()
for i := 0; i < 5; i++ {
ev := consumer.Poll(0)
if ev != nil {
Expand Down
9 changes: 5 additions & 4 deletions wrapper/tmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/taosdata/driver-go/v3/common"
"github.com/taosdata/driver-go/v3/common/parser"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
"github.com/taosdata/driver-go/v3/errors"
"github.com/taosdata/driver-go/v3/wrapper/cgo"
)
Expand Down Expand Up @@ -1091,7 +1092,7 @@ func TestTMQModify(t *testing.T) {
}
TaosFreeResult(result)

pool := func(cb func(*tmq.Meta, unsafe.Pointer)) {
pool := func(cb func(*tmqcommon.Meta, unsafe.Pointer)) {
message := TMQConsumerPoll(tmq, 500)
assert.NotNil(t, message)
topic := TMQGetTopicName(message)
Expand All @@ -1101,7 +1102,7 @@ func TestTMQModify(t *testing.T) {
pointer := TMQGetJsonMeta(message)
assert.NotNil(t, pointer)
data := ParseJsonMeta(pointer)
var meta tmq.Meta
var meta tmqcommon.Meta
err = jsoniter.Unmarshal(data, &meta)
assert.NoError(t, err)

Expand Down Expand Up @@ -1132,7 +1133,7 @@ func TestTMQModify(t *testing.T) {
TMQFreeRaw(rawMeta)
}

pool(func(meta *tmq.Meta, rawMeta unsafe.Pointer) {
pool(func(meta *tmqcommon.Meta, rawMeta unsafe.Pointer) {
assert.Equal(t, "create", meta.Type)
assert.Equal(t, "stb", meta.TableName)
assert.Equal(t, "super", meta.TableType)
Expand Down Expand Up @@ -1322,7 +1323,7 @@ func TestTMQAutoCreateTable(t *testing.T) {
pointer := TMQGetJsonMeta(message)
data := ParseJsonMeta(pointer)
t.Log(string(data))
var meta tmq.Meta
var meta tmqcommon.Meta
err = jsoniter.Unmarshal(data, &meta)
assert.NoError(t, err)
assert.Equal(t, "create", meta.Type)
Expand Down

0 comments on commit dc50d4f

Please sign in to comment.