diff --git a/common/log/tag/tags.go b/common/log/tag/tags.go index 71b04217904..39e7f53cbbb 100644 --- a/common/log/tag/tags.go +++ b/common/log/tag/tags.go @@ -51,6 +51,14 @@ func Timestamp(timestamp time.Time) Tag { return newTimeTag("timestamp", timestamp) } +func EarliestTime(time int64) Tag { + return newInt64("earliest-time", time) +} + +func LatestTime(time int64) Tag { + return newInt64("latest-time", time) +} + /////////////////// Workflow tags defined here: ( wf is short for workflow) /////////////////// // WorkflowAction returns tag for WorkflowAction diff --git a/common/persistence/pinotVisibilityTripleManager.go b/common/persistence/pinotVisibilityTripleManager.go index 749cf89ebce..d15c153f56e 100644 --- a/common/persistence/pinotVisibilityTripleManager.go +++ b/common/persistence/pinotVisibilityTripleManager.go @@ -25,6 +25,7 @@ import ( "fmt" "math/rand" "strings" + "time" "github.com/uber/cadence/common" "github.com/uber/cadence/common/dynamicconfig" @@ -322,6 +323,8 @@ type userParameters struct { workflowID string closeStatus int // if it is -1, then will have --open flag in comparator workflow customQuery string + earliestTime int64 + latestTime int64 } // For Pinot Migration uses. It will be a temporary usage @@ -342,7 +345,10 @@ func (v *pinotVisibilityTripleManager) logUserQueryParameters(userParam userPara tag.WorkflowType(userParam.workflowType), tag.WorkflowID(userParam.workflowID), tag.WorkflowCloseStatus(userParam.closeStatus), - tag.VisibilityQuery(filterAttrPrefix(userParam.customQuery))) + tag.VisibilityQuery(filterAttrPrefix(userParam.customQuery)), + tag.EarliestTime(userParam.earliestTime), + tag.LatestTime(userParam.latestTime)) + } // This is for only logUserQueryParameters (for Pinot Response comparator) usage. @@ -357,9 +363,11 @@ func (v *pinotVisibilityTripleManager) ListOpenWorkflowExecutions( request *ListWorkflowExecutionsRequest, ) (*ListWorkflowExecutionsResponse, error) { v.logUserQueryParameters(userParameters{ - operation: string(Operation.LIST), - domainName: request.Domain, - closeStatus: -1, // is open. Will have --open flag in comparator workflow + operation: string(Operation.LIST), + domainName: request.Domain, + closeStatus: -1, // is open. Will have --open flag in comparator workflow + earliestTime: request.EarliestTime, + latestTime: request.LatestTime, }, request.Domain) manager := v.chooseVisibilityManagerForRead(ctx, request.Domain) @@ -371,9 +379,11 @@ func (v *pinotVisibilityTripleManager) ListClosedWorkflowExecutions( request *ListWorkflowExecutionsRequest, ) (*ListWorkflowExecutionsResponse, error) { v.logUserQueryParameters(userParameters{ - operation: string(Operation.LIST), - domainName: request.Domain, - closeStatus: 6, // 6 means not set closeStatus. + operation: string(Operation.LIST), + domainName: request.Domain, + closeStatus: 6, // 6 means not set closeStatus. + earliestTime: request.EarliestTime, + latestTime: request.LatestTime, }, request.Domain) manager := v.chooseVisibilityManagerForRead(ctx, request.Domain) return manager.ListClosedWorkflowExecutions(ctx, request) @@ -388,6 +398,8 @@ func (v *pinotVisibilityTripleManager) ListOpenWorkflowExecutionsByType( domainName: request.Domain, workflowType: request.WorkflowTypeName, closeStatus: -1, // is open. Will have --open flag in comparator workflow + earliestTime: request.EarliestTime, + latestTime: request.LatestTime, }, request.Domain) manager := v.chooseVisibilityManagerForRead(ctx, request.Domain) return manager.ListOpenWorkflowExecutionsByType(ctx, request) @@ -402,6 +414,8 @@ func (v *pinotVisibilityTripleManager) ListClosedWorkflowExecutionsByType( domainName: request.Domain, workflowType: request.WorkflowTypeName, closeStatus: 6, // 6 means not set closeStatus. + earliestTime: request.EarliestTime, + latestTime: request.LatestTime, }, request.Domain) manager := v.chooseVisibilityManagerForRead(ctx, request.Domain) return manager.ListClosedWorkflowExecutionsByType(ctx, request) @@ -412,10 +426,12 @@ func (v *pinotVisibilityTripleManager) ListOpenWorkflowExecutionsByWorkflowID( request *ListWorkflowExecutionsByWorkflowIDRequest, ) (*ListWorkflowExecutionsResponse, error) { v.logUserQueryParameters(userParameters{ - operation: string(Operation.LIST), - domainName: request.Domain, - workflowID: request.WorkflowID, - closeStatus: -1, + operation: string(Operation.LIST), + domainName: request.Domain, + workflowID: request.WorkflowID, + closeStatus: -1, + earliestTime: request.EarliestTime, + latestTime: request.LatestTime, }, request.Domain) manager := v.chooseVisibilityManagerForRead(ctx, request.Domain) return manager.ListOpenWorkflowExecutionsByWorkflowID(ctx, request) @@ -426,10 +442,12 @@ func (v *pinotVisibilityTripleManager) ListClosedWorkflowExecutionsByWorkflowID( request *ListWorkflowExecutionsByWorkflowIDRequest, ) (*ListWorkflowExecutionsResponse, error) { v.logUserQueryParameters(userParameters{ - operation: string(Operation.LIST), - domainName: request.Domain, - workflowID: request.WorkflowID, - closeStatus: 6, // 6 means not set closeStatus. + operation: string(Operation.LIST), + domainName: request.Domain, + workflowID: request.WorkflowID, + closeStatus: 6, // 6 means not set closeStatus. + earliestTime: request.EarliestTime, + latestTime: request.LatestTime, }, request.Domain) manager := v.chooseVisibilityManagerForRead(ctx, request.Domain) return manager.ListClosedWorkflowExecutionsByWorkflowID(ctx, request) @@ -440,9 +458,11 @@ func (v *pinotVisibilityTripleManager) ListClosedWorkflowExecutionsByStatus( request *ListClosedWorkflowExecutionsByStatusRequest, ) (*ListWorkflowExecutionsResponse, error) { v.logUserQueryParameters(userParameters{ - operation: string(Operation.LIST), - domainName: request.Domain, - closeStatus: int(request.Status), + operation: string(Operation.LIST), + domainName: request.Domain, + closeStatus: int(request.Status), + earliestTime: request.EarliestTime, + latestTime: request.LatestTime, }, request.Domain) manager := v.chooseVisibilityManagerForRead(ctx, request.Domain) return manager.ListClosedWorkflowExecutionsByStatus(ctx, request) @@ -452,10 +472,15 @@ func (v *pinotVisibilityTripleManager) GetClosedWorkflowExecution( ctx context.Context, request *GetClosedWorkflowExecutionRequest, ) (*GetClosedWorkflowExecutionResponse, error) { + earlistTime := int64(0) // this is to get all closed workflow execution + latestTime := time.Now().UnixNano() + v.logUserQueryParameters(userParameters{ - operation: string(Operation.LIST), - domainName: request.Domain, - closeStatus: 6, // 6 means not set closeStatus. + operation: string(Operation.LIST), + domainName: request.Domain, + closeStatus: 6, // 6 means not set closeStatus. + earliestTime: earlistTime, + latestTime: latestTime, }, request.Domain) manager := v.chooseVisibilityManagerForRead(ctx, request.Domain) return manager.GetClosedWorkflowExecution(ctx, request) @@ -466,10 +491,12 @@ func (v *pinotVisibilityTripleManager) ListWorkflowExecutions( request *ListWorkflowExecutionsByQueryRequest, ) (*ListWorkflowExecutionsResponse, error) { v.logUserQueryParameters(userParameters{ - operation: string(Operation.LIST), - domainName: request.Domain, - closeStatus: 6, // 6 means not set closeStatus. - customQuery: request.Query, + operation: string(Operation.LIST), + domainName: request.Domain, + closeStatus: 6, // 6 means not set closeStatus. + customQuery: request.Query, + earliestTime: -1, + latestTime: -1, }, request.Domain) manager := v.chooseVisibilityManagerForRead(ctx, request.Domain) return manager.ListWorkflowExecutions(ctx, request) @@ -480,10 +507,12 @@ func (v *pinotVisibilityTripleManager) ScanWorkflowExecutions( request *ListWorkflowExecutionsByQueryRequest, ) (*ListWorkflowExecutionsResponse, error) { v.logUserQueryParameters(userParameters{ - operation: string(Operation.LIST), - domainName: request.Domain, - closeStatus: 6, // 6 means not set closeStatus. - customQuery: request.Query, + operation: string(Operation.LIST), + domainName: request.Domain, + closeStatus: 6, // 6 means not set closeStatus. + customQuery: request.Query, + earliestTime: -1, + latestTime: -1, }, request.Domain) manager := v.chooseVisibilityManagerForRead(ctx, request.Domain) return manager.ScanWorkflowExecutions(ctx, request) @@ -494,10 +523,12 @@ func (v *pinotVisibilityTripleManager) CountWorkflowExecutions( request *CountWorkflowExecutionsRequest, ) (*CountWorkflowExecutionsResponse, error) { v.logUserQueryParameters(userParameters{ - operation: string(Operation.COUNT), - domainName: request.Domain, - closeStatus: 6, // 6 means not set closeStatus. - customQuery: request.Query, + operation: string(Operation.COUNT), + domainName: request.Domain, + closeStatus: 6, // 6 means not set closeStatus. + customQuery: request.Query, + earliestTime: -1, + latestTime: -1, }, request.Domain) manager := v.chooseVisibilityManagerForRead(ctx, request.Domain) return manager.CountWorkflowExecutions(ctx, request)