Skip to content

Commit

Permalink
tlclick update
Browse files Browse the repository at this point in the history
  • Loading branch information
nikandfor committed Feb 4, 2025
1 parent d3de0f7 commit 9d3b1f3
Show file tree
Hide file tree
Showing 3 changed files with 336 additions and 5 deletions.
182 changes: 180 additions & 2 deletions ext/tlclick/click2.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,203 @@ package tlclick

import (
"context"
_ "embed"
"io"
"sync"

"github.com/ClickHouse/ch-go"
"github.com/ClickHouse/ch-go/chpool"
"github.com/ClickHouse/ch-go/proto"
"tlog.app/go/errors"
"tlog.app/go/tlog"

"tlog.app/go/tlog/convert"
"tlog.app/go/tlog/tlwire"
)

//go:embed schema2.sql
var schema []byte

type (
Click struct {
pool *chpool.Pool

d tlwire.Decoder
j convert.JSON

mu sync.Mutex

json []byte
labels []byte

ls [][]byte // tlog labels

pair []byte
buf []byte

cols cols
}

cols struct {
tlog proto.ColStr
ls *proto.ColArr[[]byte]
ts proto.ColDateTime64Raw

json proto.ColStr
labels proto.ColStr

input proto.Input
query ch.Query

Check failure on line 51 in ext/tlclick/click2.go

View workflow job for this annotation

GitHub Actions / lint

undefined: ch (typecheck)
}
)

func New(pool *chpool.Pool) *Click {
return &Click{pool: pool}
d := &Click{pool: pool}

c := &d.cols

c.input = proto.Input{
{Name: "tlog", Data: &c.tlog},

Check failure on line 61 in ext/tlclick/click2.go

View workflow job for this annotation

GitHub Actions / lint

missing type in composite literal (typecheck)
{Name: "_labels", Data: proto.NewArray(&proto.ColStr{})},

Check failure on line 62 in ext/tlclick/click2.go

View workflow job for this annotation

GitHub Actions / lint

missing type in composite literal (typecheck)

Check failure on line 62 in ext/tlclick/click2.go

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest, 1.20)

type *proto.ColStr of &proto.ColStr{} does not match proto.ColumnOf[T] (cannot infer T)

Check failure on line 62 in ext/tlclick/click2.go

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest, 1.19)

type *proto.ColStr of &proto.ColStr{} does not match proto.ColumnOf[T] (cannot infer T)
{Name: "ts", Data: &c.ts},

Check failure on line 63 in ext/tlclick/click2.go

View workflow job for this annotation

GitHub Actions / lint

missing type in composite literal (typecheck)
{Name: "json", Data: &c.json},
{Name: "labels", Data: &c.labels},
}

c.query = ch.Query{

Check failure on line 68 in ext/tlclick/click2.go

View workflow job for this annotation

GitHub Actions / lint

undefined: ch (typecheck)
Body: c.input.Into("ingest"),
Input: c.input,
}

return d
}

func (d *Click) Write(p []byte) (int, error) {
defer d.mu.Unlock()
d.mu.Lock()

d.json = d.json[:0]
d.labels = d.labels[:0]
d.ls = d.ls[:0]
d.buf = d.buf[:0]

tag, els, i := d.d.Tag(p, 0)
if tag != tlwire.Map {
return 0, errors.New("expected map")
}

var k []byte
var ts int64

for el := 0; els == -1 || el < int(els); el++ {
if els == -1 && d.d.Break(p, &i) {
break
}

kst := i
k, i = d.d.Bytes(p, i)

vst := i
tag, sub, end := d.d.SkipTag(p, i)
i = end

d.pair, _ = d.j.ConvertKey(d.pair[:0], p, kst)
d.pair = append(d.pair, ':')
d.pair, _ = d.j.ConvertValue(d.pair, p, vst)

if tag == tlwire.Semantic && sub == tlog.WireLabel {
d.ls = append(d.ls, p[kst:end])

addComma(&d.labels)
d.labels = append(d.labels, d.pair...)

continue
}

addComma(&d.json)
d.json = append(d.json, d.pair...)

if tag == tlwire.Semantic && sub == tlwire.Time && string(k) == tlog.KeyTimestamp && ts == 0 {
ts, _ = d.d.Timestamp(p, vst)
}
}

addClose(&d.json)
addClose(&d.labels)

//

c := &d.cols

c.tlog.AppendBytes(p)
c.ls.Append(d.ls)
c.ts.Append(proto.DateTime64(ts))

c.json.AppendBytes(d.json)
c.labels.AppendBytes(d.labels)

return len(p), nil
}

