Skip to content

Commit

Permalink
consul: send agent source data as separate query source
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanuber committed Jun 30, 2016
1 parent 104b234 commit 62884a2
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 92 deletions.
10 changes: 5 additions & 5 deletions command/agent/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,11 +599,11 @@ func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, req,
AllowStale: d.config.AllowStale,
},

// Always pass the local agent through as the source. In the DNS
// interface, there is no provision for passing additional query
// parameters, so we send the local agent's data through to allow
// distance sorting relative to ourself on the server side.
Source: structs.QuerySource{
// Always pass the local agent through. In the DNS interface, there
// is no provision for passing additional query parameters, so we
// send the local agent's data through to allow distance sorting
// relative to ourself on the server side.
Agent: structs.QuerySource{
Datacenter: d.agent.config.Datacenter,
Node: d.agent.config.NodeName,
},
Expand Down
13 changes: 1 addition & 12 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,18 +531,7 @@ func (s *HTTPServer) parseToken(req *http.Request, token *string) {
// DC in the request, if given, or else the agent's DC.
func (s *HTTPServer) parseSource(req *http.Request, source *structs.QuerySource) {
s.parseDC(req, &source.Datacenter)

// Always start with the local node as the source.
source.Node = s.agent.config.NodeName

// If ?near was provided, take the value send it along. We also mark the
// fact that an override was provided with the NearRequested bool.
if node := req.URL.Query().Get("near"); node != "" {
source.NearRequested = true
if node != "_agent" {
source.Node = node
}
}
source.Node = req.URL.Query().Get("near")
}

// parse is a convenience method for endpoints that need
Expand Down
9 changes: 4 additions & 5 deletions command/agent/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,17 +345,16 @@ func TestParseSource(t *testing.T) {
defer srv.Shutdown()
defer srv.agent.Shutdown()

// Default is agent's DC and the local node, with the near flag false
// (since the user didn't care, then just give them the cheapest possible
// query).
// Default is agent's DC and no node (since the user didn't care,
// then just give them the cheapest possible query).
req, err := http.NewRequest("GET",
"/v1/catalog/nodes", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
source := structs.QuerySource{}
srv.parseSource(req, &source)
if source.Datacenter != "dc1" || source.Node != srv.agent.config.NodeName {
if source.Datacenter != "dc1" || source.Node != "" {
t.Fatalf("bad: %v", source)
}

Expand All @@ -367,7 +366,7 @@ func TestParseSource(t *testing.T) {
}
source = structs.QuerySource{}
srv.parseSource(req, &source)
if source.Datacenter != "dc1" || source.Node != "bob" || !source.NearRequested {
if source.Datacenter != "dc1" || source.Node != "bob" {
t.Fatalf("bad: %v", source)
}

Expand Down
8 changes: 8 additions & 0 deletions command/agent/prepared_query_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ func parseLimit(req *http.Request, limit *int) error {
func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.PreparedQueryExecuteRequest{
QueryIDOrName: id,
Agent: structs.QuerySource{
Node: s.agent.config.NodeName,
Datacenter: s.agent.config.Datacenter,
},
}
s.parseSource(req, &args.Source)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
Expand Down Expand Up @@ -131,6 +135,10 @@ func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, r
func (s *HTTPServer) preparedQueryExplain(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.PreparedQueryExecuteRequest{
QueryIDOrName: id,
Agent: structs.QuerySource{
Node: s.agent.config.NodeName,
Datacenter: s.agent.config.Datacenter,
},
}
s.parseSource(req, &args.Source)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
Expand Down
30 changes: 20 additions & 10 deletions command/agent/prepared_query_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,12 @@ func TestPreparedQuery_Execute(t *testing.T) {
QueryIDOrName: "my-id",
Limit: 5,
Source: structs.QuerySource{
Datacenter: "dc1",
Node: "my-node",
NearRequested: true,
Datacenter: "dc1",
Node: "my-node",
},
Agent: structs.QuerySource{
Datacenter: srv.agent.config.Datacenter,
Node: srv.agent.config.NodeName,
},
QueryOptions: structs.QueryOptions{
Token: "my-token",
Expand Down Expand Up @@ -332,11 +335,15 @@ func TestPreparedQuery_Execute(t *testing.T) {
}

m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
if args.Source.NearRequested {
t.Fatal("expect NearRequested to be false")
if args.Source.Node != "" {
t.Fatalf("expect node to be empty, got %q", args.Source.Node)
}
expect := structs.QuerySource{
Datacenter: srv.agent.config.Datacenter,
Node: srv.agent.config.NodeName,
}
if args.Source.Node == "" {
t.Fatalf("expect Source to be %q, got: %q", srv.agent.config.NodeName, args.Source.Node)
if !reflect.DeepEqual(args.Agent, expect) {
t.Fatalf("expect: %#v\nactual: %#v", expect, args.Agent)
}
return nil
}
Expand Down Expand Up @@ -383,9 +390,12 @@ func TestPreparedQuery_Explain(t *testing.T) {
QueryIDOrName: "my-id",
Limit: 5,
Source: structs.QuerySource{
Datacenter: "dc1",
Node: "my-node",
NearRequested: true,
Datacenter: "dc1",
Node: "my-node",
},
Agent: structs.QuerySource{
Datacenter: srv.agent.config.Datacenter,
Node: srv.agent.config.NodeName,
},
QueryOptions: structs.QueryOptions{
Token: "my-token",
Expand Down
33 changes: 18 additions & 15 deletions consul/prepared_query_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,22 +369,25 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
// requested an RTT sort.
reply.Nodes.Shuffle()

// Check if the query carries a Near parameter, or if the requestor
// supplied a ?near parameter in the request. We can apply distance
// sorting if either of these cases are true, but we don't want to
// affect the established round-robin default.
if args.Source.NearRequested || query.Service.Near != "" {
// Apply the "near" parameter if it exists on the prepared query and
// was not provided in the request args.
if !args.Source.NearRequested && query.Service.Near != "_agent" {
args.Source.Node = query.Service.Near
}
// Build the query source. This can be provided by the client, or by
// the prepared query. Client-specified takes priority.
qs := args.Source
if qs.Datacenter == "" {
qs.Datacenter = args.Agent.Datacenter
}
if query.Service.Near != "" && qs.Node == "" {
qs.Node = query.Service.Near
}

// Perform the distance sort
err := p.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes)
if err != nil {
return err
}
// Respect the magic "_agent" flag.
if qs.Node == "_agent" {
qs.Node = args.Agent.Node
}

// Perform the distance sort
err = p.srv.sortNodesByDistanceFrom(qs, reply.Nodes)
if err != nil {
return err
}

// Apply the limit if given.
Expand Down
84 changes: 44 additions & 40 deletions consul/prepared_query_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1548,9 +1548,8 @@ func TestPreparedQuery_Execute(t *testing.T) {
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
Source: structs.QuerySource{
Datacenter: "dc1",
Node: "node3",
NearRequested: true,
Datacenter: "dc1",
Node: "node3",
},
QueryOptions: structs.QueryOptions{Token: execToken},
}
Expand Down Expand Up @@ -1617,13 +1616,12 @@ func TestPreparedQuery_Execute(t *testing.T) {
t.Fatalf("err: %v", err)
}

// Now run the query and make sure the baked-in service is returned.
// Now run the query and make sure the sort looks right.
{
req := structs.PreparedQueryExecuteRequest{
Source: structs.QuerySource{
Datacenter: "dc1",
Node: "foo",
NearRequested: false,
Agent: structs.QuerySource{
Datacenter: "dc1",
Node: "node3",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
Expand All @@ -1645,17 +1643,20 @@ func TestPreparedQuery_Execute(t *testing.T) {
}
}

// Query again, but this time set NearRequested to "true". This should
// prove that we allow overriding the baked-in value with ?near.
// Query again, but this time set a client-supplied query source. This
// proves that we allow overriding the baked-in value with ?near.
{
// Set up the query with a non-existent node. This will cause the
// nodes to be shuffled if the passed node is respected, proving
// that we allow the override to happen.
req := structs.PreparedQueryExecuteRequest{
Source: structs.QuerySource{
Datacenter: "dc1",
Node: "foo",
NearRequested: true,
Datacenter: "dc1",
Node: "foo",
},
Agent: structs.QuerySource{
Datacenter: "dc1",
Node: "node3",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
Expand Down Expand Up @@ -1683,14 +1684,18 @@ func TestPreparedQuery_Execute(t *testing.T) {
}
}

// Check that if NearRequested is passed as true, that we sort based
// on the given node and do not use the one stored in the PQ.
// Bake the magic "_agent" flag into the query.
query.Query.Service.Near = "_agent"
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err)
}

// Check that we sort the local agent first when the magic flag is set.
{
req := structs.PreparedQueryExecuteRequest{
Source: structs.QuerySource{
Datacenter: "dc1",
Node: "node1",
NearRequested: true,
Agent: structs.QuerySource{
Datacenter: "dc1",
Node: "node3",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
Expand All @@ -1699,9 +1704,6 @@ func TestPreparedQuery_Execute(t *testing.T) {

var reply structs.PreparedQueryExecuteResponse

// We just want to check that we get a non-local node, because
// the local node is the only one with working coordinates.
shuffled := false
for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
Expand All @@ -1710,29 +1712,19 @@ func TestPreparedQuery_Execute(t *testing.T) {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if node := reply.Nodes[0].Node.Node; node != "node3" {
shuffled = true
break
t.Fatalf("expect node3 first, got: %q", node)
}
}
if !shuffled {
t.Fatal("expect non-local results")
}
}

// Set the query to prefer a colocated service using the magic _agent token
query.Query.Service.Near = "_agent"
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err)
}

// Check that the node returned is the one we asked for in the
// query source. This proves that if the PQ has "_agent" baked
// in, we always use the passed-in node.
// Check that the query isn't just sorting "node3" first because we
// provided it in the Agent query source. Proves that we use the
// Agent source when the magic "_agent" flag is passed.
{
req := structs.PreparedQueryExecuteRequest{
Source: structs.QuerySource{
Agent: structs.QuerySource{
Datacenter: "dc1",
Node: "node3",
Node: "foo",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
Expand All @@ -1741,6 +1733,9 @@ func TestPreparedQuery_Execute(t *testing.T) {

var reply structs.PreparedQueryExecuteResponse

// Expect the set to be shuffled since we have no coordinates
// on the "foo" node.
shuffled := false
for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
Expand All @@ -1749,19 +1744,28 @@ func TestPreparedQuery_Execute(t *testing.T) {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if node := reply.Nodes[0].Node.Node; node != "node3" {
t.Fatalf("expect node3 first, got: %q", node)
shuffled = true
break
}
}

if !shuffled {
t.Fatal("expect nodes to be shuffled")
}
}

// Shuffles if the response comes from a non-local DC. Proves that the
// near parameter does not affect this order.
// agent query source does not interfere with the order.
{
req := structs.PreparedQueryExecuteRequest{
Source: structs.QuerySource{
Datacenter: "dc2",
Node: "node3",
},
Agent: structs.QuerySource{
Datacenter: "dc1",
Node: "node3",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
Expand Down
4 changes: 4 additions & 0 deletions consul/structs/prepared_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ type PreparedQueryExecuteRequest struct {
// network coordinates.
Source QuerySource

// Agent is used to carry around a reference to the agent which initiated
// the execute request. Used to distance-sort relative to the local node.
Agent QuerySource

// QueryOptions (unfortunately named here) controls the consistency
// settings for the query lookup itself, as well as the service lookups.
QueryOptions
Expand Down
5 changes: 0 additions & 5 deletions consul/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,6 @@ func (r *DeregisterRequest) RequestDatacenter() string {
type QuerySource struct {
Datacenter string
Node string

// NearRequested indicates where the values in this QuerySource came
// from. When true, the values were provided by the requestor,
// otherwise they were filled by the agent servicing the request.
NearRequested bool
}

// DCSpecificRequest is used to query about a specific DC
Expand Down

0 comments on commit 62884a2

Please sign in to comment.