Skip to content

Commit

Permalink
Task latency metrics for unsync match (cadence-workflow#1904)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored May 24, 2019
1 parent de29e4a commit 47af4aa
Show file tree
Hide file tree
Showing 12 changed files with 94 additions and 22 deletions.
4 changes: 2 additions & 2 deletions .gen/go/sqlblobs/idl.go

Large diffs are not rendered by default.

56 changes: 50 additions & 6 deletions .gen/go/sqlblobs/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1311,6 +1311,7 @@ const (
SyncThrottleCounter
BufferThrottleCounter
SyncMatchLatency
AsyncMatchLatency
ExpiredTasksCounter

NumMatchingMetrics
Expand Down Expand Up @@ -1552,6 +1553,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
BufferThrottleCounter: {metricName: "buffer_throttle_count"},
ExpiredTasksCounter: {metricName: "tasks_expired"},
SyncMatchLatency: {metricName: "syncmatch_latency", metricType: Timer},
AsyncMatchLatency: {metricName: "asyncmatch_latency", metricType: Timer},
},
Worker: {
ReplicatorMessages: {metricName: "replicator_messages"},
Expand Down
10 changes: 8 additions & 2 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,8 @@ const (
`domain_id: ?, ` +
`workflow_id: ?, ` +
`run_id: ?, ` +
`schedule_id: ?` +
`schedule_id: ?,` +
`created_time: ? ` +
`}`

templateCreateShardQuery = `INSERT INTO executions (` +
Expand Down Expand Up @@ -2782,6 +2783,7 @@ func (d *cassandraPersistence) CreateTasks(request *p.CreateTasksRequest) (*p.Cr
taskListType := request.TaskListInfo.TaskType
taskListKind := request.TaskListInfo.Kind
ackLevel := request.TaskListInfo.AckLevel
cqlNowTimestamp := p.UnixNanoToDBTimestamp(time.Now().UnixNano())

for _, task := range request.Tasks {
scheduleID := task.Data.ScheduleID
Expand All @@ -2795,7 +2797,8 @@ func (d *cassandraPersistence) CreateTasks(request *p.CreateTasksRequest) (*p.Cr
domainID,
task.Execution.GetWorkflowId(),
task.Execution.GetRunId(),
scheduleID)
scheduleID,
cqlNowTimestamp)
} else {
batch.Query(templateCreateTaskWithTTLQuery,
domainID,
Expand All @@ -2807,6 +2810,7 @@ func (d *cassandraPersistence) CreateTasks(request *p.CreateTasksRequest) (*p.Cr
task.Execution.GetWorkflowId(),
task.Execution.GetRunId(),
scheduleID,
cqlNowTimestamp,
task.Data.ScheduleToStartTimeout)
}
}
Expand Down Expand Up @@ -4313,6 +4317,8 @@ func createTaskInfo(result map[string]interface{}) *p.TaskInfo {
info.RunID = v.(gocql.UUID).String()
case "schedule_id":
info.ScheduleID = v.(int64)
case "created_time":
info.CreatedTime = v.(time.Time)
}
}

Expand Down
1 change: 1 addition & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ type (
ScheduleID int64
ScheduleToStartTimeout int32
Expiry time.Time
CreatedTime time.Time
}

// Task is the generic interface for workflow tasks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func (s *MatchingPersistenceSuite) TestCreateTask() {
s.Equal(*workflowExecution.WorkflowId, resp.Tasks[0].WorkflowID)
s.Equal(*workflowExecution.RunId, resp.Tasks[0].RunID)
s.Equal(sid, resp.Tasks[0].ScheduleID)
s.True(resp.Tasks[0].CreatedTime.UnixNano() > 0)
if s.TaskMgr.GetName() != "cassandra" {
// cassandra uses TTL and expiry isn't stored as part of task state
s.True(time.Now().Before(resp.Tasks[0].Expiry))
Expand Down
22 changes: 12 additions & 10 deletions common/persistence/sql/sqlTaskManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,11 @@ func (m *sqlTaskManager) CreateTasks(request *persistence.CreateTasksRequest) (*
expiryTime = time.Now().Add(time.Second * time.Duration(v.Data.ScheduleToStartTimeout))
}
blob, err := taskInfoToBlob(&sqlblobs.TaskInfo{
WorkflowID: &v.Data.WorkflowID,
RunID: sqldb.MustParseUUID(v.Data.RunID),
ScheduleID: &v.Data.ScheduleID,
ExpiryTimeNanos: common.Int64Ptr(expiryTime.UnixNano()),
WorkflowID: &v.Data.WorkflowID,
RunID: sqldb.MustParseUUID(v.Data.RunID),
ScheduleID: &v.Data.ScheduleID,
ExpiryTimeNanos: common.Int64Ptr(expiryTime.UnixNano()),
CreatedTimeNanos: common.Int64Ptr(time.Now().UnixNano()),
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -390,12 +391,13 @@ func (m *sqlTaskManager) GetTasks(request *persistence.GetTasksRequest) (*persis
return nil, err
}
tasks[i] = &persistence.TaskInfo{
DomainID: request.DomainID,
WorkflowID: info.GetWorkflowID(),
RunID: sqldb.UUID(info.RunID).String(),
TaskID: v.TaskID,
ScheduleID: info.GetScheduleID(),
Expiry: time.Unix(0, info.GetExpiryTimeNanos()),
DomainID: request.DomainID,
WorkflowID: info.GetWorkflowID(),
RunID: sqldb.UUID(info.RunID).String(),
TaskID: v.TaskID,
ScheduleID: info.GetScheduleID(),
Expiry: time.Unix(0, info.GetExpiryTimeNanos()),
CreatedTime: time.Unix(0, info.GetCreatedTimeNanos()),
}
}

Expand Down
1 change: 1 addition & 0 deletions idl/github.com/uber/cadence/sqlblobs.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ struct TaskInfo {
12: optional binary runID
13: optional i64 (js.type = "Long") scheduleID
14: optional i64 (js.type = "Long") expiryTimeNanos
15: optional i64 (js.type = "Long") createdTimeNanos
}

struct TaskListInfo {
Expand Down
1 change: 1 addition & 0 deletions schema/cassandra/cadence/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ CREATE TYPE task (
workflow_id text,
run_id uuid,
schedule_id bigint,
created_time timestamp
);

CREATE TYPE task_list (
Expand Down
3 changes: 2 additions & 1 deletion schema/cassandra/cadence/versioned/v0.17/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"MinCompatibleVersion": "0.17",
"Description": "Added search attributes to execution",
"SchemaUpdateCqlFiles": [
"search_attr.cql"
"search_attr.cql",
"task_created_time.cql"
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TYPE task ADD created_time timestamp;
14 changes: 13 additions & 1 deletion service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"fmt"
"math"
"sync"
"time"

"github.com/pborman/uuid"
h "github.com/uber/cadence/.gen/go/history"
Expand Down Expand Up @@ -232,6 +233,7 @@ func (e *matchingEngineImpl) AddDecisionTask(addRequest *m.AddDecisionTaskReques
WorkflowID: addRequest.Execution.GetWorkflowId(),
ScheduleID: addRequest.GetScheduleId(),
ScheduleToStartTimeout: addRequest.GetScheduleToStartTimeoutSeconds(),
CreatedTime: time.Now(),
}
return tlMgr.AddTask(addRequest.Execution, taskInfo)
}
Expand All @@ -255,6 +257,7 @@ func (e *matchingEngineImpl) AddActivityTask(addRequest *m.AddActivityTaskReques
WorkflowID: addRequest.Execution.GetWorkflowId(),
ScheduleID: addRequest.GetScheduleId(),
ScheduleToStartTimeout: addRequest.GetScheduleToStartTimeoutSeconds(),
CreatedTime: time.Now(),
}
return tlMgr.AddTask(addRequest.Execution, taskInfo)
}
Expand Down Expand Up @@ -618,14 +621,18 @@ func (e *matchingEngineImpl) createPollForDecisionTaskResponse(context *taskCont
ScheduleAttempt: historyResponse.GetAttempt(),
}
token, _ = e.tokenSerializer.Serialize(taskoken)
if context.syncResponseCh == nil {
scope := domainTaggedMetricScope(e.domainCache, task.DomainID, e.metricsClient, metrics.MatchingPollForDecisionTaskScope)
scope.RecordTimer(metrics.AsyncMatchLatency, time.Since(task.CreatedTime))
scope.Tagged(metrics.DomainAllTag()).RecordTimer(metrics.AsyncMatchLatency, time.Since(task.CreatedTime))
}
}

response := common.CreateMatchingPollForDecisionTaskResponse(historyResponse, workflowExecutionPtr(context.workflowExecution), token)
if context.queryTaskInfo != nil {
response.Query = context.queryTaskInfo.queryRequest.QueryRequest.Query
}
response.BacklogCountHint = common.Int64Ptr(context.backlogCountHint)

return response
}

Expand All @@ -642,6 +649,11 @@ func (e *matchingEngineImpl) createPollForActivityTaskResponse(context *taskCont
if attributes.ActivityId == nil {
panic("ActivityTaskScheduledEventAttributes.ActivityID is not set")
}
if context.syncResponseCh == nil {
scope := domainTaggedMetricScope(e.domainCache, task.DomainID, e.metricsClient, metrics.MatchingPollForActivityTaskScope)
scope.RecordTimer(metrics.AsyncMatchLatency, time.Since(task.CreatedTime))
scope.Tagged(metrics.DomainAllTag()).RecordTimer(metrics.AsyncMatchLatency, time.Since(task.CreatedTime))
}

response := &workflow.PollForActivityTaskResponse{}
response.ActivityId = attributes.ActivityId
Expand Down

0 comments on commit 47af4aa

Please sign in to comment.