Skip to content

Commit

Permalink
Added 2 more tags in log for comparator to use. (cadence-workflow#5646)
Browse files Browse the repository at this point in the history
* add 2 more things in log for comparator to use

* fix typo

* change time variables to be value instead of pointers

* log msg change
  • Loading branch information
bowenxia authored Feb 7, 2024
1 parent 5adaa1f commit a7d504a
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 33 deletions.
8 changes: 8 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ func Timestamp(timestamp time.Time) Tag {
return newTimeTag("timestamp", timestamp)
}

func EarliestTime(time int64) Tag {
return newInt64("earliest-time", time)
}

func LatestTime(time int64) Tag {
return newInt64("latest-time", time)
}

/////////////////// Workflow tags defined here: ( wf is short for workflow) ///////////////////

// WorkflowAction returns tag for WorkflowAction
Expand Down
97 changes: 64 additions & 33 deletions common/persistence/pinotVisibilityTripleManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"fmt"
"math/rand"
"strings"
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/dynamicconfig"
Expand Down Expand Up @@ -322,6 +323,8 @@ type userParameters struct {
workflowID string
closeStatus int // if it is -1, then will have --open flag in comparator workflow
customQuery string
earliestTime int64
latestTime int64
}

// For Pinot Migration uses. It will be a temporary usage
Expand All @@ -342,7 +345,10 @@ func (v *pinotVisibilityTripleManager) logUserQueryParameters(userParam userPara
tag.WorkflowType(userParam.workflowType),
tag.WorkflowID(userParam.workflowID),
tag.WorkflowCloseStatus(userParam.closeStatus),
tag.VisibilityQuery(filterAttrPrefix(userParam.customQuery)))
tag.VisibilityQuery(filterAttrPrefix(userParam.customQuery)),
tag.EarliestTime(userParam.earliestTime),
tag.LatestTime(userParam.latestTime))

}

// This is for only logUserQueryParameters (for Pinot Response comparator) usage.
Expand All @@ -357,9 +363,11 @@ func (v *pinotVisibilityTripleManager) ListOpenWorkflowExecutions(
request *ListWorkflowExecutionsRequest,
) (*ListWorkflowExecutionsResponse, error) {
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: -1, // is open. Will have --open flag in comparator workflow
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: -1, // is open. Will have --open flag in comparator workflow
earliestTime: request.EarliestTime,
latestTime: request.LatestTime,
}, request.Domain)

manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
Expand All @@ -371,9 +379,11 @@ func (v *pinotVisibilityTripleManager) ListClosedWorkflowExecutions(
request *ListWorkflowExecutionsRequest,
) (*ListWorkflowExecutionsResponse, error) {
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: 6, // 6 means not set closeStatus.
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: 6, // 6 means not set closeStatus.
earliestTime: request.EarliestTime,
latestTime: request.LatestTime,
}, request.Domain)
manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListClosedWorkflowExecutions(ctx, request)
Expand All @@ -388,6 +398,8 @@ func (v *pinotVisibilityTripleManager) ListOpenWorkflowExecutionsByType(
domainName: request.Domain,
workflowType: request.WorkflowTypeName,
closeStatus: -1, // is open. Will have --open flag in comparator workflow
earliestTime: request.EarliestTime,
latestTime: request.LatestTime,
}, request.Domain)
manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListOpenWorkflowExecutionsByType(ctx, request)
Expand All @@ -402,6 +414,8 @@ func (v *pinotVisibilityTripleManager) ListClosedWorkflowExecutionsByType(
domainName: request.Domain,
workflowType: request.WorkflowTypeName,
closeStatus: 6, // 6 means not set closeStatus.
earliestTime: request.EarliestTime,
latestTime: request.LatestTime,
}, request.Domain)
manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListClosedWorkflowExecutionsByType(ctx, request)
Expand All @@ -412,10 +426,12 @@ func (v *pinotVisibilityTripleManager) ListOpenWorkflowExecutionsByWorkflowID(
request *ListWorkflowExecutionsByWorkflowIDRequest,
) (*ListWorkflowExecutionsResponse, error) {
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
workflowID: request.WorkflowID,
closeStatus: -1,
operation: string(Operation.LIST),
domainName: request.Domain,
workflowID: request.WorkflowID,
closeStatus: -1,
earliestTime: request.EarliestTime,
latestTime: request.LatestTime,
}, request.Domain)
manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListOpenWorkflowExecutionsByWorkflowID(ctx, request)
Expand All @@ -426,10 +442,12 @@ func (v *pinotVisibilityTripleManager) ListClosedWorkflowExecutionsByWorkflowID(
request *ListWorkflowExecutionsByWorkflowIDRequest,
) (*ListWorkflowExecutionsResponse, error) {
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
workflowID: request.WorkflowID,
closeStatus: 6, // 6 means not set closeStatus.
operation: string(Operation.LIST),
domainName: request.Domain,
workflowID: request.WorkflowID,
closeStatus: 6, // 6 means not set closeStatus.
earliestTime: request.EarliestTime,
latestTime: request.LatestTime,
}, request.Domain)
manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListClosedWorkflowExecutionsByWorkflowID(ctx, request)
Expand All @@ -440,9 +458,11 @@ func (v *pinotVisibilityTripleManager) ListClosedWorkflowExecutionsByStatus(
request *ListClosedWorkflowExecutionsByStatusRequest,
) (*ListWorkflowExecutionsResponse, error) {
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: int(request.Status),
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: int(request.Status),
earliestTime: request.EarliestTime,
latestTime: request.LatestTime,
}, request.Domain)
manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListClosedWorkflowExecutionsByStatus(ctx, request)
Expand All @@ -452,10 +472,15 @@ func (v *pinotVisibilityTripleManager) GetClosedWorkflowExecution(
ctx context.Context,
request *GetClosedWorkflowExecutionRequest,
) (*GetClosedWorkflowExecutionResponse, error) {
earlistTime := int64(0) // this is to get all closed workflow execution
latestTime := time.Now().UnixNano()

v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: 6, // 6 means not set closeStatus.
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: 6, // 6 means not set closeStatus.
earliestTime: earlistTime,
latestTime: latestTime,
}, request.Domain)
manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.GetClosedWorkflowExecution(ctx, request)
Expand All @@ -466,10 +491,12 @@ func (v *pinotVisibilityTripleManager) ListWorkflowExecutions(
request *ListWorkflowExecutionsByQueryRequest,
) (*ListWorkflowExecutionsResponse, error) {
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: 6, // 6 means not set closeStatus.
customQuery: request.Query,
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: 6, // 6 means not set closeStatus.
customQuery: request.Query,
earliestTime: -1,
latestTime: -1,
}, request.Domain)
manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ListWorkflowExecutions(ctx, request)
Expand All @@ -480,10 +507,12 @@ func (v *pinotVisibilityTripleManager) ScanWorkflowExecutions(
request *ListWorkflowExecutionsByQueryRequest,
) (*ListWorkflowExecutionsResponse, error) {
v.logUserQueryParameters(userParameters{
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: 6, // 6 means not set closeStatus.
customQuery: request.Query,
operation: string(Operation.LIST),
domainName: request.Domain,
closeStatus: 6, // 6 means not set closeStatus.
customQuery: request.Query,
earliestTime: -1,
latestTime: -1,
}, request.Domain)
manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.ScanWorkflowExecutions(ctx, request)
Expand All @@ -494,10 +523,12 @@ func (v *pinotVisibilityTripleManager) CountWorkflowExecutions(
request *CountWorkflowExecutionsRequest,
) (*CountWorkflowExecutionsResponse, error) {
v.logUserQueryParameters(userParameters{
operation: string(Operation.COUNT),
domainName: request.Domain,
closeStatus: 6, // 6 means not set closeStatus.
customQuery: request.Query,
operation: string(Operation.COUNT),
domainName: request.Domain,
closeStatus: 6, // 6 means not set closeStatus.
customQuery: request.Query,
earliestTime: -1,
latestTime: -1,
}, request.Domain)
manager := v.chooseVisibilityManagerForRead(ctx, request.Domain)
return manager.CountWorkflowExecutions(ctx, request)
Expand Down

0 comments on commit a7d504a

Please sign in to comment.