Skip to content

Commit

Permalink
Add tool to whitelist SearchAttributes (cadence-workflow#2219)
Browse files Browse the repository at this point in the history
* Move get valid search attributes to cluster command
* Add UpdateValue to dynamic config
* Add putMapping to ESclient
* Add Admin API for whitelist search attributes
* Add admin CLI
  • Loading branch information
vancexu authored Jul 17, 2019
1 parent 4f35963 commit 0f0dbc1
Show file tree
Hide file tree
Showing 41 changed files with 1,889 additions and 327 deletions.
1,128 changes: 944 additions & 184 deletions .gen/go/admin/admin.go

Large diffs are not rendered by default.

29 changes: 29 additions & 0 deletions .gen/go/admin/adminserviceclient/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 36 additions & 1 deletion .gen/go/admin/adminserviceserver/server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 31 additions & 0 deletions .gen/go/admin/adminservicetest/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions client/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,22 @@ func NewClient(
}
}

func (c *clientImpl) AddSearchAttribute(
ctx context.Context,
request *admin.AddSearchAttributeRequest,
opts ...yarpc.CallOption,
) error {

opts = common.AggregateYarpcOptions(ctx, opts...)
client, err := c.getRandomClient()
if err != nil {
return err
}
ctx, cancel := c.createContext(ctx)
defer cancel()
return client.AddSearchAttribute(ctx, request, opts...)
}

func (c *clientImpl) DescribeHistoryHost(
ctx context.Context,
request *shared.DescribeHistoryHostRequest,
Expand Down
19 changes: 18 additions & 1 deletion client/admin/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package admin

import (
"context"

"github.com/uber/cadence/.gen/go/admin"
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/metrics"
Expand All @@ -44,6 +43,24 @@ func NewMetricClient(client Client, metricsClient metrics.Client) Client {
}
}

func (c *metricClient) AddSearchAttribute(
ctx context.Context,
request *admin.AddSearchAttributeRequest,
opts ...yarpc.CallOption,
) error {

c.metricsClient.IncCounter(metrics.AdminClientAddSearchAttributeScope, metrics.CadenceClientRequests)

sw := c.metricsClient.StartTimer(metrics.AdminClientAddSearchAttributeScope, metrics.CadenceClientLatency)
err := c.client.AddSearchAttribute(ctx, request, opts...)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.AdminClientAddSearchAttributeScope, metrics.CadenceClientFailures)
}
return err
}

func (c *metricClient) DescribeHistoryHost(
ctx context.Context,
request *shared.DescribeHistoryHostRequest,
Expand Down
12 changes: 12 additions & 0 deletions client/admin/retryableClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ func NewRetryableClient(client Client, policy backoff.RetryPolicy, isRetryable b
}
}

func (c *retryableClient) AddSearchAttribute(
ctx context.Context,
request *admin.AddSearchAttributeRequest,
opts ...yarpc.CallOption,
) error {

op := func() error {
return c.client.AddSearchAttribute(ctx, request, opts...)
}
return backoff.Retry(op, c.policy, c.isRetryable)
}

