Skip to content

Commit

Permalink
enh: use fetch_raw to reduce the number of calls to get results
Browse files Browse the repository at this point in the history
  • Loading branch information
huskar-t committed Jan 25, 2024
1 parent f0323f7 commit 9a1db34
Show file tree
Hide file tree
Showing 9 changed files with 1,244 additions and 99 deletions.
39 changes: 18 additions & 21 deletions af/tmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
)

type Consumer struct {
cConsumer unsafe.Pointer
cConsumer unsafe.Pointer
dataParser *parser.TMQRawDataParser
}

// NewConsumer Create new TMQ consumer with TMQ config
Expand All @@ -28,7 +29,8 @@ func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error) {
return nil, err
}
consumer := &Consumer{
cConsumer: cConsumer,
cConsumer: cConsumer,
dataParser: parser.NewTMQRawDataParser(),
}
return consumer, nil
}
Expand Down Expand Up @@ -176,27 +178,22 @@ func (c *Consumer) getMeta(message unsafe.Pointer) (*tmq.Meta, error) {
}

func (c *Consumer) getData(message unsafe.Pointer) ([]*tmq.Data, error) {
errCode, raw := wrapper.TMQGetRaw(message)
if errCode != taosError.SUCCESS {
errStr := wrapper.TaosErrorStr(message)
err := taosError.NewError(int(errCode), errStr)
return nil, err
}
_, _, rawPtr := wrapper.ParseRawMeta(raw)
blockInfos, err := c.dataParser.Parse(rawPtr)
if err != nil {
return nil, err
}
var tmqData []*tmq.Data
for {
blockSize, errCode, block := wrapper.TaosFetchRawBlock(message)
if errCode != int(taosError.SUCCESS) {
errStr := wrapper.TaosErrorStr(message)
err := taosError.NewError(errCode, errStr)
return nil, err
}
if blockSize == 0 {
break
}
tableName := wrapper.TMQGetTableName(message)
fileCount := wrapper.TaosNumFields(message)
rh, err := wrapper.ReadColumn(message, fileCount)
if err != nil {
return nil, err
}
precision := wrapper.TaosResultPrecision(message)
for i := 0; i < len(blockInfos); i++ {
tmqData = append(tmqData, &tmq.Data{
TableName: tableName,
Data: parser.ReadBlock(block, blockSize, rh.ColTypes, precision),
TableName: blockInfos[i].TableName,
Data: parser.ReadBlockSimple(blockInfos[i].RawBlock, blockInfos[i].Precision),
})
}
return tmqData, nil
Expand Down
101 changes: 100 additions & 1 deletion af/tmq/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestSeek(t *testing.T) {
}

defer func() {
//execWithoutResult(conn, "drop database if exists "+db)
execWithoutResult(conn, "drop database if exists "+db)
}()
for _, sql := range sqls {
err = execWithoutResult(conn, sql)
Expand Down Expand Up @@ -308,3 +308,102 @@ func execWithoutResult(conn unsafe.Pointer, sql string) error {
}
return nil
}

func prepareMultiBlockEnv(conn unsafe.Pointer) error {
var err error
steps := []string{
"drop topic if exists test_tmq_multi_block_topic",
"drop database if exists test_tmq_multi_block",
"create database test_tmq_multi_block vgroups 1 WAL_RETENTION_PERIOD 86400",
"create topic test_tmq_multi_block_topic as database test_tmq_multi_block",
"create table test_tmq_multi_block.t1(ts timestamp,v int)",
"create table test_tmq_multi_block.t2(ts timestamp,v int)",
"create table test_tmq_multi_block.t3(ts timestamp,v int)",
"create table test_tmq_multi_block.t4(ts timestamp,v int)",
"create table test_tmq_multi_block.t5(ts timestamp,v int)",
"create table test_tmq_multi_block.t6(ts timestamp,v int)",
"create table test_tmq_multi_block.t7(ts timestamp,v int)",
"create table test_tmq_multi_block.t8(ts timestamp,v int)",
"create table test_tmq_multi_block.t9(ts timestamp,v int)",
"create table test_tmq_multi_block.t10(ts timestamp,v int)",
"insert into test_tmq_multi_block.t1 values (now,1) test_tmq_multi_block.t2 values (now,2) " +
"test_tmq_multi_block.t3 values (now,3) test_tmq_multi_block.t4 values (now,4)" +
"test_tmq_multi_block.t5 values (now,5) test_tmq_multi_block.t6 values (now,6)" +
"test_tmq_multi_block.t7 values (now,7) test_tmq_multi_block.t8 values (now,8)" +
"test_tmq_multi_block.t9 values (now,9) test_tmq_multi_block.t10 values (now,10)",
}
for _, step := range steps {
err = execWithoutResult(conn, step)
if err != nil {
return err
}
}
return nil
}

func cleanMultiBlockEnv(conn unsafe.Pointer) error {
var err error
time.Sleep(2 * time.Second)
steps := []string{
"drop topic if exists test_tmq_multi_block_topic",
"drop database if exists test_tmq_multi_block",
}
for _, step := range steps {
err = execWithoutResult(conn, step)
if err != nil {
return err
}
}
return nil
}

func TestMultiBlock(t *testing.T) {
conn, err := wrapper.TaosConnect("", "root", "taosdata", "", 0)
if err != nil {
t.Error(err)
return
}
defer wrapper.TaosClose(conn)
err = prepareMultiBlockEnv(conn)
assert.NoError(t, err)
defer cleanMultiBlockEnv(conn)
consumer, err := NewConsumer(&tmq.ConfigMap{
"group.id": "test",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"auto.offset.reset": "earliest",
"client.id": "test_tmq_multi_block_topic",
"enable.auto.commit": "false",
"msg.with.table.name": "true",
})
assert.NoError(t, err)
if err != nil {
t.Error(err)
return
}
defer func() {
consumer.Unsubscribe()
consumer.Close()
}()
topic := []string{"test_tmq_multi_block_topic"}
err = consumer.SubscribeTopics(topic, nil)
if err != nil {
t.Error(err)
return
}
for i := 0; i < 10; i++ {
event := consumer.Poll(500)
if event == nil {
continue
}
switch e := event.(type) {
case *tmq.DataMessage:
data := e.Value().([]*tmq.Data)
assert.Equal(t, "test_tmq_multi_block", e.DBName())
assert.Equal(t, 10, len(data))
return
}
}
}
12 changes: 12 additions & 0 deletions common/parser/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,18 @@ func rawConvertJson(pHeader, pStart unsafe.Pointer, row int) driver.Value {
return binaryVal[:]
}

func ReadBlockSimple(block unsafe.Pointer, precision int) [][]driver.Value {
blockSize := RawBlockGetNumOfRows(block)
colCount := RawBlockGetNumOfCols(block)
colInfo := make([]RawBlockColInfo, colCount)
RawBlockGetColInfo(block, colInfo)
colTypes := make([]uint8, colCount)
for i := int32(0); i < colCount; i++ {
colTypes[i] = uint8(colInfo[i].ColType)
}
return ReadBlock(block, int(blockSize), colTypes, precision)
}

// ReadBlock in-place
func ReadBlock(block unsafe.Pointer, blockSize int, colTypes []uint8, precision int) [][]driver.Value {
r := make([][]driver.Value, blockSize)
Expand Down
10 changes: 2 additions & 8 deletions common/parser/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,12 +663,6 @@ func TestParseBlock(t *testing.T) {
t.Error(errors.NewError(code, errStr))
return
}
fileCount := wrapper.TaosNumFields(res)
rh, err := wrapper.ReadColumn(res, fileCount)
if err != nil {
t.Error(err)
return
}
precision := wrapper.TaosResultPrecision(res)
var data [][]driver.Value
for {
Expand All @@ -684,7 +678,7 @@ func TestParseBlock(t *testing.T) {
break
}
version := RawBlockGetVersion(block)
assert.Equal(t, int32(1), version)
t.Log(version)
length := RawBlockGetLength(block)
assert.Equal(t, int32(447), length)
rows := RawBlockGetNumOfRows(block)
Expand Down Expand Up @@ -771,7 +765,7 @@ func TestParseBlock(t *testing.T) {
},
infos,
)
d := ReadBlock(block, blockSize, rh.ColTypes, precision)
d := ReadBlockSimple(block, precision)
data = append(data, d...)
}
wrapper.TaosFreeResult(res)
Expand Down
Loading

0 comments on commit 9a1db34

Please sign in to comment.