func (d *Click) Query(ctx context.Context, w io.Writer, ts int64, q string) error { return nil }

func (d *Click) CreateTables(ctx context.Context) error { return nil }
func (d *Click) CreateTables(ctx context.Context) error {
query := 0
i := skipSpaces(schema, 0)

for i < len(schema) {
end := next(schema, i, ',')

q := ch.Query{

Check failure on line 152 in ext/tlclick/click2.go

View workflow job for this annotation

GitHub Actions / lint

undefined: ch (typecheck)
Body: string(schema[i:end]),
}

err := d.pool.Do(ctx, q)
if err != nil {
return errors.Wrap(err, "query %d (%d:%d)", query, i, end)
}

i = skipSpaces(schema, end+1)
query++
}

return nil
}

func addComma(b *[]byte) {
if len(*b) == 0 {
*b = append(*b, '{')
} else {
*b = append(*b, ',')
}
}

func addClose(b *[]byte) {
if len(*b) != 0 {
*b = append(*b, '}')
}
}

func skipSpaces(b []byte, i int) int {
for i < len(b) && (b[i] == ' ' || b[i] == '\n') {
i++
}

return i
}

func next(b []byte, i int, c byte) int {
for i < len(b) && b[i] != c {
i++
}

return i
}

func csel[T any](c bool, t, e T) T {
if c {
return t
} else {
return e
}
}
6 changes: 3 additions & 3 deletions ext/tlclick/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"crypto/tls"

click "github.com/ClickHouse/ch-go"
ch "github.com/ClickHouse/ch-go"
"github.com/ClickHouse/ch-go/chpool"
)

Expand All @@ -14,9 +14,9 @@ func NewPool(ctx context.Context, opts chpool.Options) (*chpool.Pool, error) {

func DefaultPoolOptions(addr string) chpool.Options {
return chpool.Options{
ClientOptions: click.Options{
ClientOptions: ch.Options{
Address: addr,
Compression: click.CompressionZSTD,
Compression: ch.CompressionZSTD,
ClientName: "tlog agent",

TLS: &tls.Config{},
Expand Down
153 changes: 153 additions & 0 deletions ext/tlclick/schema2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@

CREATE TABLE IF NOT EXISTS ingest (
tlog String COMMENT 'raw event',

_labels Array(String) COMMENT 'tlog labels',

ts DateTime64(9, 'UTC') DEFAULT addNanoseconds(toDateTime64(0, 9, 'UTC'), JSONExtractInt(json, '_t')),

json String COMMENT 'without labels',
labels String COMMENT 'json labels',
)
ENGINE Null
;

CREATE TABLE IF NOT EXISTS events (
tlog String COMMENT 'raw event',

_labels Array(String) COMMENT 'tlog labels',
_labels_hash UInt64 MATERIALIZED cityHash64(_labels),

ts DateTime64(9, 'UTC') DEFAULT addNanoseconds(toDateTime64(0, 9, 'UTC'), JSONExtractInt(json, '_t')),

json String COMMENT 'without labels',
labels JSON(),

_s UUID DEFAULT toUUIDOrZero(JSONExtractString(json, '_s')) COMMENT 'span',
_p UUID DEFAULT toUUIDOrZero(JSONExtractString(json, '_p')) COMMENT 'parent',

_k FixedString(1) DEFAULT substring(JSONExtractString(json, '_k')), 1, 2) COMMENT 'kind',
_c String DEFAULT JSONExtractString(json, '_c')) COMMENT 'caller',
_e Int64 DEFAULT JSONExtractInt(json, '_e')) COMMENT 'elapsed',
_l Int8 DEFAULT JSONExtractInt(json, '_l')) COMMENT 'log level',

_m String DEFAULT JSONExtractInt(json, '_m')) COMMENT 'message',

kvs JSON(SKIP _s, SKIP _p, SKIP _k, SKIP _c, SKIP _e, SKIP _l, SKIP _m) COMMENT 'json without _* keys',

minute DateTime ALIAS toStartOfMinute(ts),
hour DateTime ALIAS toStartOfHour(ts),
day Date ALIAS toStartOfDay(ts),
week Date MATERIALIZED toStartOfWeek(ts),
month Date ALIAS toStartOfMonth(ts),

db_insert_time DateTime DEFAULT now(),
)
ENGINE MergeTree
ORDER BY (_labels_hash, ts)
PARTITION BY week
;

