Added replica label to metrics
valyala committed Nov 7, 2017
commit b954ecd · 1 parent 73c4eb8
Showing 5 changed files with 77 additions and 56 deletions.
README.md: 26 changes (13 additions, 13 deletions)
@@ -571,28 +571,28 @@ Metrics are exposed in [prometheus text format](https://prometheus.io/docs/instr

| Name | Type | Description | Labels |
| ------------- | ------------- | ------------- | ------------- |
| status_codes_total | Counter | Distribution by response status codes | `user`, `cluster`, `cluster_user`, `cluster_node`, `code` |
| request_sum_total | Counter | The number of processed requests | `user`, `cluster`, `cluster_user`, `cluster_node` |
| request_success_total | Counter | The number of successfully proxied requests | `user`, `cluster`, `cluster_user`, `cluster_node` |
| concurrent_limit_excess_total | Counter | The number of rejected requests due to max_concurrent_queries limit | `user`, `cluster`, `cluster_user`, `cluster_node` |
| host_penalties_total | Counter | The number of given penalties by host | `cluster`, `cluster_node` |
| host_health | Gauge | Health state of hosts by clusters | `cluster`, `cluster_node` |
| concurrent_queries | Gauge | The number of concurrent queries at the moment | `user`, `cluster`, `cluster_user`, `cluster_node` |
| status_codes_total | Counter | Distribution by response status codes | `user`, `cluster`, `cluster_user`, `replica`, `cluster_node`, `code` |
| request_sum_total | Counter | The number of processed requests | `user`, `cluster`, `cluster_user`, `replica`, `cluster_node` |
| request_success_total | Counter | The number of successfully proxied requests | `user`, `cluster`, `cluster_user`, `replica`, `cluster_node` |
| concurrent_limit_excess_total | Counter | The number of rejected requests due to max_concurrent_queries limit | `user`, `cluster`, `cluster_user`, `replica`, `cluster_node` |
| host_penalties_total | Counter | The number of given penalties by host | `cluster`, `replica`, `cluster_node` |
| host_health | Gauge | Health state of hosts by clusters | `cluster`, `replica`, `cluster_node` |
| concurrent_queries | Gauge | The number of concurrent queries at the moment | `user`, `cluster`, `cluster_user`, `replica`, `cluster_node` |
| request_queue_size | Gauge | Request queue size at the moment | `user`, `cluster`, `cluster_user` |
| user_queue_overflow_total | Counter | The number of overflows for per-user request queues | `user`, `cluster`, `cluster_user` |
| cluster_user_queue_overflow_total | Counter | The number of overflows for per-cluster_user request queues | `user`, `cluster`, `cluster_user` |
| request_body_bytes_total | Counter | The amount of bytes read from request bodies | `user`, `cluster`, `cluster_user`, `cluster_node` |
| response_body_bytes_total | Counter | The amount of bytes written to response bodies | `user`, `cluster`, `cluster_user`, `cluster_node` |
| request_body_bytes_total | Counter | The amount of bytes read from request bodies | `user`, `cluster`, `cluster_user`, `replica`, `cluster_node` |
| response_body_bytes_total | Counter | The amount of bytes written to response bodies | `user`, `cluster`, `cluster_user`, `replica`, `cluster_node` |
| cache_hits_total | Counter | The amount of cache hits | `cache`, `user`, `cluster`, `cluster_user` |
| cache_miss_total | Counter | The amount of cache misses | `cache`, `user`, `cluster`, `cluster_user` |
| cache_size | Gauge | Size of each cache | `cache` |
| cache_items | Gauge | The number of items in each cache | `cache` |
| request_duration_seconds | Summary | Request duration. Includes possible queue wait time | `user`, `cluster`, `cluster_user`, `cluster_node` |
| proxied_response_duration_seconds | Summary | Duration for responses proxied from clickhouse | `user`, `cluster`, `cluster_user`, `cluster_node` |
| request_duration_seconds | Summary | Request duration. Includes possible queue wait time | `user`, `cluster`, `cluster_user`, `replica`, `cluster_node` |
| proxied_response_duration_seconds | Summary | Duration for responses proxied from clickhouse | `user`, `cluster`, `cluster_user`, `replica`, `cluster_node` |
| cached_response_duration_seconds | Summary | Duration for cached responses | `cache`, `user`, `cluster`, `cluster_user` |
| bad_requests_total | Counter | The number of unsupported requests | |
| canceled_request_total | Counter | The number of requests canceled by remote client | `user`, `cluster`, `cluster_user`, `cluster_node` |
| timeout_request_total | Counter | The number of timed out requests | `user`, `cluster`, `cluster_user`, `cluster_node` |
| canceled_request_total | Counter | The number of requests canceled by remote client | `user`, `cluster`, `cluster_user`, `replica`, `cluster_node` |
| timeout_request_total | Counter | The number of timed out requests | `user`, `cluster`, `cluster_user`, `replica`, `cluster_node` |

An example [Grafana](https://grafana.com) dashboard for `chproxy` metrics is available [here](https://github.com/Vertamedia/chproxy/blob/master/chproxy_overview.json).
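
With this change, every host-level series carries a `replica` label. A scrape of the metrics endpoint would now include lines like the ones below; the label values are invented for illustration, not taken from a real deployment:

```
host_health{cluster="stats",replica="replica1",cluster_node="10.0.0.1:8123"} 1
status_codes_total{user="web",cluster="stats",cluster_user="readonly",replica="replica1",cluster_node="10.0.0.1:8123",code="200"} 1234
```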

metrics.go: 26 changes (13 additions, 13 deletions)
@@ -8,49 +8,49 @@ var (
Name: "status_codes_total",
Help: "Distribution by status codes",
},
[]string{"user", "cluster", "cluster_user", "cluster_node", "code"},
[]string{"user", "cluster", "cluster_user", "replica", "cluster_node", "code"},
)
requestSum = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "request_sum_total",
Help: "Total number of sent requests",
},
[]string{"user", "cluster", "cluster_user", "cluster_node"},
[]string{"user", "cluster", "cluster_user", "replica", "cluster_node"},
)
requestSuccess = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "request_success_total",
Help: "Total number of sent success requests",
},
[]string{"user", "cluster", "cluster_user", "cluster_node"},
[]string{"user", "cluster", "cluster_user", "replica", "cluster_node"},
)
limitExcess = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "concurrent_limit_excess_total",
Help: "Total number of max_concurrent_queries excess",
},
[]string{"user", "cluster", "cluster_user", "cluster_node"},
[]string{"user", "cluster", "cluster_user", "replica", "cluster_node"},
)
hostPenalties = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "host_penalties_total",
Help: "Total number of given penalties by host",
},
[]string{"cluster", "cluster_node"},
[]string{"cluster", "replica", "cluster_node"},
)
hostHealth = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "host_health",
Help: "Health state of hosts by clusters",
},
[]string{"cluster", "cluster_node"},
[]string{"cluster", "replica", "cluster_node"},
)
concurrentQueries = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "concurrent_queries",
Help: "The number of concurrent queries at current time",
},
[]string{"user", "cluster", "cluster_user", "cluster_node"},
[]string{"user", "cluster", "cluster_user", "replica", "cluster_node"},
)
requestQueueSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
@@ -78,14 +78,14 @@ var (
Name: "request_body_bytes_total",
Help: "The amount of bytes read from request bodies",
},
[]string{"user", "cluster", "cluster_user", "cluster_node"},
[]string{"user", "cluster", "cluster_user", "replica", "cluster_node"},
)
responseBodyBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "response_body_bytes_total",
Help: "The amount of bytes written to response bodies",
},
[]string{"user", "cluster", "cluster_user", "cluster_node"},
[]string{"user", "cluster", "cluster_user", "replica", "cluster_node"},
)
badRequest = prometheus.NewCounter(prometheus.CounterOpts{
Name: "bad_requests_total",
@@ -125,15 +125,15 @@ var (
Help: "Request duration. Includes possible wait time in the queue",
Objectives: map[float64]float64{0.5: 1e-1, 0.9: 1e-2, 0.99: 1e-3, 0.999: 1e-4, 1: 1e-5},
},
[]string{"user", "cluster", "cluster_user", "cluster_node"},
[]string{"user", "cluster", "cluster_user", "replica", "cluster_node"},
)
proxiedResponseDuration = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "proxied_response_duration_seconds",
Help: "Response duration proxied from clickhouse",
Objectives: map[float64]float64{0.5: 1e-1, 0.9: 1e-2, 0.99: 1e-3, 0.999: 1e-4, 1: 1e-5},
},
[]string{"user", "cluster", "cluster_user", "cluster_node"},
[]string{"user", "cluster", "cluster_user", "replica", "cluster_node"},
)
cachedResponseDuration = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
@@ -148,14 +148,14 @@ var (
Name: "canceled_request_total",
Help: "The number of requests canceled by remote client",
},
[]string{"user", "cluster", "cluster_user", "cluster_node"},
[]string{"user", "cluster", "cluster_user", "replica", "cluster_node"},
)
timeoutRequest = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "timeout_request_total",
Help: "The number of timed out requests",
},
[]string{"user", "cluster", "cluster_user", "cluster_node"},
[]string{"user", "cluster", "cluster_user", "replica", "cluster_node"},
)
)
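
For readers unfamiliar with client_golang: once a label name is declared on a vector, every observation site must supply a value for it, which is why each call site touched by this commit now passes `replica` as well. A minimal, self-contained sketch of the declare-register-observe cycle (label values and port are invented for illustration):

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

var statusCodes = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "status_codes_total",
		Help: "Distribution by status codes",
	},
	[]string{"user", "cluster", "cluster_user", "replica", "cluster_node", "code"},
)

func main() {
	prometheus.MustRegister(statusCodes)

	// With must receive exactly the declared label set; a missing or
	// extra label name panics at runtime.
	statusCodes.With(prometheus.Labels{
		"user":         "web",
		"cluster":      "stats",
		"cluster_user": "readonly",
		"replica":      "replica1",
		"cluster_node": "10.0.0.1:8123",
		"code":         "200",
	}).Inc()

	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":9090", nil))
}
```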

proxy.go: 8 changes (5 additions, 3 deletions)
@@ -63,7 +63,7 @@ func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
}

// WARNING: don't use s.labels before s.incQueued,
// since s.labels["cluster_node"] may change inside incQueued.
// since `replica` and `cluster_node` may change inside incQueued.
if err := s.incQueued(); err != nil {
limitExcess.With(s.labels).Inc()
q := getQuerySnippet(req)
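
The warning above exists because the wait inside incQueued can re-route the request: the scope may pick a different host after sleeping (see the incQueued hunk in scope.go below, where the re-pick happens via cluster.getHost), which rewrites the host-dependent entries of s.labels. A toy, self-contained illustration of the hazard — the types and addresses here are invented:

```go
package main

import "fmt"

// scope is a stand-in for chproxy's scope; only the labels map matters here.
type scope struct {
	labels map[string]string
}

// incQueued simulates the queue wait: a different host wins the slot,
// so the host-dependent labels are rewritten.
func (s *scope) incQueued() error {
	s.labels["replica"] = "replica2"
	s.labels["cluster_node"] = "10.0.0.2:8123"
	return nil
}

func main() {
	s := &scope{labels: map[string]string{
		"replica":      "replica1",
		"cluster_node": "10.0.0.1:8123",
	}}

	stale := s.labels["cluster_node"] // WRONG: read before incQueued
	if err := s.incQueued(); err != nil {
		return
	}
	fresh := s.labels["cluster_node"] // correct: read after incQueued

	fmt.Println(stale, "!=", fresh) // 10.0.0.1:8123 != 10.0.0.2:8123
}
```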
@@ -125,6 +125,7 @@ func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
"user": s.user.name,
"cluster": s.cluster.name,
"cluster_user": s.clusterUser.name,
"replica": s.host.replica.name,
"cluster_node": s.host.addr.Host,
"code": strconv.Itoa(srw.statusCode),
},
@@ -232,8 +233,8 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h
return
}

// Do not store `cluster_node` in labels, since it makes no sense
// for cache metrics.
// Do not store `replica` and `cluster_node` in labels, since they
// make no sense for cache metrics.
labels := prometheus.Labels{
"cache": s.user.cache.Name,
"user": s.labels["user"],
@@ -461,6 +462,7 @@ func (rp *reverseProxy) getScope(req *http.Request) (*scope, int, error) {
"user": u.name,
"cluster": c.name,
"cluster_user": cu.name,
"replica": h.replica.name,
"cluster_node": h.addr.Host,
},
}
scope.go: 52 changes (32 additions, 20 deletions)
@@ -57,8 +57,8 @@ func (s *scope) incQueued() error {
return s.inc()
}

// Do not store `cluster_node` in labels, since it makes no sense
// for queue metrics.
// Do not store `replica` and `cluster_node` in labels, since they
// make no sense for queue metrics.
labels := prometheus.Labels{
"user": s.labels["user"],
"cluster": s.labels["cluster"],
@@ -140,6 +140,7 @@ func (s *scope) incQueued() error {
// after sleeping.
h := s.cluster.getHost()
s.host = h
s.labels["replica"] = h.replica.name
s.labels["cluster_node"] = h.addr.Host
}
}
@@ -437,7 +438,7 @@ func newClusterUser(cu config.ClusterUser) *clusterUser {
}

type host struct {
cluster *cluster
replica *replica

// Counter of unsuccessful requests to decrease host priority.
penalty uint32
@@ -452,46 +453,55 @@
}

type replica struct {
cluster *cluster

name string

hosts []*host
nextHostIdx uint32
}

func newReplicas(replicasCfg []config.Replica, nodes []string, scheme string, c *cluster) ([]*replica, error) {
if len(nodes) > 0 {
// No replicas, just flat nodes. Create fake replica
// with all the nodes.
hosts, err := newNodes(nodes, scheme, c)
// No replicas, just flat nodes. Create default replica
// containing all the nodes.
r := &replica{
cluster: c,
name: "default",
}
hosts, err := newNodes(nodes, scheme, r)
if err != nil {
return nil, err
}
r := &replica{
hosts: hosts,
}
r.hosts = hosts
return []*replica{r}, nil
}

replicas := make([]*replica, len(replicasCfg))
for i, r := range replicasCfg {
hosts, err := newNodes(r.Nodes, scheme, c)
if err != nil {
return nil, fmt.Errorf("cannot initialize replica %q: %s", r.Name, err)
for i, rCfg := range replicasCfg {
r := &replica{
cluster: c,
name: rCfg.Name,
}
replicas[i] = &replica{
hosts: hosts,
hosts, err := newNodes(rCfg.Nodes, scheme, r)
if err != nil {
return nil, fmt.Errorf("cannot initialize replica %q: %s", rCfg.Name, err)
}
r.hosts = hosts
replicas[i] = r
}
return replicas, nil
}
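
The structural change in this file is that hosts now hang off replicas instead of the cluster, with back-pointers at each level so metrics code can derive the `cluster`, `replica`, and `cluster_node` labels from a *host alone. A stripped-down sketch of that wiring (fields reduced to names and addresses; these are not chproxy's full types):

```go
package main

import "fmt"

type cluster struct {
	name     string
	replicas []*replica
}

type replica struct {
	cluster *cluster // back-pointer: replica -> cluster
	name    string
	hosts   []*host
}

type host struct {
	replica *replica // back-pointer: host -> replica
	addr    string
}

// newReplica wires one replica and its hosts into the cluster.
func newReplica(c *cluster, name string, nodes []string) *replica {
	r := &replica{cluster: c, name: name}
	for _, n := range nodes {
		r.hosts = append(r.hosts, &host{replica: r, addr: n})
	}
	return r
}

func main() {
	c := &cluster{name: "stats"}
	c.replicas = []*replica{
		newReplica(c, "replica1", []string{"10.0.0.1:8123"}),
		newReplica(c, "replica2", []string{"10.0.0.2:8123"}),
	}

	// All three metric labels are reachable from a single host.
	h := c.replicas[1].hosts[0]
	fmt.Println(h.replica.cluster.name, h.replica.name, h.addr)
}
```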

func newNodes(nodes []string, scheme string, c *cluster) ([]*host, error) {
func newNodes(nodes []string, scheme string, r *replica) ([]*host, error) {
hosts := make([]*host, len(nodes))
for i, node := range nodes {
addr, err := url.Parse(fmt.Sprintf("%s://%s", scheme, node))
if err != nil {
return nil, fmt.Errorf("cannot parse `node` %q with `scheme` %q: %s", node, scheme, err)
}
hosts[i] = &host{
cluster: c,
replica: r,
addr: addr,
}
}
@@ -500,7 +510,8 @@ func newNodes(nodes []string, scheme string, c *cluster) ([]*host, error) {

func (h *host) runHeartbeat(done <-chan struct{}) {
label := prometheus.Labels{
"cluster": h.cluster.name,
"cluster": h.replica.cluster.name,
"replica": h.replica.name,
"cluster_node": h.addr.Host,
}
heartbeat := func() {
@@ -514,7 +525,7 @@ func (h *host) runHeartbeat(done <-chan struct{}) {
}
}
heartbeat()
interval := h.cluster.heartBeatInterval
interval := h.replica.cluster.heartBeatInterval
for {
select {
case <-done:
@@ -553,7 +564,8 @@ func (h *host) penalize() {
return
}
hostPenalties.With(prometheus.Labels{
"cluster": h.cluster.name,
"cluster": h.replica.cluster.name,
"replica": h.replica.name,
"cluster_node": h.addr.Host,
}).Inc()
atomic.AddUint32(&h.penalty, penaltySize)
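
For context, the penalty bumped here is an atomic counter that lowers the host's priority during host selection (per the struct comment above). A self-contained approximation of that mechanism — the penaltySize and penaltyMaxSize constants are assumed values for illustration, not necessarily chproxy's:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const (
	penaltySize    = 5   // assumed: added per failed request
	penaltyMaxSize = 300 // assumed: cap so a host can recover
)

type host struct {
	load    uint32 // number of in-flight requests
	penalty uint32 // handicap from recent failures
}

// penalize raises the host's effective load so least-loaded
// selection steers new requests away from it.
func (h *host) penalize() {
	if atomic.LoadUint32(&h.penalty) >= penaltyMaxSize {
		return
	}
	atomic.AddUint32(&h.penalty, penaltySize)
}

// effectiveLoad is what a balancer would compare across hosts.
func (h *host) effectiveLoad() uint32 {
	return atomic.LoadUint32(&h.load) + atomic.LoadUint32(&h.penalty)
}

func main() {
	h := &host{}
	h.penalize()
	fmt.Println(h.effectiveLoad()) // 5
}
```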
scope_test.go: 21 changes (14 additions, 7 deletions)
@@ -43,6 +43,7 @@ func TestRunningQueries(t *testing.T) {
"user": "default",
"cluster": "default",
"cluster_user": "default",
"replica": "default",
"cluster_node": "default",
}

@@ -88,6 +89,7 @@ func TestRunningQueries(t *testing.T) {
"user": "default",
"cluster": "default",
"cluster_user": "default",
"replica": "default",
"cluster_node": "default",
}
if err := s.inc(); err != nil {
@@ -109,21 +111,23 @@ func TestGetHost(t *testing.T) {
name: "default",
replicas: []*replica{&replica{}},
}
c.replicas[0].hosts = []*host{
r := c.replicas[0]
r.cluster = c
r.hosts = []*host{
{
addr: &url.URL{Host: "127.0.0.1"},
active: 1,
cluster: c,
replica: r,
},
{
addr: &url.URL{Host: "127.0.0.2"},
active: 1,
cluster: c,
replica: r,
},
{
addr: &url.URL{Host: "127.0.0.3"},
active: 1,
cluster: c,
replica: r,
},
}

@@ -168,8 +172,6 @@
}
h.inc()

r := c.replicas[0]

// inc last host to get least-loaded 1st host
r.hosts[2].inc()

@@ -211,8 +213,13 @@

func TestPenalize(t *testing.T) {
c := &cluster{name: "default"}
c.replicas = []*replica{
{
cluster: c,
},
}
h := &host{
cluster: c,
replica: c.replicas[0],
addr: &url.URL{Host: "127.0.0.1"},
}
exp := uint32(0)
