Skip to content

Commit

Permalink
Add log user query param (cadence-workflow#5437)
Browse files Browse the repository at this point in the history
* add log for uesr queries

* add a value for override visibility manager

* add a dash to close status tag

* delete unrelated comment

* add a flipr attribute to enable log customer query parameters

* fix an error for the dynamic config

* add a comment

* config the dynamic config files

* code change according to review

* add extra logic to override variable

* add a comment for queryFilter

* change variable name

* remove queryFilter and will put it into the comparator workflow

* clean up

* typo

* change logic for how to determine an open workflow

* delete an unused const

* set general case closeStatus to be 6

* Dummy commit to trigger build pipeline

---------

Co-authored-by: Neil Xie <[email protected]>
Co-authored-by: neil-xie <[email protected]>
Co-authored-by: David Porter <[email protected]>
Co-authored-by: Shijie Sheng <[email protected]>
  • Loading branch information
5 people authored Nov 15, 2023
1 parent 10ae6c4 commit bf90439
Show file tree
Hide file tree
Showing 9 changed files with 222 additions and 24 deletions.
12 changes: 12 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1444,6 +1444,11 @@ const (
// Default value: true
// Allowed filters: DomainName
EnableReadVisibilityFromPinot
// EnableLogCustomerQueryParameter is key for enable log customer query parameters
// KeyName: system.enableLogCustomerQueryParameter
// Value type: Bool
// Default value: false
EnableLogCustomerQueryParameter
// EmitShardDiffLog is whether emit the shard diff log
// KeyName: history.emitShardDiffLog
// Value type: Bool
Expand Down Expand Up @@ -3683,9 +3688,16 @@ var BoolKeys = map[BoolKey]DynamicBool{
},
EnableReadVisibilityFromPinot: DynamicBool{
KeyName: "system.enableReadVisibilityFromPinot",
Filters: []Filter{DomainName},
Description: "EnableReadVisibilityFromPinot is key for enable read from pinot or db visibility, usually using with AdvancedVisibilityWritingMode for seamless migration from db visibility to advanced visibility",
DefaultValue: true,
},
EnableLogCustomerQueryParameter: DynamicBool{
KeyName: "system.enableLogCustomerQueryParameter",
Filters: []Filter{DomainName},
Description: "EnableLogCustomerQueryParameter is key for enable log customer query parameters",
DefaultValue: false,
},
EmitShardDiffLog: DynamicBool{
KeyName: "history.emitShardDiffLog",
Description: "EmitShardDiffLog is whether emit the shard diff log",
Expand Down
10 changes: 10 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,16 @@ func WorkflowCronSchedule(schedule string) Tag {
return newStringTag("wf-cron-schedule", schedule)
}

// WorkflowCloseStatus returns a tag to report a workflow's close status
func WorkflowCloseStatus(status int) Tag {
return newInt("close-status", status)
}

// IsWorkflowOpen returns a tag to report a workflow is open or not
func IsWorkflowOpen(isOpen bool) Tag {
return newBoolTag("is-workflow-open", isOpen)
}

// domain related

// WorkflowDomainID returns tag for WorkflowDomainID
Expand Down
1 change: 1 addition & 0 deletions common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func (f *factoryImpl) NewVisibilityManager(
resourceConfig.EnableReadVisibilityFromPinot,
resourceConfig.EnableReadVisibilityFromES,
resourceConfig.AdvancedVisibilityWritingMode,
resourceConfig.EnableLogCustomerQueryParameter,
f.logger,
), nil
} else if params.PersistenceConfig.AdvancedVisibilityStore != "" {
Expand Down
196 changes: 182 additions & 14 deletions common/persistence/pinotVisibilityTripleManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package persistence
import (
"context"
"fmt"
"math/rand"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/dynamicconfig"
Expand All @@ -33,16 +34,32 @@ import (

type (
pinotVisibilityTripleManager struct {
logger log.Logger
dbVisibilityManager VisibilityManager
pinotVisibilityManager VisibilityManager
esVisibilityManager VisibilityManager
readModeIsFromPinot dynamicconfig.BoolPropertyFnWithDomainFilter
readModeIsFromES dynamicconfig.BoolPropertyFnWithDomainFilter
writeMode dynamicconfig.StringPropertyFn
logger log.Logger
dbVisibilityManager VisibilityManager
pinotVisibilityManager VisibilityManager
esVisibilityManager VisibilityManager
readModeIsFromPinot dynamicconfig.BoolPropertyFnWithDomainFilter
readModeIsFromES dynamicconfig.BoolPropertyFnWithDomainFilter
writeMode dynamicconfig.StringPropertyFn
logCustomerQueryParameter dynamicconfig.BoolPropertyFnWithDomainFilter
}
)

const (
Primary = "Primary"
Secondary = "Secondary"
)

type OperationType string

var Operation = struct {
LIST OperationType
COUNT OperationType
}{
LIST: "list",
COUNT: "count",
}

var _ VisibilityManager = (*pinotVisibilityTripleManager)(nil)

// NewPinotVisibilityTripleManager create a visibility manager that operate on DB or Pinot based on dynamic config.
Expand All @@ -53,20 +70,22 @@ func NewPinotVisibilityTripleManager(
readModeIsFromPinot dynamicconfig.BoolPropertyFnWithDomainFilter,
readModeIsFromES dynamicconfig.BoolPropertyFnWithDomainFilter,
visWritingMode dynamicconfig.StringPropertyFn,
logCustomerQueryParameter dynamicconfig.BoolPropertyFnWithDomainFilter,
logger log.Logger,
) VisibilityManager {
if dbVisibilityManager == nil && pinotVisibilityManager == nil && esVisibilityManager == nil {
logger.Fatal("require one of dbVisibilityManager or pinotVisibilityManager or esVisibilityManager")
return nil
}
return &pinotVisibilityTripleManager{
dbVisibilityManager: dbVisibilityManager,
pinotVisibilityManager: pinotVisibilityManager,
esVisibilityManager: esVisibilityManager,
readModeIsFromPinot: readModeIsFromPinot,
readModeIsFromES: readModeIsFromES,
writeMode: visWritingMode,
logger: logger,
dbVisibilityManager: dbVisibilityManager,
pinotVisibilityManager: pinotVisibilityManager,
esVisibilityManager: esVisibilityManager,
readModeIsFromPinot: readModeIsFromPinot,
readModeIsFromES: readModeIsFromES,
writeMode: visWritingMode,
logger: logger,
logCustomerQueryParameter: logCustomerQueryParameter,
}
}

Expand Down Expand Up @@ -290,91 +309,240 @@ func (v *pinotVisibilityTripleManager) chooseVisibilityManagerForWrite(ctx conte
}
}

// For Pinot Migration uses. It will be a temporary usage
type userParameters struct {
operation string
domainName string
workflowType string
workflowID string
closeStatus int // if it is -1, then will have --open flag in comparator workflow
customQuery string
}

// For Pinot Migration uses. It will be a temporary usage
// logUserQueryParameters will log user queries' parameters so that a comparator workflow can consume
func (v *pinotVisibilityTripleManager) logUserQueryParameters(userParam userParameters, domain string) {
// Don't log if it is not enabled
if !v.logCustomerQueryParameter(domain) {
return
}
randNum := rand.Intn(10)
if randNum != 5 { // Intentionally to have 1/10 chance to log custom query parameters
return
}

v.logger.Info("Logging user query parameters for Pinot/ES response comparator...",
tag.OperationName(userParam.operation),
tag.WorkflowDomainName(userParam.domainName),
tag.WorkflowType(userParam.workflowType),
tag.WorkflowID(userParam.workflowID),
tag.WorkflowCloseStatus(userParam.closeStatus),
tag.VisibilityQuery(userParam.customQuery))
}

func (v *pinotVisibilityTripleManager) ListOpenWorkflowExecutions(
ctx context.Context,
request *ListWorkflowExecutionsRequest,
) (*ListWorkflowExecutionsResponse, error) {
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: -1, // is open. Will have --open flag in comparator workflow
}, request.Domain)

manager := v.chooseVisibilityManagerForRead(request.Domain)
if override := ctx.Value("visibility-override"); override == Primary {
manager = v.esVisibilityManager
} else if override == Secondary {
manager = v.pinotVisibilityManager
}
return manager.ListOpenWorkflowExecutions(ctx, request)
}

func (v *pinotVisibilityTripleManager) ListClosedWorkflowExecutions(
ctx context.Context,
request *ListWorkflowExecutionsRequest,
) (*ListWorkflowExecutionsResponse, error) {
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: 6, // 6 means not set closeStatus.
}, request.Domain)
manager := v.chooseVisibilityManagerForRead(request.Domain)
if override := ctx.Value("visibility-override"); override == Primary {
manager = v.esVisibilityManager
} else if override == Secondary {
manager = v.pinotVisibilityManager
}
return manager.ListClosedWorkflowExecutions(ctx, request)
}