CREATE MATERIALIZED VIEW IF NOT EXISTS events_mv
TO event
AS SELECT
tlog,
_labels,
ts,
json,
labels,
json AS kvs,
0
FROM ingest
;

CREATE TABLE IF NOT EXISTS spans (
_s UUID,
ts DateTime64(9, 'UTC'),

_labels_hash UInt64,

minute DateTime ALIAS toStartOfMinute(ts),
hour DateTime ALIAS toStartOfHour(ts),
day Date ALIAS toStartOfDay(ts),
week Date MATERIALIZED toStartOfWeek(ts),
month Date ALIAS toStartOfMonth(ts),

db_insert_time DateTime DEFAULT now(),
)
ENGINE ReplacingMergeTree()
ORDER BY (_s, _labels_hash, ts)
PARTITION BY week
;

CREATE MATERIALIZED VIEW IF NOT EXISTS spans_mv
TO spans
AS SELECT
_s,
ts,
_labels_hash,
0
FROM events
-- WHERE notEmpty(_s)
;

CREATE TABLE IF NOT EXISTS parent (
_s UUID,
_p UUID,
ts DateTime64(9, 'UTC'),

_labels_hash UInt64,

minute DateTime ALIAS toStartOfMinute(ts),
hour DateTime ALIAS toStartOfHour(ts),
day Date ALIAS toStartOfDay(ts),
week Date MATERIALIZED toStartOfWeek(ts),
month Date ALIAS toStartOfMonth(ts),

db_insert_time DateTime DEFAULT now(),
)
ENGINE ReplacingMergeTree()
ORDER BY (_s, _p, _labels_hash, ts)
PARTITION BY week
;

CREATE MATERIALIZED VIEW IF NOT EXISTS parent_mv
TO parent
AS SELECT
_s,
_p,
ts,
_labels_hash,
0
FROM events
WHERE notEmpty(_p) AND notEmpty(_s)
;

CREATE TABLE IF NOT EXISTS labels (
labels JSON(),
_labels_hash UInt64,

ts DateTime64(9, 'UTC'),

minute DateTime ALIAS toStartOfMinute(ts),
hour DateTime ALIAS toStartOfHour(ts),
day Date ALIAS toStartOfDay(ts),
week Date MATERIALIZED toStartOfWeek(ts),
month Date ALIAS toStartOfMonth(ts),

db_insert_time DateTime DEFAULT now(),
)
ENGINE ReplacingMergeTree()
ORDER BY (_labels_hash)
PARTITION BY week
;

CREATE MATERIALIZED VIEW IF NOT EXISTS labels_mv
TO labels
AS SELECT
labels,
_labels_hash,
ts,
0
FROM events
;

0 comments on commit 9d3b1f3

Please sign in to comment.