Skip to content

Commit

Permalink
consolidate code for scope id generating and printing
Browse files Browse the repository at this point in the history
  • Loading branch information
valyala committed Nov 7, 2017
1 parent 1658015 commit 48f9245
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 18 deletions.
3 changes: 1 addition & 2 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"net/http/httputil"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/Vertamedia/chproxy/cache"
Expand Down Expand Up @@ -424,7 +423,7 @@ func (rp *reverseProxy) getScope(req *http.Request) (*scope, int, error) {
localAddr = addr.String()
}
s := &scope{
id: atomic.AddUint64(&scopeID, 1),
id: newScopeID(),
host: h,
cluster: c,
user: u,
Expand Down
42 changes: 29 additions & 13 deletions scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

func (s *scope) String() string {
return fmt.Sprintf("[ Id: %X; User %q(%d) proxying as %q(%d) to %q(%d); RemoteAddr: %q; LocalAddr: %q ]",
s.id,
s.user.name, s.user.queryCounter.load(),
s.clusterUser.name, s.clusterUser.queryCounter.load(),
s.host.addr.Host, s.host.load(),
s.remoteAddr, s.localAddr)
type scopeID uint64

func (sid scopeID) String() string {
return fmt.Sprintf("%08X", uint64(sid))
}

func newScopeID() scopeID {
sid := atomic.AddUint64(&nextScopeID, 1)
return scopeID(sid)
}

var nextScopeID = uint64(time.Now().UnixNano())

type scope struct {
id uint64
id scopeID
host *host
cluster *cluster
user *user
Expand All @@ -38,7 +42,14 @@ type scope struct {
labels prometheus.Labels
}

var scopeID = uint64(time.Now().UnixNano())
func (s *scope) String() string {
return fmt.Sprintf("[ Id: %s; User %q(%d) proxying as %q(%d) to %q(%d); RemoteAddr: %q; LocalAddr: %q ]",
s.id,
s.user.name, s.user.queryCounter.load(),
s.clusterUser.name, s.clusterUser.queryCounter.load(),
s.host.addr.Host, s.host.load(),
s.remoteAddr, s.localAddr)
}

func (s *scope) incQueued() error {
if s.user.queueCh == nil && s.clusterUser.queueCh == nil {
Expand Down Expand Up @@ -194,9 +205,9 @@ func (s *scope) dec() {
const killQueryTimeout = time.Second * 30

func (s *scope) killQuery() error {
log.Debugf("killing timed out query with query_id=%X", s.id)
log.Debugf("killing timed out query with query_id=%s", s.id)

query := fmt.Sprintf("KILL QUERY WHERE query_id = '%X'", s.id)
query := fmt.Sprintf("KILL QUERY WHERE query_id = '%s'", s.id)
r := strings.NewReader(query)
addr := s.host.addr.String()
req, err := http.NewRequest("POST", addr, r)
Expand Down Expand Up @@ -227,7 +238,12 @@ func (s *scope) killQuery() error {
query, addr, resp.StatusCode, responseBody)
}

log.Debugf("timed out query with query_id=%X has been successfully killed", s.id)
respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("cannot read response body for the query %q: %s", query, err)
}

log.Debugf("killed timed out query with query_id=%s; respBody: %q", s.id, respBody)
return nil
}

Expand All @@ -247,7 +263,7 @@ func (s *scope) decorateRequest(req *http.Request) *http.Request {
params := make(url.Values)

// Set query_id as scope_id to have possibility to kill query if needed.
params.Set("query_id", fmt.Sprintf("%X", s.id))
params.Set("query_id", s.id.String())

// Keep allowed params.
q := req.URL.Query()
Expand Down
5 changes: 2 additions & 3 deletions scope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"fmt"
"net/url"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -32,7 +31,7 @@ func TestRunningQueries(t *testing.T) {
u1 := &user{
maxConcurrentQueries: 1,
}
s := &scope{id: atomic.AddUint64(&scopeID, 1)}
s := &scope{id: newScopeID()}
s.host = c.getHost()
s.cluster = c
s.user = u1
Expand Down Expand Up @@ -77,7 +76,7 @@ func TestRunningQueries(t *testing.T) {
u2 := &user{
maxConcurrentQueries: 1,
}
s = &scope{id: atomic.AddUint64(&scopeID, 1)}
s = &scope{id: newScopeID()}
s.host = c.getHost()
s.cluster = c
s.user = u2
Expand Down

0 comments on commit 48f9245

Please sign in to comment.