func (c *retryableClient) DescribeHistoryHost(
ctx context.Context,
request *shared.DescribeHistoryHostRequest,
Expand Down
30 changes: 30 additions & 0 deletions common/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type (
ScrollFirstPage(ctx context.Context, index, query string) (*elastic.SearchResult, ScrollService, error)
Count(ctx context.Context, index, query string) (int64, error)
RunBulkProcessor(ctx context.Context, p *BulkProcessorParameters) (*elastic.BulkProcessor, error)
PutMapping(ctx context.Context, index, root, key, valueType string) error
}

// ScrollService is a interface for elastic.ScrollService
Expand Down Expand Up @@ -150,6 +151,35 @@ func (c *elasticWrapper) RunBulkProcessor(ctx context.Context, p *BulkProcessorP
Do(ctx)
}

// root is for nested object like Attr property for search attributes.
func (c *elasticWrapper) PutMapping(ctx context.Context, index, root, key, valueType string) error {
body := buildPutMappingBody(root, key, valueType)
_, err := c.client.PutMapping().Index(index).Type("_doc").BodyJson(body).Do(ctx)
return err
}

func buildPutMappingBody(root, key, valueType string) map[string]interface{} {
body := make(map[string]interface{})
if len(root) != 0 {
body["properties"] = map[string]interface{}{
root: map[string]interface{}{
"properties": map[string]interface{}{
key: map[string]interface{}{
"type": valueType,
},
},
},
}
} else {
body["properties"] = map[string]interface{}{
key: map[string]interface{}{
"type": valueType,
},
}
}
return body
}

func (s *scrollServiceImpl) Clear(ctx context.Context) error {
return s.scrollService.Clear(ctx)
}
49 changes: 49 additions & 0 deletions common/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package elasticsearch

import (
"fmt"
"github.com/stretchr/testify/require"
"testing"
)

func Test_BuildPutMappingBody(t *testing.T) {
tests := []struct {
root string
expected string
}{
{
root: "Attr",
expected: "map[properties:map[Attr:map[properties:map[testKey:map[type:text]]]]]",
},
{
root: "",
expected: "map[properties:map[testKey:map[type:text]]]",
},
}
k := "testKey"
v := "text"

for _, test := range tests {
require.Equal(t, test.expected, fmt.Sprintf("%v", buildPutMappingBody(test.root, k, v)))
}
}
6 changes: 6 additions & 0 deletions common/elasticsearch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package elasticsearch

import (
"github.com/uber/cadence/common"
"net/url"
)

Expand All @@ -32,3 +33,8 @@ type (
Indices map[string]string `yaml:indices`
}
)

// GetVisibilityIndex return visibility index name
func (cfg *Config) GetVisibilityIndex() string {
return cfg.Indices[common.VisibilityAppName]
}
14 changes: 14 additions & 0 deletions common/elasticsearch/mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package tag

import (
"fmt"
"time"
)

Expand Down Expand Up @@ -335,12 +336,17 @@ func Name(k string) Tag {
return newStringTag("name", k)
}

// Value returns tag for Key
// Value returns tag for Value
func Value(v interface{}) Tag {
return newObjectTag("value", v)
}

// DefaultValue returns tag for Key
// ValueType returns tag for ValueType
func ValueType(v interface{}) Tag {
return newStringTag("value-type", fmt.Sprintf("%T", v))
}

// DefaultValue returns tag for DefaultValue
func DefaultValue(v interface{}) Tag {
return newObjectTag("default-value", v)
}
Expand Down
3 changes: 3 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ const (
FrontendClientTerminateWorkflowExecutionScope
// FrontendClientUpdateDomainScope tracks RPC calls to frontend service
FrontendClientUpdateDomainScope
// AdminClientAddSearchAttributeScope tracks RPC calls to admin service
AdminClientAddSearchAttributeScope
// AdminClientDescribeHistoryHostScope tracks RPC calls to admin service
AdminClientDescribeHistoryHostScope
// AdminClientDescribeWorkflowExecutionScope tracks RPC calls to admin service
Expand Down Expand Up @@ -1001,6 +1003,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
FrontendClientStartWorkflowExecutionScope: {operation: "FrontendClientStartWorkflowExecution", tags: map[string]string{CadenceRoleTagName: FrontendRoleTagValue}},
FrontendClientTerminateWorkflowExecutionScope: {operation: "FrontendClientTerminateWorkflowExecution", tags: map[string]string{CadenceRoleTagName: FrontendRoleTagValue}},
FrontendClientUpdateDomainScope: {operation: "FrontendClientUpdateDomain", tags: map[string]string{CadenceRoleTagName: FrontendRoleTagValue}},
AdminClientAddSearchAttributeScope: {operation: "AdminClientAddSearchAttribute", tags: map[string]string{CadenceRoleTagName: AdminRoleTagValue}},
AdminClientDescribeHistoryHostScope: {operation: "AdminClientDescribeHistoryHost", tags: map[string]string{CadenceRoleTagName: AdminRoleTagValue}},
AdminClientDescribeWorkflowExecutionScope: {operation: "AdminClientDescribeWorkflowExecution", tags: map[string]string{CadenceRoleTagName: AdminRoleTagValue}},
AdminClientGetWorkflowExecutionRawHistoryScope: {operation: "AdminClientGetWorkflowExecutionRawHistory", tags: map[string]string{CadenceRoleTagName: AdminRoleTagValue}},
Expand Down
Loading

0 comments on commit 0f0dbc1

Please sign in to comment.