Skip to content

Commit

Permalink
feature: add [start,end) param for clude, endpointMetric, endpoints a…
Browse files Browse the repository at this point in the history
…pi (ccfos#639)
  • Loading branch information
yubo authored Mar 30, 2021
1 parent 92ac8b0 commit 1ff6d0a
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 38 deletions.
88 changes: 60 additions & 28 deletions src/common/dataobj/query_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,36 @@ func (resp *TsdbQueryResponse) Key() string {
}

type EndpointsRecv struct {
Endpoints []string `json:"endpoints"`
Nids []string `json:"nids"`
Endpoints []string `json:"endpoints"`
Nids []string `json:"nids"`
Start int64 `json:"start" description:"inclusive"`
End int64 `json:"end" description:"exclusive"`
StartInclusive time.Time `json:"-"`
EndExclusive time.Time `json:"-"`
}

func (p *EndpointsRecv) Validate() (err error) {
p.StartInclusive, p.EndExclusive, err = timeRangeValidate(p.Start, p.End)
return
}

type MetricResp struct {
Metrics []string `json:"metrics"`
}

type EndpointMetricRecv struct {
Endpoints []string `json:"endpoints"`
Nids []string `json:"nids"`
Metrics []string `json:"metrics"`
Endpoints []string `json:"endpoints"`
Nids []string `json:"nids"`
Metrics []string `json:"metrics"`
Start int64 `json:"start" description:"inclusive"`
End int64 `json:"end" description:"exclusive"`
StartInclusive time.Time `json:"-"`
EndExclusive time.Time `json:"-"`
}

func (p *EndpointMetricRecv) Validate() (err error) {
p.StartInclusive, p.EndExclusive, err = timeRangeValidate(p.Start, p.End)
return
}

type IndexTagkvResp struct {
Expand All @@ -85,11 +103,20 @@ type TagPair struct {
}

type CludeRecv struct {
Endpoints []string `json:"endpoints"`
Nids []string `json:"nids"`
Metric string `json:"metric"`
Include []*TagPair `json:"include"`
Exclude []*TagPair `json:"exclude"`
Endpoints []string `json:"endpoints"`
Nids []string `json:"nids"`
Metric string `json:"metric"`
Include []*TagPair `json:"include"`
Exclude []*TagPair `json:"exclude"`
Start int64 `json:"start" description:"inclusive"`
End int64 `json:"end" description:"exclusive"`
StartInclusive time.Time `json:"-"`
EndExclusive time.Time `json:"-"`
}

func (p *CludeRecv) Validate() (err error) {
p.StartInclusive, p.EndExclusive, err = timeRangeValidate(p.Start, p.End)
return
}

type XcludeResp struct {
Expand All @@ -112,24 +139,9 @@ type IndexByFullTagsRecv struct {
EndExclusive time.Time `json:"-"`
}

func (p *IndexByFullTagsRecv) Validate() error {
if p.End == 0 {
p.EndExclusive = time.Now()
} else {
p.EndExclusive = time.Unix(p.End, 0)
}

if p.Start == 0 {
p.StartInclusive = p.EndExclusive.Add(-time.Hour * 25)
} else {
p.StartInclusive = time.Unix(p.Start, 0)
}

if p.StartInclusive.After(p.EndExclusive) {
return fmt.Errorf("start is after end")
}

return nil
func (p *IndexByFullTagsRecv) Validate() (err error) {
p.StartInclusive, p.EndExclusive, err = timeRangeValidate(p.Start, p.End)
return
}

type IndexByFullTagsResp struct {
Expand All @@ -141,3 +153,23 @@ type IndexByFullTagsResp struct {
DsType string `json:"dstype"`
Count int `json:"count"`
}

func timeRangeValidate(start, end int64) (startInclusive, endExclusive time.Time, err error) {
if end == 0 {
endExclusive = time.Now()
} else {
endExclusive = time.Unix(end, 0)
}

if start == 0 {
startInclusive = endExclusive.Add(-time.Hour * 25)
} else {
startInclusive = time.Unix(start, 0)
}

if startInclusive.After(endExclusive) {
err = fmt.Errorf("start is after end")
}

return
}
18 changes: 14 additions & 4 deletions src/modules/transfer/backend/m3db/m3db.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ type Client struct {
namespaceID ident.ID
}

func indexStartTime() time.Time {
return time.Now().Add(-time.Hour * 25)
}

func NewClient(cfg M3dbSection) (*Client, error) {
client, err := cfg.Config.NewClient(client.ConfigurationParameters{})
if err != nil {
Expand Down Expand Up @@ -186,6 +182,11 @@ func (p *Client) QueryDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQue
// QueryMetrics: || (&& (endpoint)) (counter)...
// return all the values that tag == __name__
func (p *Client) QueryMetrics(input dataobj.EndpointsRecv) *dataobj.MetricResp {
if err := input.Validate(); err != nil {
logger.Errorf("input validate err %s", err)
return nil
}

session, err := p.session()
if err != nil {
logger.Errorf("unable to get m3db session: %s", err)
Expand All @@ -205,6 +206,11 @@ func (p *Client) QueryMetrics(input dataobj.EndpointsRecv) *dataobj.MetricResp {
// QueryTagPairs: && (|| endpoints...) (|| metrics...)
// return all the tags that matches
func (p *Client) QueryTagPairs(input dataobj.EndpointMetricRecv) []dataobj.IndexTagkvResp {
if err := input.Validate(); err != nil {
logger.Errorf("input validate err %s", err)
return nil
}

session, err := p.session()
if err != nil {
logger.Errorf("unable to get m3db session: %s", err)
Expand Down Expand Up @@ -232,6 +238,10 @@ func (p *Client) QueryIndexByClude(inputs []dataobj.CludeRecv) (ret []dataobj.Xc
}

for _, input := range inputs {
if err := input.Validate(); err != nil {
logger.Errorf("input validate err %s", err)
continue
}
ret = append(ret, p.queryIndexByClude(session, input)...)
}

Expand Down
12 changes: 6 additions & 6 deletions src/modules/transfer/backend/m3db/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ func (cfg M3dbSection) queryMetricsOptions(input dataobj.EndpointsRecv) (index.Q
)},
index.AggregationOptions{
QueryOptions: index.QueryOptions{
StartInclusive: indexStartTime(),
EndExclusive: time.Now(),
StartInclusive: input.StartInclusive,
EndExclusive: input.EndExclusive,
SeriesLimit: cfg.SeriesLimit,
DocsLimit: cfg.DocsLimit,
},
Expand All @@ -164,8 +164,8 @@ func (cfg M3dbSection) queryTagPairsOptions(input dataobj.EndpointMetricRecv) (i
return index.Query{idx.NewConjunctionQuery(q1, q2)},
index.AggregationOptions{
QueryOptions: index.QueryOptions{
StartInclusive: indexStartTime(),
EndExclusive: time.Now(),
StartInclusive: input.StartInclusive,
EndExclusive: input.EndExclusive,
SeriesLimit: cfg.SeriesLimit,
DocsLimit: cfg.DocsLimit,
},
Expand Down Expand Up @@ -201,8 +201,8 @@ func (cfg M3dbSection) queryIndexByCludeOptions(input dataobj.CludeRecv) (index.
}

return query, index.QueryOptions{
StartInclusive: indexStartTime(),
EndExclusive: time.Now(),
StartInclusive: input.StartInclusive,
EndExclusive: input.EndExclusive,
SeriesLimit: cfg.SeriesLimit,
DocsLimit: cfg.DocsLimit,
}
Expand Down

0 comments on commit 1ff6d0a

Please sign in to comment.