Skip to content

Commit

Permalink
Merge pull request #490 from miaolz123/master
Browse files Browse the repository at this point in the history
feat: more extra data for register
  • Loading branch information
smallnest authored Sep 8, 2020
2 parents ad59edf + 2d0336b commit 46d874e
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 36 deletions.
22 changes: 16 additions & 6 deletions serverplugin/consul.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package serverplugin

import (
"context"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -82,10 +83,10 @@ func (p *ConsulRegisterPlugin) Start() error {
close(p.done)
return
case <-ticker.C:
var data []byte
extra := make(map[string]string)
if p.Metrics != nil {
clientMeter := metrics.GetOrRegisterMeter("clientMeter", p.Metrics)
data = []byte(fmt.Sprintf("newconnected=%.2f", clientMeter.RateMean()))
extra["calls"] = fmt.Sprintf("%.2f", metrics.GetOrRegisterMeter("calls", p.Metrics).RateMean())
extra["connections"] = fmt.Sprintf("%.2f", metrics.GetOrRegisterMeter("connections", p.Metrics).RateMean())
}

//set this same metrics for all services at this server
Expand All @@ -105,7 +106,9 @@ func (p *ConsulRegisterPlugin) Start() error {
}
} else {
v, _ := url.ParseQuery(string(kvPaire.Value))
v.Set("tps", string(data))
for key, value := range extra {
v.Set(key, value)
}
p.kv.Put(nodePath, []byte(v.Encode()), &store.WriteOptions{TTL: p.UpdateInterval * 2})
}
}
Expand Down Expand Up @@ -153,12 +156,19 @@ func (p *ConsulRegisterPlugin) Stop() error {
// HandleConnAccept handles connections from clients
func (p *ConsulRegisterPlugin) HandleConnAccept(conn net.Conn) (net.Conn, bool) {
if p.Metrics != nil {
clientMeter := metrics.GetOrRegisterMeter("clientMeter", p.Metrics)
clientMeter.Mark(1)
metrics.GetOrRegisterMeter("connections", p.Metrics).Mark(1)
}
return conn, true
}

// PreCall handles rpc call from clients
func (p *ConsulRegisterPlugin) PreCall(_ context.Context, _, _ string, args interface{}) (interface{}, error) {
if p.Metrics != nil {
metrics.GetOrRegisterMeter("calls", p.Metrics).Mark(1)
}
return args, nil
}

// Register handles registering event.
// this service is registered at BASE/serviceName/thisIpAddress node
func (p *ConsulRegisterPlugin) Register(name string, rcvr interface{}, metadata string) (err error) {
Expand Down
22 changes: 16 additions & 6 deletions serverplugin/etcd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package serverplugin

import (
"context"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -78,10 +79,10 @@ func (p *EtcdRegisterPlugin) Start() error {
close(p.done)
return
case <-ticker.C:
var data []byte
extra := make(map[string]string)
if p.Metrics != nil {
clientMeter := metrics.GetOrRegisterMeter("clientMeter", p.Metrics)
data = []byte(fmt.Sprintf("newconnected=%.2f", clientMeter.RateMean()))
extra["calls"] = fmt.Sprintf("%.2f", metrics.GetOrRegisterMeter("calls", p.Metrics).RateMean())
extra["connections"] = fmt.Sprintf("%.2f", metrics.GetOrRegisterMeter("connections", p.Metrics).RateMean())
}
//set this same metrics for all services at this server
for _, name := range p.Services {
Expand All @@ -101,7 +102,9 @@ func (p *EtcdRegisterPlugin) Start() error {

} else {
v, _ := url.ParseQuery(string(kvPair.Value))
v.Set("tps", string(data))
for key, value := range extra {
v.Set(key, value)
}
p.kv.Put(nodePath, []byte(v.Encode()), &store.WriteOptions{TTL: p.UpdateInterval * 3})
}
}
Expand Down Expand Up @@ -145,12 +148,19 @@ func (p *EtcdRegisterPlugin) Stop() error {
// HandleConnAccept handles connections from clients
func (p *EtcdRegisterPlugin) HandleConnAccept(conn net.Conn) (net.Conn, bool) {
if p.Metrics != nil {
clientMeter := metrics.GetOrRegisterMeter("clientMeter", p.Metrics)
clientMeter.Mark(1)
metrics.GetOrRegisterMeter("connections", p.Metrics).Mark(1)
}
return conn, true
}

// PreCall handles rpc call from clients
func (p *EtcdRegisterPlugin) PreCall(_ context.Context, _, _ string, args interface{}) (interface{}, error) {
if p.Metrics != nil {
metrics.GetOrRegisterMeter("calls", p.Metrics).Mark(1)
}
return args, nil
}

// Register handles registering event.
// this service is registered at BASE/serviceName/thisIpAddress node
func (p *EtcdRegisterPlugin) Register(name string, rcvr interface{}, metadata string) (err error) {
Expand Down
22 changes: 16 additions & 6 deletions serverplugin/etcdv3.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package serverplugin

import (
"context"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -77,10 +78,10 @@ func (p *EtcdV3RegisterPlugin) Start() error {
close(p.done)
return
case <-ticker.C:
var data []byte
extra := make(map[string]string)
if p.Metrics != nil {
clientMeter := metrics.GetOrRegisterMeter("clientMeter", p.Metrics)
data = []byte(fmt.Sprintf("newconnected=%.2f", clientMeter.RateMean()))
extra["calls"] = fmt.Sprintf("%.2f", metrics.GetOrRegisterMeter("calls", p.Metrics).RateMean())
extra["connections"] = fmt.Sprintf("%.2f", metrics.GetOrRegisterMeter("connections", p.Metrics).RateMean())
}
//set this same metrics for all services at this server
for _, name := range p.Services {
Expand All @@ -100,7 +101,9 @@ func (p *EtcdV3RegisterPlugin) Start() error {

} else {
v, _ := url.ParseQuery(string(kvPair.Value))
v.Set("tps", string(data))
for key, value := range extra {
v.Set(key, value)
}
p.kv.Put(nodePath, []byte(v.Encode()), &store.WriteOptions{TTL: p.UpdateInterval + time.Second})
}
}
Expand Down Expand Up @@ -144,12 +147,19 @@ func (p *EtcdV3RegisterPlugin) Stop() error {
// HandleConnAccept handles connections from clients
func (p *EtcdV3RegisterPlugin) HandleConnAccept(conn net.Conn) (net.Conn, bool) {
if p.Metrics != nil {
clientMeter := metrics.GetOrRegisterMeter("clientMeter", p.Metrics)
clientMeter.Mark(1)
metrics.GetOrRegisterMeter("connections", p.Metrics).Mark(1)
}
return conn, true
}

// PreCall handles rpc call from clients
func (p *EtcdV3RegisterPlugin) PreCall(_ context.Context, _, _ string, args interface{}) (interface{}, error) {
if p.Metrics != nil {
metrics.GetOrRegisterMeter("calls", p.Metrics).Mark(1)
}
return args, nil
}

// Register handles registering event.
// this service is registered at BASE/serviceName/thisIpAddress node
func (p *EtcdV3RegisterPlugin) Register(name string, rcvr interface{}, metadata string) (err error) {
Expand Down
22 changes: 16 additions & 6 deletions serverplugin/mdns.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package serverplugin

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -78,16 +79,18 @@ func (p *MDNSRegisterPlugin) Start() error {
break
}

var data []byte
extra := make(map[string]string)
if p.Metrics != nil {
clientMeter := metrics.GetOrRegisterMeter("clientMeter", p.Metrics)
data = []byte(fmt.Sprintf("newconnected=%.2f", clientMeter.RateMean()))
extra["calls"] = fmt.Sprintf("%.2f", metrics.GetOrRegisterMeter("calls", p.Metrics).RateMean())
extra["connections"] = fmt.Sprintf("%.2f", metrics.GetOrRegisterMeter("connections", p.Metrics).RateMean())
}

//set this same metrics for all services at this server
for _, sm := range p.Services {
v, _ := url.ParseQuery(string(sm.Meta))
v.Set("tps", string(data))
for key, value := range extra {
v.Set(key, value)
}
sm.Meta = v.Encode()
}
ss, _ := json.Marshal(p.Services)
Expand Down Expand Up @@ -139,12 +142,19 @@ func (p *MDNSRegisterPlugin) initMDNS() {
// HandleConnAccept handles connections from clients
func (p *MDNSRegisterPlugin) HandleConnAccept(conn net.Conn) (net.Conn, bool) {
if p.Metrics != nil {
clientMeter := metrics.GetOrRegisterMeter("clientMeter", p.Metrics)
clientMeter.Mark(1)
metrics.GetOrRegisterMeter("connections", p.Metrics).Mark(1)
}
return conn, true
}

// PreCall handles rpc call from clients
func (p *MDNSRegisterPlugin) PreCall(_ context.Context, _, _ string, args interface{}) (interface{}, error) {
if p.Metrics != nil {
metrics.GetOrRegisterMeter("calls", p.Metrics).Mark(1)
}
return args, nil
}

// Register handles registering event.
// this service is registered at BASE/serviceName/thisIpAddress node
func (p *MDNSRegisterPlugin) Register(name string, rcvr interface{}, metadata string) (err error) {
Expand Down
22 changes: 16 additions & 6 deletions serverplugin/redis.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package serverplugin

import (
"context"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -78,10 +79,10 @@ func (p *RedisRegisterPlugin) Start() error {
close(p.done)
return
case <-ticker.C:
var data []byte
extra := make(map[string]string)
if p.Metrics != nil {
clientMeter := metrics.GetOrRegisterMeter("clientMeter", p.Metrics)
data = []byte(fmt.Sprintf("newconnected=%.2f", clientMeter.RateMean()))
extra["calls"] = fmt.Sprintf("%.2f", metrics.GetOrRegisterMeter("calls", p.Metrics).RateMean())
extra["connections"] = fmt.Sprintf("%.2f", metrics.GetOrRegisterMeter("connections", p.Metrics).RateMean())
}
//set this same metrics for all services at this server
for _, name := range p.Services {
Expand All @@ -101,7 +102,9 @@ func (p *RedisRegisterPlugin) Start() error {

} else {
v, _ := url.ParseQuery(string(kvPair.Value))
v.Set("tps", string(data))
for key, value := range extra {
v.Set(key, value)
}
p.kv.Put(nodePath, []byte(v.Encode()), &store.WriteOptions{TTL: p.UpdateInterval * 2})
}
}
Expand Down Expand Up @@ -146,12 +149,19 @@ func (p *RedisRegisterPlugin) Stop() error {
// HandleConnAccept handles connections from clients
func (p *RedisRegisterPlugin) HandleConnAccept(conn net.Conn) (net.Conn, bool) {
if p.Metrics != nil {
clientMeter := metrics.GetOrRegisterMeter("clientMeter", p.Metrics)
clientMeter.Mark(1)
metrics.GetOrRegisterMeter("connections", p.Metrics).Mark(1)
}
return conn, true
}

// PreCall handles rpc call from clients
func (p *RedisRegisterPlugin) PreCall(_ context.Context, _, _ string, args interface{}) (interface{}, error) {
if p.Metrics != nil {
metrics.GetOrRegisterMeter("calls", p.Metrics).Mark(1)
}
return args, nil
}

// Register handles registering event.
// this service is registered at BASE/serviceName/thisIpAddress node
func (p *RedisRegisterPlugin) Register(name string, rcvr interface{}, metadata string) (err error) {
Expand Down
22 changes: 16 additions & 6 deletions serverplugin/zookeeper.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package serverplugin

import (
"context"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -83,10 +84,10 @@ func (p *ZooKeeperRegisterPlugin) Start() error {
close(p.done)
return
case <-ticker.C:
var data []byte
extra := make(map[string]string)
if p.Metrics != nil {
clientMeter := metrics.GetOrRegisterMeter("clientMeter", p.Metrics)
data = []byte(fmt.Sprintf("newconnected=%.2f", clientMeter.RateMean()))
extra["calls"] = fmt.Sprintf("%.2f", metrics.GetOrRegisterMeter("calls", p.Metrics).RateMean())
extra["connections"] = fmt.Sprintf("%.2f", metrics.GetOrRegisterMeter("connections", p.Metrics).RateMean())
}
//set this same metrics for all services at this server
for _, name := range p.Services {
Expand All @@ -105,7 +106,9 @@ func (p *ZooKeeperRegisterPlugin) Start() error {
}
} else {
v, _ := url.ParseQuery(string(kvPaire.Value))
v.Set("tps", string(data))
for key, value := range extra {
v.Set(key, value)
}
p.kv.Put(nodePath, []byte(v.Encode()), &store.WriteOptions{TTL: p.UpdateInterval * 2})
}
}
Expand Down Expand Up @@ -154,12 +157,19 @@ func (p *ZooKeeperRegisterPlugin) Stop() error {
// HandleConnAccept handles connections from clients
func (p *ZooKeeperRegisterPlugin) HandleConnAccept(conn net.Conn) (net.Conn, bool) {
if p.Metrics != nil {
clientMeter := metrics.GetOrRegisterMeter("clientMeter", p.Metrics)
clientMeter.Mark(1)
metrics.GetOrRegisterMeter("connections", p.Metrics).Mark(1)
}
return conn, true
}

// PreCall handles rpc call from clients
func (p *ZooKeeperRegisterPlugin) PreCall(_ context.Context, _, _ string, args interface{}) (interface{}, error) {
if p.Metrics != nil {
metrics.GetOrRegisterMeter("calls", p.Metrics).Mark(1)
}
return args, nil
}

// Register handles registering event.
// this service is registered at BASE/serviceName/thisIpAddress node
func (p *ZooKeeperRegisterPlugin) Register(name string, rcvr interface{}, metadata string) (err error) {
Expand Down

0 comments on commit 46d874e

Please sign in to comment.