func (v *pinotVisibilityTripleManager) ListOpenWorkflowExecutionsByType(
ctx context.Context,
request *ListWorkflowExecutionsByTypeRequest,
) (*ListWorkflowExecutionsResponse, error) {
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
workflowType: request.WorkflowTypeName,
closeStatus: -1, // is open. Will have --open flag in comparator workflow
}, request.Domain)
manager := v.chooseVisibilityManagerForRead(request.Domain)
if override := ctx.Value("visibility-override"); override == Primary {
manager = v.esVisibilityManager
} else if override == Secondary {
manager = v.pinotVisibilityManager
}
return manager.ListOpenWorkflowExecutionsByType(ctx, request)
}

func (v *pinotVisibilityTripleManager) ListClosedWorkflowExecutionsByType(
ctx context.Context,
request *ListWorkflowExecutionsByTypeRequest,
) (*ListWorkflowExecutionsResponse, error) {
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
workflowType: request.WorkflowTypeName,
closeStatus: 6, // 6 means not set closeStatus.
}, request.Domain)
manager := v.chooseVisibilityManagerForRead(request.Domain)
if override := ctx.Value("visibility-override"); override == Primary {
manager = v.esVisibilityManager
} else if override == Secondary {
manager = v.pinotVisibilityManager
}
return manager.ListClosedWorkflowExecutionsByType(ctx, request)
}

