Skip to content

Commit

Permalink
use new cassandra API (erda-project#1561)
Browse files Browse the repository at this point in the history
* cassandra use new API

* adjust interval

* ci fix

* ci fix

* ci fix
  • Loading branch information
erenming authored Aug 26, 2021
1 parent cb2bc76 commit 376de08
Show file tree
Hide file tree
Showing 16 changed files with 51 additions and 45 deletions.
2 changes: 2 additions & 0 deletions conf/monitor/streaming/streaming.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ log-store:
replication:
class: ${CASSANDRA_KEYSPACE_REPLICATION_CLASS:SimpleStrategy}
factor: ${CASSANDRA_KEYSPACE_REPLICATION_FACTOR:2}
reconnection:
check_interval: ${LOG_STORE_CASSANDRA_RECONNECTION_CHECK_INTERVAL:3m}
default_ttl: "${LOG_TTL:168h}"
gc_grace_seconds: 86400

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/dsnet/compress v0.0.1 // indirect
github.com/elastic/cloud-on-k8s v0.0.0-20210205172912-5ce0eca90c60
github.com/elazarl/goproxy v0.0.0-20200421181703-e76ad31c14f6
github.com/erda-project/erda-infra v0.0.0-20210817063509-a477b158393d
github.com/erda-project/erda-infra v0.0.0-20210825095053-85c5451901ed
github.com/erda-project/erda-proto-go v0.0.0-20210823110307-c397defb820e
github.com/extrame/ole2 v0.0.0-20160812065207-d69429661ad7 // indirect
github.com/extrame/xls v0.0.1
Expand Down Expand Up @@ -101,7 +101,7 @@ require (
github.com/rancher/remotedialer v0.2.6-0.20210318171128-d1ebd5202be4
github.com/rancher/steve v0.0.0-20210520191028-52f86dce9bd4
github.com/rancher/wrangler v0.8.1-0.20210423003607-f71a90542852
github.com/recallsong/go-utils v1.1.2-0.20210630062503-8880bcf66750
github.com/recallsong/go-utils v1.1.2-0.20210825061755-9df21f10489e
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5
github.com/robfig/cron v1.2.0
github.com/russross/blackfriday/v2 v2.0.1
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
github.com/erda-project/elastic v0.0.1-ex h1:5ajfxQ5S5YjpzFqY9LzL9hiKWCn6q/JDT4n8sNv7+pU=
github.com/erda-project/elastic v0.0.1-ex/go.mod h1:iAVsas6fcmt9pxtge1+dErMhecv+RLSXlD4rnZRJVW0=
github.com/erda-project/erda-infra v0.0.0-20210706133120-0a742437972c/go.mod h1:TUQYSZ60w9dk7m0q3U3AVg7U74APj/sdEVvRWR3wYv8=
github.com/erda-project/erda-infra v0.0.0-20210817063509-a477b158393d h1:1y55yg46RQa0mVg4Rn41gKo/89e1XMzU9jMUs71fTOo=
github.com/erda-project/erda-infra v0.0.0-20210817063509-a477b158393d/go.mod h1:YpMIFoipL7XardYV3C58qx9fm74JM5QK/E3kd/t1CJU=
github.com/erda-project/erda-infra v0.0.0-20210825095053-85c5451901ed h1:JTCRL09cq/nQVX/448o2U05hOvH7nsLoYuu2OLWS168=
github.com/erda-project/erda-infra v0.0.0-20210825095053-85c5451901ed/go.mod h1:zj3Pqi6Ekn0fBHtFJV9cMMSxbQXX46IT8NQsncJn6zQ=
github.com/erda-project/erda-proto-go v0.0.0-20210823110307-c397defb820e h1:iRg9u84sDRggQyhMo3mwy2OqFh4AioPHd7OivgccBLE=
github.com/erda-project/erda-proto-go v0.0.0-20210823110307-c397defb820e/go.mod h1:rSETXX3nKxxIhgrVn7fKDM3mla1nNlWcPz4AkepixaU=
github.com/erda-project/influxql v1.1.0-ex h1:NgP5+S5Qo234IVSIJ3N/egvzCNYJURfMAett3e8a9LE=
Expand Down Expand Up @@ -1485,8 +1485,8 @@ github.com/rancher/wrangler v0.8.1-0.20210423003607-f71a90542852 h1:HMvBxqM0edSR
github.com/rancher/wrangler v0.8.1-0.20210423003607-f71a90542852/go.mod h1:zSV5oh3+YZboilwcJmFHO3J6FZba82BTQft1b6ijx2I=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/recallsong/go-utils v1.1.1/go.mod h1:NARLokMUOzyJ55oYXvPcxQh3rBoTBAbolNfqou6AqdY=
github.com/recallsong/go-utils v1.1.2-0.20210630062503-8880bcf66750 h1:0EDNOOStJuUO4tEpHl4XdSV/tcYWaPSkeW1boEP6c+0=
github.com/recallsong/go-utils v1.1.2-0.20210630062503-8880bcf66750/go.mod h1:NARLokMUOzyJ55oYXvPcxQh3rBoTBAbolNfqou6AqdY=
github.com/recallsong/go-utils v1.1.2-0.20210825061755-9df21f10489e h1:hoCgZvzt/h3XCxc+j9Q69ngG4RSf9hPQUlQLX+iUWqc=
github.com/recallsong/go-utils v1.1.2-0.20210825061755-9df21f10489e/go.mod h1:NARLokMUOzyJ55oYXvPcxQh3rBoTBAbolNfqou6AqdY=
github.com/recallsong/unmarshal v1.0.0 h1:y18hJihfxJ6vIkXAYROKPh9i3xKmZHH9wTiRvshY9bQ=
github.com/recallsong/unmarshal v1.0.0/go.mod h1:NnPiL4s7z9XV1aBS5LJaXo3V7r1l/ZH3wWOzCkKqyWo=
github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M=
Expand Down
4 changes: 2 additions & 2 deletions modules/core/monitor/alert/alert-apis/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ func (p *provider) Init(ctx servicehub.Context) error {
}
}
cassandra := ctx.Service("cassandra").(cassandra.Interface)
session, err := cassandra.Session(&p.C.Cassandra.SessionConfig)
session, err := cassandra.NewSession(&p.C.Cassandra.SessionConfig)
if err != nil {
return fmt.Errorf("fail to create cassandra session: %s", err)
}
p.cql = cql.New(session)
p.cql = cql.New(session.Session())
if err := p.cql.Init(p.L, p.C.Cassandra.GCGraceSeconds); err != nil {
return fmt.Errorf("fail to init cassandra: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion modules/core/monitor/log/query/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type provider struct {
}

func (p *provider) Init(ctx servicehub.Context) error {
session, err := p.Cassandra.Session(&p.Cfg.Cassandra)
session, err := p.Cassandra.NewSession(&p.Cfg.Cassandra)
if err != nil {
return fmt.Errorf("fail to create cassandra session: %s", err)
}
Expand Down
6 changes: 3 additions & 3 deletions modules/core/monitor/log/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (
"strconv"
"time"

"github.com/gocql/gocql"
"github.com/pkg/errors"
"github.com/scylladb/gocqlx"
"github.com/scylladb/gocqlx/qb"

"github.com/erda-project/erda-infra/providers/cassandra"
"github.com/erda-project/erda-proto-go/core/monitor/log/query/pb"
"github.com/erda-project/erda/modules/core/monitor/log/schema"
)
Expand All @@ -40,12 +40,12 @@ type CQLQueryInf interface {
}

type cassandraQuery struct {
session *gocql.Session
session *cassandra.Session
}

func (c *cassandraQuery) Query(builder *qb.SelectBuilder, binding qb.M, dest interface{}) error {
stmt, names := builder.ToCql()
cql := gocqlx.Query(c.session.Query(stmt), names).BindMap(binding)
cql := gocqlx.Query(c.session.Session().Query(stmt), names).BindMap(binding)
return cql.SelectRelease(dest)
}

Expand Down
8 changes: 4 additions & 4 deletions modules/core/monitor/log/schema/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type LogSchema interface {
type CassandraSchema struct {
Logger logs.Logger
cass cassandra.Interface
defaultSession *gocql.Session
defaultSession *cassandra.Session
lastOrgList []string
mutexKey string
}
Expand All @@ -63,7 +63,7 @@ func WithMutexKey(key string) Option {
func NewCassandraSchema(cass cassandra.Interface, l logs.Logger, ops ...Option) (*CassandraSchema, error) {
cs := &CassandraSchema{}
cs.cass = cass
sysSession, err := cs.cass.Session(&cassandra.SessionConfig{Keyspace: *defaultKeyspaceConfig("system"), Consistency: "LOCAL_ONE"})
sysSession, err := cs.cass.NewSession(&cassandra.SessionConfig{Keyspace: *defaultKeyspaceConfig("system"), Consistency: "LOCAL_ONE"})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -152,7 +152,7 @@ func (cs *CassandraSchema) compareOrUpdate() error {
}

func (cs *CassandraSchema) existedCheck(keyspace string) (keyspaceExisted bool, tableExisted bool) {
m, err := cs.defaultSession.KeyspaceMetadata(keyspace)
m, err := cs.defaultSession.Session().KeyspaceMetadata(keyspace)
// keyspace existed check
if err != nil {
return false, false
Expand Down Expand Up @@ -216,7 +216,7 @@ func (cs *CassandraSchema) CreateDefault() error {
}

func (cs *CassandraSchema) createTable(stmt string) error {
q := cs.defaultSession.Query(stmt).Consistency(gocql.All).RetryPolicy(nil)
q := cs.defaultSession.Session().Query(stmt).Consistency(gocql.All).RetryPolicy(nil)
err := q.Exec()
q.Release()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion modules/core/monitor/log/storage/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type provider struct {

func (p *provider) Init(ctx servicehub.Context) error {
cass := ctx.Service("cassandra").(cassandra.Interface)
session, err := cass.Session(&p.Cfg.Output.Cassandra.SessionConfig)
session, err := cass.NewSession(&p.Cfg.Output.Cassandra.SessionConfig)
if err != nil {
return fmt.Errorf("fail to create cassandra session, err=%s", err)
}
Expand Down
22 changes: 16 additions & 6 deletions modules/core/monitor/log/storage/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ import (
"github.com/jinzhu/gorm"
"github.com/stretchr/testify/assert"

writer "github.com/erda-project/erda-infra/pkg/parallel-writer"

"github.com/erda-project/erda-infra/base/logs"
"github.com/erda-project/erda-infra/base/logs/logrusx"
"github.com/erda-project/erda-infra/base/servicehub"
writer "github.com/erda-project/erda-infra/pkg/parallel-writer"
"github.com/erda-project/erda-infra/providers/cassandra"
"github.com/erda-project/erda-infra/providers/kafka"
)
Expand All @@ -42,11 +43,16 @@ func Test_provider_Init(t *testing.T) {
}

type mockContext struct {
context.Context
l logs.Logger
}

func (m *mockContext) Label() string {
return ""
}

func (m *mockContext) AddTask(task func(context.Context) error, options ...servicehub.TaskOption) {
panic("implement me")
return
}

func (m *mockContext) Hub() *servicehub.Hub {
Expand Down Expand Up @@ -120,6 +126,14 @@ func (m *mockMysql) DB() *gorm.DB {
type mockCassandraInf struct {
}

func (m *mockCassandraInf) NewSession(cfg *cassandra.SessionConfig) (*cassandra.Session, error) {
return nil, nil
}

func (m *mockCassandraInf) NewBatchWriter(session *cassandra.Session, c *cassandra.WriterConfig, builderCreator func() cassandra.StatementBuilder) writer.Writer {
return nil
}

func (m *mockCassandraInf) CreateKeyspaces(ksc ...*cassandra.KeyspaceConfig) error {
return nil
}
Expand All @@ -128,10 +142,6 @@ func (m *mockCassandraInf) Session(cfg *cassandra.SessionConfig) (*gocql.Session
return nil, nil
}

func (m *mockCassandraInf) NewBatchWriter(session *gocql.Session, c *cassandra.WriterConfig, builderCreator func() cassandra.StatementBuilder) writer.Writer {
return &mockWriter{}
}

func mockProvider() *provider {
p := &provider{}
p.Cfg = &config{
Expand Down
5 changes: 2 additions & 3 deletions modules/monitor/apm/topology/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package topology
import (
"fmt"

"github.com/gocql/gocql"
"github.com/olivere/elastic"

"github.com/erda-project/erda-infra/base/logs"
Expand All @@ -40,7 +39,7 @@ type provider struct {
ctx servicehub.Context
metricq metricq.Queryer
t i18n.Translator
cassandraSession *gocql.Session
cassandraSession *cassandra.Session
}

type define struct{}
Expand Down Expand Up @@ -79,7 +78,7 @@ func (topology *provider) Init(ctx servicehub.Context) (err error) {
topology.metricq = ctx.Service("metrics-query").(metricq.Queryer)

c := ctx.Service("cassandra").(cassandra.Interface)
session, err := c.Session(&topology.Cfg.Cassandra)
session, err := c.NewSession(&topology.Cfg.Cassandra)
topology.cassandraSession = session
if err != nil {
return fmt.Errorf("fail to create cassandra session: %s", err)
Expand Down
8 changes: 4 additions & 4 deletions modules/msp/apm/exception/exception.service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type exceptionService struct {

func (s *exceptionService) GetExceptions(ctx context.Context, req *pb.GetExceptionsRequest) (*pb.GetExceptionsResponse, error) {

iter := s.p.cassandraSession.Query("SELECT * FROM error_description_v2 where terminus_key=? ALLOW FILTERING", req.ScopeID).Iter()
iter := s.p.cassandraSession.Session().Query("SELECT * FROM error_description_v2 where terminus_key=? ALLOW FILTERING", req.ScopeID).Iter()

var exceptions []*pb.Exception
for {
Expand All @@ -54,7 +54,7 @@ func (s *exceptionService) GetExceptions(ctx context.Context, req *pb.GetExcepti
layout := "2006-01-02 15:04:05"

stat := "SELECT timestamp,count FROM error_count WHERE error_id= ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp ASC"
iterCount := s.p.cassandraSession.Query(stat, exception.Id, req.StartTime*1e6, req.EndTime*1e6).Iter()
iterCount := s.p.cassandraSession.Session().Query(stat, exception.Id, req.StartTime*1e6, req.EndTime*1e6).Iter()
count := int64(0)
index := 0
for {
Expand All @@ -81,7 +81,7 @@ func (s *exceptionService) GetExceptions(ctx context.Context, req *pb.GetExcepti
}

func (s *exceptionService) GetExceptionEventIds(ctx context.Context, req *pb.GetExceptionEventIdsRequest) (*pb.GetExceptionEventIdsResponse, error) {
iter := s.p.cassandraSession.Query("SELECT event_id FROM error_event_mapping WHERE error_id= ? limit ?", req.ExceptionID, 999).Iter()
iter := s.p.cassandraSession.Session().Query("SELECT event_id FROM error_event_mapping WHERE error_id= ? limit ?", req.ExceptionID, 999).Iter()

var data []string
for {
Expand All @@ -95,7 +95,7 @@ func (s *exceptionService) GetExceptionEventIds(ctx context.Context, req *pb.Get
}

func (s *exceptionService) GetExceptionEvent(ctx context.Context, req *pb.GetExceptionEventRequest) (*pb.GetExceptionEventResponse, error) {
iter := s.p.cassandraSession.Query("SELECT * FROM error_events WHERE event_id = ?", req.ExceptionEventID).Iter()
iter := s.p.cassandraSession.Session().Query("SELECT * FROM error_events WHERE event_id = ?", req.ExceptionEventID).Iter()
event := pb.ExceptionEvent{}
for {
row := make(map[string]interface{})
Expand Down
6 changes: 2 additions & 4 deletions modules/msp/apm/exception/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ package exception
import (
"fmt"

"github.com/gocql/gocql"

logs "github.com/erda-project/erda-infra/base/logs"
servicehub "github.com/erda-project/erda-infra/base/servicehub"
transport "github.com/erda-project/erda-infra/pkg/transport"
Expand All @@ -38,11 +36,11 @@ type provider struct {
Register transport.Register
Cassandra cassandra.Interface `autowired:"cassandra"`
exceptionService *exceptionService
cassandraSession *gocql.Session
cassandraSession *cassandra.Session
}

func (p *provider) Init(ctx servicehub.Context) error {
session, err := p.Cassandra.Session(&p.Cfg.Cassandra)
session, err := p.Cassandra.NewSession(&p.Cfg.Cassandra)
if err != nil {
return fmt.Errorf("fail to create cassandra session: %s", err)
}
Expand Down
5 changes: 2 additions & 3 deletions modules/msp/apm/trace/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package trace
import (
"fmt"

"github.com/gocql/gocql"
"github.com/jinzhu/gorm"

"github.com/erda-project/erda-infra/base/logs"
Expand Down Expand Up @@ -45,13 +44,13 @@ type provider struct {
Metric metricpb.MetricServiceServer `autowired:"erda.core.monitor.metric.MetricService"`
DB *gorm.DB `autowired:"mysql-client"`
Cassandra cassandra.Interface `autowired:"cassandra"`
cassandraSession *gocql.Session
cassandraSession *cassandra.Session
}

func (p *provider) Init(ctx servicehub.Context) error {
// translator

session, err := p.Cassandra.Session(&p.Cfg.Cassandra)
session, err := p.Cassandra.NewSession(&p.Cfg.Cassandra)
if err != nil {
return fmt.Errorf("fail to create cassandra session: %s", err)
}
Expand Down
4 changes: 2 additions & 2 deletions modules/msp/apm/trace/storage/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
metrics "github.com/erda-project/erda/modules/core/monitor/metric"
)

func (p *provider) initCassandra(session *gocql.Session) error {
func (p *provider) initCassandra(session *cassandra.Session) error {
for _, stmt := range []string{
fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS spans (
Expand All @@ -49,7 +49,7 @@ func (p *provider) initCassandra(session *gocql.Session) error {
`, p.Cfg.Output.Cassandra.GCGraceSeconds),
fmt.Sprintf("ALTER TABLE spans WITH gc_grace_seconds = %d;", p.Cfg.Output.Cassandra.GCGraceSeconds),
} {
q := session.Query(stmt).Consistency(gocql.All).RetryPolicy(nil)
q := session.Session().Query(stmt).Consistency(gocql.All).RetryPolicy(nil)
err := q.Exec()
q.Release()
if err != nil {
Expand Down
6 changes: 2 additions & 4 deletions modules/msp/apm/trace/storage/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"fmt"
"time"

"github.com/gocql/gocql"

"github.com/erda-project/erda-infra/base/logs"
"github.com/erda-project/erda-infra/base/servicehub"
writer "github.com/erda-project/erda-infra/pkg/parallel-writer"
Expand Down Expand Up @@ -49,13 +47,13 @@ type provider struct {
cassandra writer.Writer
kafka writer.Writer
}
cassandraSession *gocql.Session
cassandraSession *cassandra.Session
}

func (p *provider) Init(ctx servicehub.Context) error {
p.ttlSec = int(p.Cfg.Output.Cassandra.TTL.Seconds())
cassandra := ctx.Service("cassandra").(cassandra.Interface)
session, err := cassandra.Session(&p.Cfg.Output.Cassandra.SessionConfig)
session, err := cassandra.NewSession(&p.Cfg.Output.Cassandra.SessionConfig)
p.cassandraSession = session
if err != nil {
return fmt.Errorf("fail to create cassandra session: %s", err)
Expand Down
4 changes: 2 additions & 2 deletions modules/msp/apm/trace/trace.service.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (s *traceService) GetSpans(ctx context.Context, req *pb.GetSpansRequest) (*
if req.Limit <= 0 || req.Limit > 1000 {
req.Limit = 1000
}
iter := s.p.cassandraSession.Query("SELECT * FROM spans WHERE trace_id = ? limit ?", req.TraceID, req.Limit).Iter()
iter := s.p.cassandraSession.Session().Query("SELECT * FROM spans WHERE trace_id = ? limit ?", req.TraceID, req.Limit).Iter()
spanTree := make(query.SpanTree)
for {
row := make(map[string]interface{})
Expand Down Expand Up @@ -154,7 +154,7 @@ func calculateDepth(depth int64, span *pb.Span, spanTree query.SpanTree) int64 {

func (s *traceService) GetSpanCount(ctx context.Context, traceID string) (int64, error) {
count := 0
s.p.cassandraSession.Query("SELECT COUNT(trace_id) FROM spans WHERE trace_id = ?", traceID).Iter().Scan(&count)
s.p.cassandraSession.Session().Query("SELECT COUNT(trace_id) FROM spans WHERE trace_id = ?", traceID).Iter().Scan(&count)
return int64(count), nil
}

Expand Down

0 comments on commit 376de08

Please sign in to comment.