Skip to content

Commit

Permalink
feat: implement of clickhouse creator,initializer,loader,writer,stora…
Browse files Browse the repository at this point in the history
…ge (erda-project#4431)

* feat: implement of clickhouse creator,initializer,loader

* feat: implement of clickhouse writer, storage

* feature: adjust clickhouse log configuration

* feature: add clickhouse config sections

* merge master
  • Loading branch information
snakorse authored Mar 29, 2022
1 parent e806a5e commit 6ec3438
Show file tree
Hide file tree
Showing 23 changed files with 1,076 additions and 24 deletions.
4 changes: 4 additions & 0 deletions cmd/monitor/streaming/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@ import (
_ "github.com/erda-project/erda/modules/core/monitor/event/storage/elasticsearch"
_ "github.com/erda-project/erda/modules/core/monitor/log/persist"
_ "github.com/erda-project/erda/modules/core/monitor/log/persist/v1"
_ "github.com/erda-project/erda/modules/core/monitor/log/storage/clickhouse"
_ "github.com/erda-project/erda/modules/core/monitor/log/storage/elasticsearch"
_ "github.com/erda-project/erda/modules/core/monitor/metric/persist"
_ "github.com/erda-project/erda/modules/core/monitor/metric/storage/elasticsearch"
_ "github.com/erda-project/erda/modules/core/monitor/settings/retention-strategy"
_ "github.com/erda-project/erda/modules/core/monitor/storekit/clickhouse/table/creator"
_ "github.com/erda-project/erda/modules/core/monitor/storekit/clickhouse/table/initializer"
_ "github.com/erda-project/erda/modules/core/monitor/storekit/clickhouse/table/loader"
_ "github.com/erda-project/erda/modules/core/monitor/storekit/elasticsearch/index/creator"
_ "github.com/erda-project/erda/modules/core/monitor/storekit/elasticsearch/index/initializer"
_ "github.com/erda-project/erda/modules/core/monitor/storekit/elasticsearch/index/loader"
Expand Down
2 changes: 2 additions & 0 deletions conf/monitor/streaming/clickhouse/logs_ddl_create_db.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Create the logs database on every node of the cluster (idempotent).
CREATE DATABASE IF NOT EXISTS <database> ON CLUSTER '{cluster}';
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Create the replicated base table for logs.
// Data is partitioned by day and expires after 7 days via TTL.
CREATE TABLE IF NOT EXISTS <database>.logs ON CLUSTER '{cluster}'
(
`_id` String,
`timestamp` DateTime64(9,'Asia/Shanghai'),
`source` String,
`id` String,
`org_name` String,
`stream` String,
`offset` Int64,
`content` String,
`tags` Map(String,String),
INDEX idx__id(_id) TYPE minmax GRANULARITY 1
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{cluster}-{shard}/logs', '{replica}')
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (org_name, timestamp, id)
TTL toDateTime(timestamp) + INTERVAL 7 DAY;

// Materialize frequently-queried tag keys as dedicated columns so queries
// can filter on them without reading the whole tags map.
ALTER TABLE <database>.logs ON CLUSTER '{cluster}'
ADD COLUMN IF NOT EXISTS `tags.trace_id` String MATERIALIZED tags['trace_id'],
ADD COLUMN IF NOT EXISTS `tags.level` String MATERIALIZED tags['level'],
ADD COLUMN IF NOT EXISTS `tags.application_name` String MATERIALIZED tags['application_name'],
ADD COLUMN IF NOT EXISTS `tags.service_name` String MATERIALIZED tags['service_name'],
ADD COLUMN IF NOT EXISTS `tags.pod_name` String MATERIALIZED tags['pod_name'],
ADD COLUMN IF NOT EXISTS `tags.pod_ip` String MATERIALIZED tags['pod_ip'],
ADD COLUMN IF NOT EXISTS `tags.container_name` String MATERIALIZED tags['container_name'],
ADD COLUMN IF NOT EXISTS `tags.container_id` String MATERIALIZED tags['container_id'];

// Add a bloom-filter skipping index on the frequently-filtered trace id.
// NOTE(review): index renamed from misspelled "idx_tace_id"; clusters that
// already ran the old DDL keep the old index name — confirm migration plan.
ALTER TABLE <database>.logs ON CLUSTER '{cluster}' ADD INDEX IF NOT EXISTS idx_trace_id(tags.trace_id) TYPE bloom_filter GRANULARITY 1;

// Create the distributed table spanning all shards.
// NOTE: if columns are added to the logs table, logs_all must be updated in sync.
CREATE TABLE IF NOT EXISTS <database>.logs_all ON CLUSTER '{cluster}'
AS <database>.logs
ENGINE = Distributed('{cluster}', <database>, logs, rand());
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Create the per-tenant replicated log table. <table_name> is substituted
// by the table creator at runtime. Partitioned by day, 7-day TTL.
CREATE TABLE IF NOT EXISTS <database>.<table_name> ON CLUSTER '{cluster}'
(
`_id` String,
`timestamp` DateTime64(9,'Asia/Shanghai'),
`source` String,
`id` String,
`org_name` String,
`stream` String,
`offset` Int64,
`content` String,
`tags` Map(String,String),
INDEX idx__id(_id) TYPE minmax GRANULARITY 1
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{cluster}-{shard}/<table_name>', '{replica}')
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (org_name, timestamp, id)
TTL toDateTime(timestamp) + INTERVAL 7 DAY;

// Materialize frequently-queried tag keys as dedicated columns.
ALTER TABLE <database>.<table_name> ON CLUSTER '{cluster}'
ADD COLUMN IF NOT EXISTS `tags.trace_id` String MATERIALIZED tags['trace_id'],
ADD COLUMN IF NOT EXISTS `tags.level` String MATERIALIZED tags['level'],
ADD COLUMN IF NOT EXISTS `tags.application_name` String MATERIALIZED tags['application_name'],
ADD COLUMN IF NOT EXISTS `tags.service_name` String MATERIALIZED tags['service_name'],
ADD COLUMN IF NOT EXISTS `tags.pod_name` String MATERIALIZED tags['pod_name'],
ADD COLUMN IF NOT EXISTS `tags.pod_ip` String MATERIALIZED tags['pod_ip'],
ADD COLUMN IF NOT EXISTS `tags.container_name` String MATERIALIZED tags['container_name'],
ADD COLUMN IF NOT EXISTS `tags.container_id` String MATERIALIZED tags['container_id'];

// Add a bloom-filter skipping index on the frequently-filtered trace id.
// NOTE(review): index renamed from misspelled "idx_tace_id"; confirm
// migration plan for tables already created with the old name.
ALTER TABLE <database>.<table_name> ON CLUSTER '{cluster}' ADD INDEX IF NOT EXISTS idx_trace_id(tags.trace_id) TYPE bloom_filter GRANULARITY 1;

// Create the distributed table spanning all shards.
// NOTE: if columns are added to the logs table, logs_all must be updated in sync.
// NOTE(review): schema is copied from the base logs table (AS <database>.logs) —
// confirm tenant table columns can never diverge from logs.
CREATE TABLE IF NOT EXISTS <database>.<table_name>_all ON CLUSTER '{cluster}'
AS <database>.logs
ENGINE = Distributed('{cluster}', <database>, <table_name>, rand());

// Create the Merge engine table used for cross-table search queries; it
// reads from logs_all plus every matching tenant *_all table.
CREATE TABLE IF NOT EXISTS <database>.<alias_table_name>_search ON CLUSTER '{cluster}'
AS <database>.logs
ENGINE = Merge(<database>, 'logs_all|<alias_table_name>.*_all$');
41 changes: 40 additions & 1 deletion conf/monitor/streaming/streaming.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ kafka.topic.initializer:
- erda-spans
- erda-trace-metrics

# redis connection; both single-node (addr/password) and sentinel
# (master_name + sentinels_addr) style settings are exposed here
redis:
  addr: "${REDIS_ADDR}"
  password: "${REDIS_PASSWORD}"
  db: ${REDIS_DB:0}
  master_name: "${REDIS_MASTER_NAME}"
  sentinels_addr: "${REDIS_SENTINELS_ADDR}"

mysql:
host: "${MYSQL_HOST:localhost}"
port: ${MYSQL_PORT:3306}
Expand All @@ -45,9 +52,40 @@ etcd:
etcd-election@index:
root_path: "/erda/monitor-index-rollover-election"

etcd-election@table:
root_path: "/erda/monitor-ck-table-loader-election"

etcd-mutex:
root_path: "/erda/streaming"

# shared clickhouse client connection, used by the providers below
clickhouse:
  _enable: ${CLICKHOUSE_ENABLE:false}
  addr: "${CLICKHOUSE_ADDR:localhost:9000}"
  username: "${CLICKHOUSE_USERNAME:default}"
  password: "${CLICKHOUSE_PASSWORD:default}"

# executes the DDL files at startup to create the database and default tables
clickhouse.table.initializer:
  _enable: ${CLICKHOUSE_ENABLE:false}
  ddl_files:
    - path: "${CONFIG_PATH}/clickhouse/logs_ddl_create_db.sql"
      ignore_err: "false"
    - path: "${CONFIG_PATH}/clickhouse/logs_ddl_create_default_tables.sql"
      ignore_err: "false"

# creates per-tenant log tables from the tenant DDL template;
# tables are prefixed with "logs", falling back to the "logs" table for writes
clickhouse.table.creator@log:
  _enable: ${WRITE_LOG_TO_CLICKHOUSE_ENABLE:false}
  ddl_template: "${CONFIG_PATH}/clickhouse/logs_ddl_create_tenant_tables.sql"
  default_write_table: "logs"
  table_prefix: "logs"

# discovers existing log tables; searches default to the logs_all distributed table
clickhouse.table.loader@log:
  _enable: ${WRITE_LOG_TO_CLICKHOUSE_ENABLE:false}
  table_prefix: "logs"
  default_search_table: "logs_all"

# clickhouse-backed log storage provider (reader/writer services)
log-storage-clickhouse:
  _enable: ${WRITE_LOG_TO_CLICKHOUSE_ENABLE:false}

# elasticsearch for log
elasticsearch@log:
_enable: ${LOG_ELASTICSEARCH_ENABLE:false}
Expand Down Expand Up @@ -102,7 +140,7 @@ log-storage-elasticsearch:
index_type: "logs"

log-persist:
_enable: ${WRITE_LOG_TO_ES_ENABLE:true}
_enable: ${WRITE_LOG_TO_ES_ENABLE|WRITE_LOG_TO_CLICKHOUSE_ENABLE:true}
input:
topics: "${LOG_TOPICS:spot-container-log,spot-job-log}"
group: "${LOG_GROUP_ID:erda-logs-dev}"
Expand All @@ -113,6 +151,7 @@ log-persist:
read_timeout: "5s"
buffer_size: ${LOG_BATCH_SIZE:200}
parallelism: ${LOG_PERSIST_PARALLELISM:3}
storage_writer_service: "${LOG_STORAGE_WRITER_SERVICE:log-storage-elasticsearch-writer}"
print_invalid_log: false

cassandra:
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
bou.ke/monkey v1.0.2
cloud.google.com/go/iam v0.1.1 // indirect
github.com/360EntSecGroup-Skylar/excelize/v2 v2.3.2
github.com/ClickHouse/clickhouse-go/v2 v2.0.12 // indirect
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20201215015655-2e8b733f5ad0
github.com/Masterminds/semver v1.5.0
Expand Down Expand Up @@ -44,7 +45,7 @@ require (
github.com/dustin/go-humanize v1.0.0
github.com/elastic/cloud-on-k8s v0.0.0-20210205172912-5ce0eca90c60
github.com/elazarl/goproxy v0.0.0-20200421181703-e76ad31c14f6
github.com/erda-project/erda-infra v0.0.0-20220323032941-d5175e7d4ef0
github.com/erda-project/erda-infra v0.0.0-20220325020513-886f7946e771
github.com/erda-project/erda-oap-thirdparty-protocol v0.0.0-20210907135609-15886a136d5b
github.com/erda-project/erda-proto-go v0.0.0
github.com/erda-project/erda-sourcecov v0.1.0
Expand Down
6 changes: 5 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,12 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/erda-project/elastic v0.0.1-ex h1:5ajfxQ5S5YjpzFqY9LzL9hiKWCn6q/JDT4n8sNv7+pU=
github.com/erda-project/elastic v0.0.1-ex/go.mod h1:iAVsas6fcmt9pxtge1+dErMhecv+RLSXlD4rnZRJVW0=
github.com/erda-project/erda-infra v0.0.0-20220323032941-d5175e7d4ef0 h1:c76i036bsVTT/FlLHhu6LafCAlfcGUFBuaFITW0i3V4=
github.com/erda-project/erda-infra v0.0.0-20220314054853-f0b3a8f7a48f/go.mod h1:3MGvEI3uNp1tB5Kl4VnTd1ZJs8aHgWitdN1qz3MMMkQ=
github.com/erda-project/erda-infra v0.0.0-20220321022514-a6f951c8a9ae h1:faLURgsfCxePZNiTpnHtRLemPSmBbn2xHoG+mAG1CyI=
github.com/erda-project/erda-infra v0.0.0-20220321022514-a6f951c8a9ae/go.mod h1:bqtrGKJcCeoJu42bsOoBnlwzfIsZGq0KRl4KJBSGMrg=
github.com/erda-project/erda-infra v0.0.0-20220323032941-d5175e7d4ef0/go.mod h1:9MGw5o3niKhqAz2OeX7P7urT/B7eFQCY3VenY3DA3E8=
github.com/erda-project/erda-infra v0.0.0-20220325020513-886f7946e771 h1:nimevt+vqBzWkzo8DjnROJxxEQdDBElKtOo9MontjoA=
github.com/erda-project/erda-infra v0.0.0-20220325020513-886f7946e771/go.mod h1:9MGw5o3niKhqAz2OeX7P7urT/B7eFQCY3VenY3DA3E8=
github.com/erda-project/erda-oap-thirdparty-protocol v0.0.0-20210907135609-15886a136d5b h1:GWf2ChasZFerFwQoTokIvjJLWH57ligTSLD2hUb7UWk=
github.com/erda-project/erda-oap-thirdparty-protocol v0.0.0-20210907135609-15886a136d5b/go.mod h1:H/f81Thef2Tnz4nUeLt0r4VwHdOznthpyXBwT9vDWo0=
github.com/erda-project/erda-sourcecov v0.1.0 h1:iLvoMsQ1xX81KNOW98BKr85Vs7sSazrIDEphLYdmgP4=
Expand Down
16 changes: 9 additions & 7 deletions modules/core/monitor/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ import (

// Log is a single log entry as ingested from kafka and written to storage.
// The `ch` tags map fields to clickhouse column names; fields tagged
// `json:"-"` are storage-internal and never serialized to JSON.
// (Reconstructed from diff residue: the old and new field lines were both
// present, duplicating Source/ID/Stream/Content/Offset declarations.)
type Log struct {
	UniqId    string            `json:"-" ch:"_id"`      // unique row id, storage-internal
	OrgName   string            `json:"-" ch:"org_name"` // owning organization, storage-internal
	Source    string            `json:"source" ch:"source"`
	ID        string            `json:"id" ch:"id"`
	Stream    string            `json:"stream" ch:"stream"`
	Content   string            `json:"content" ch:"content"`
	Offset    int64             `json:"offset" ch:"offset"`
	Time      *time.Time        `json:"time,omitempty"` // the time key in fluent-bit is RFC3339Nano
	Timestamp int64             `json:"timestamp" ch:"timestamp"`
	Tags      map[string]string `json:"tags" ch:"tags"`
}

// LabeledLog .
Expand Down
29 changes: 16 additions & 13 deletions modules/core/monitor/log/persist/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,18 @@ import (

type (
// config is the file-based configuration of the log-persist provider.
// (Reconstructed from diff residue: old and new field lines were both
// present, duplicating every declaration.)
config struct {
	Input           kafka.BatchReaderConfig `file:"input"`
	Parallelism     int                     `file:"parallelism" default:"1"`
	BufferSize      int                     `file:"buffer_size" default:"1024"`
	ReadTimeout     time.Duration           `file:"read_timeout" default:"5s"`
	IDKeys          []string                `file:"id_keys"`
	PrintInvalidLog bool                    `file:"print_invalid_log" default:"false"`
	// StorageWriterService selects which registered storage service supplies
	// the writer (e.g. elasticsearch vs clickhouse backed).
	StorageWriterService string `file:"storage_writer_service" default:"log-storage-elasticsearch-writer"`
}
provider struct {
Cfg *config
Log logs.Logger
Kafka kafka.Interface `autowired:"kafka"`
StorageWriter storage.Storage `autowired:"log-storage-writer"`
Cfg *config
Log logs.Logger
Kafka kafka.Interface `autowired:"kafka"`

storage storage.Storage
stats Statistics
Expand All @@ -60,6 +60,8 @@ func (p *provider) Init(ctx servicehub.Context) (err error) {
ctx.AddTask(runner.Run, servicehub.WithTaskName("log metadata processor"))
}

p.storage = ctx.Service(p.Cfg.StorageWriterService).(storage.Storage)

p.stats = sharedStatistics

// add consumer task
Expand All @@ -71,7 +73,7 @@ func (p *provider) Init(ctx servicehub.Context) (err error) {
}
defer r.Close()

w, err := p.StorageWriter.NewWriter(ctx)
w, err := p.storage.NewWriter(ctx)
if err != nil {
return err
}
Expand All @@ -91,8 +93,9 @@ func (p *provider) Init(ctx servicehub.Context) (err error) {

func init() {
servicehub.Register("log-persist", &servicehub.Spec{
ConfigFunc: func() interface{} { return &config{} },
Dependencies: []string{"kafka.topic.initializer"},
ConfigFunc: func() interface{} { return &config{} },
Dependencies: []string{"kafka.topic.initializer"},
OptionalDependencies: []string{"log-storage-clickhouse", "log-storage-elasticsearch"},
Creator: func() servicehub.Provider {
return &provider{}
},
Expand Down
26 changes: 26 additions & 0 deletions modules/core/monitor/log/storage/clickhouse/iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) 2021 Terminus, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clickhouse

import (
	"context"
	"fmt"

	"github.com/erda-project/erda/modules/core/monitor/log/storage"
	"github.com/erda-project/erda/modules/core/monitor/storekit"
)

// Iterator returns an iterator over stored log entries matching sel.
// Reading back from clickhouse is not implemented yet; return an explicit
// error instead of panicking so callers can detect and handle the missing
// capability gracefully (panics in library code crash the whole process).
func (p *provider) Iterator(ctx context.Context, sel *storage.Selector) (storekit.Iterator, error) {
	return nil, fmt.Errorf("log-storage-clickhouse: Iterator not implemented")
}
61 changes: 61 additions & 0 deletions modules/core/monitor/log/storage/clickhouse/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) 2021 Terminus, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clickhouse

import (
"fmt"

"github.com/erda-project/erda-infra/base/logs"
"github.com/erda-project/erda-infra/base/servicehub"
"github.com/erda-project/erda-infra/providers/clickhouse"
"github.com/erda-project/erda/modules/core/monitor/log/storage"
"github.com/erda-project/erda/modules/core/monitor/settings/retention-strategy"
"github.com/erda-project/erda/modules/core/monitor/storekit/clickhouse/table/creator"
)

// config holds the provider's file-based configuration (no settings yet;
// kept so options can be added without changing the servicehub wiring).
type config struct {
}

// provider is the clickhouse-backed log storage provider.
type provider struct {
	Cfg *config     // provider configuration
	Log logs.Logger // injected logger
	// Creator manages clickhouse log table creation for this provider.
	Creator creator.Interface `autowired:"clickhouse.table.creator@log"`
	// Retention is the optional retention-strategy service; may be nil.
	Retention retention.Interface `autowired:"storage-retention-strategy@log" optional:"true"`

	// clickhouse is the client service resolved in Init.
	clickhouse clickhouse.Interface
}

// compile-time check that provider satisfies storage.Storage
var _ storage.Storage = (*provider)(nil)

// Init resolves the clickhouse client, preferring the log-scoped instance
// ("clickhouse@log") and falling back to the shared default ("clickhouse").
// It fails if neither service is available.
func (p *provider) Init(ctx servicehub.Context) error {
	for _, name := range []string{"clickhouse@log", "clickhouse"} {
		if svc := ctx.Service(name); svc != nil {
			p.clickhouse = svc.(clickhouse.Interface)
			return nil
		}
	}
	return fmt.Errorf("service clickhouse is required")
}

// init registers the clickhouse log storage provider with servicehub so it
// can be enabled via the "log-storage-clickhouse" configuration section.
func init() {
	servicehub.Register("log-storage-clickhouse", &servicehub.Spec{
		// exposed as separate reader/writer service names so consumers can
		// depend on just the capability they need
		Services:     []string{"log-storage-clickhouse-reader", "log-storage-clickhouse-writer"},
		Dependencies: []string{"clickhouse", "clickhouse.table.creator"},
		ConfigFunc:   func() interface{} { return &config{} },
		Creator:      func() servicehub.Provider { return &provider{} },
	})
}
Loading

0 comments on commit 6ec3438

Please sign in to comment.