func (v *pinotVisibilityTripleManager) ListOpenWorkflowExecutionsByWorkflowID(
ctx context.Context,
request *ListWorkflowExecutionsByWorkflowIDRequest,
) (*ListWorkflowExecutionsResponse, error) {
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
workflowID: request.WorkflowID,
closeStatus: -1,
}, request.Domain)
manager := v.chooseVisibilityManagerForRead(request.Domain)
if override := ctx.Value("visibility-override"); override == Primary {
manager = v.esVisibilityManager
} else if override == Secondary {
manager = v.pinotVisibilityManager
}
return manager.ListOpenWorkflowExecutionsByWorkflowID(ctx, request)
}

func (v *pinotVisibilityTripleManager) ListClosedWorkflowExecutionsByWorkflowID(
ctx context.Context,
request *ListWorkflowExecutionsByWorkflowIDRequest,
) (*ListWorkflowExecutionsResponse, error) {
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
workflowID: request.WorkflowID,
closeStatus: 6, // 6 means not set closeStatus.
}, request.Domain)
manager := v.chooseVisibilityManagerForRead(request.Domain)
if override := ctx.Value("visibility-override"); override == Primary {
manager = v.esVisibilityManager
} else if override == Secondary {
manager = v.pinotVisibilityManager
}
return manager.ListClosedWorkflowExecutionsByWorkflowID(ctx, request)
}

func (v *pinotVisibilityTripleManager) ListClosedWorkflowExecutionsByStatus(
ctx context.Context,
request *ListClosedWorkflowExecutionsByStatusRequest,
) (*ListWorkflowExecutionsResponse, error) {
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: int(request.Status),
}, request.Domain)
manager := v.chooseVisibilityManagerForRead(request.Domain)
if override := ctx.Value("visibility-override"); override == Primary {
manager = v.esVisibilityManager
} else if override == Secondary {
manager = v.pinotVisibilityManager
}
return manager.ListClosedWorkflowExecutionsByStatus(ctx, request)
}

func (v *pinotVisibilityTripleManager) GetClosedWorkflowExecution(
ctx context.Context,
request *GetClosedWorkflowExecutionRequest,
) (*GetClosedWorkflowExecutionResponse, error) {
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: 6, // 6 means not set closeStatus.
}, request.Domain)
manager := v.chooseVisibilityManagerForRead(request.Domain)
if override := ctx.Value("visibility-override"); override == Primary {
manager = v.esVisibilityManager
} else if override == Secondary {
manager = v.pinotVisibilityManager
}
return manager.GetClosedWorkflowExecution(ctx, request)
}

func (v *pinotVisibilityTripleManager) ListWorkflowExecutions(
ctx context.Context,
request *ListWorkflowExecutionsByQueryRequest,
) (*ListWorkflowExecutionsResponse, error) {
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: 6, // 6 means not set closeStatus.
customQuery: request.Query,
}, request.Domain)
manager := v.chooseVisibilityManagerForRead(request.Domain)
if override := ctx.Value("visibility-override"); override == Primary {
manager = v.esVisibilityManager
} else if override == Secondary {
manager = v.pinotVisibilityManager
}
return manager.ListWorkflowExecutions(ctx, request)
}

func (v *pinotVisibilityTripleManager) ScanWorkflowExecutions(
ctx context.Context,
request *ListWorkflowExecutionsByQueryRequest,
) (*ListWorkflowExecutionsResponse, error) {
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: 6, // 6 means not set closeStatus.
customQuery: request.Query,
}, request.Domain)
manager := v.chooseVisibilityManagerForRead(request.Domain)
if override := ctx.Value("visibility-override"); override == Primary {
manager = v.esVisibilityManager
} else if override == Secondary {
manager = v.pinotVisibilityManager
}
return manager.ScanWorkflowExecutions(ctx, request)
}

func (v *pinotVisibilityTripleManager) CountWorkflowExecutions(
ctx context.Context,
request *CountWorkflowExecutionsRequest,
) (*CountWorkflowExecutionsResponse, error) {
v.logUserQueryParameters(userParameters{
operation: string(Operation.COUNT),
domainName: request.Domain,
closeStatus: 6, // 6 means not set closeStatus.
customQuery: request.Query,
}, request.Domain)
manager := v.chooseVisibilityManagerForRead(request.Domain)
if override := ctx.Value("visibility-override"); override == Primary {
manager = v.esVisibilityManager
} else if override == Secondary {
manager = v.pinotVisibilityManager
}
return manager.CountWorkflowExecutions(ctx, request)
}

Expand Down
1 change: 0 additions & 1 deletion common/pinot/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination GenericClient_mock.go -self_package github.com/uber/cadence/common/pinot

package pinot

Expand Down
2 changes: 2 additions & 0 deletions common/service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type (
AdvancedVisibilityWritingMode dynamicconfig.StringPropertyFn
// EnableReadVisibilityFromPinot is the read mode of visibility
EnableReadVisibilityFromPinot dynamicconfig.BoolPropertyFnWithDomainFilter
// EnableLogCustomerQueryParameter is to enable log customer parameters
EnableLogCustomerQueryParameter dynamicconfig.BoolPropertyFnWithDomainFilter

// configs for db visibility
EnableDBVisibilitySampling dynamicconfig.BoolPropertyFn `yaml:"-" json:"-"`
Expand Down
2 changes: 2 additions & 0 deletions config/dynamicconfig/development_pinot.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ system.enableReadVisibilityFromES:
- value: false
system.enableReadVisibilityFromPinot:
- value: true
system.enableLogCustomerQueryParameter:
-value: false
frontend.validSearchAttributes:
- value:
DomainID: 1
Expand Down
Loading

0 comments on commit bf90439

Please sign in to comment.