Add metrics for cross cluster implementation (cadence-workflow#4527)
- Add metrics for cross cluster components
- Add missing parts in cross-cluster components
- Handle shard level error when getting cross cluster tasks
yycptt authored Oct 6, 2021
1 parent c7e94f2 commit 70cf8be
Showing 21 changed files with 381 additions and 178 deletions.
1 change: 1 addition & 0 deletions cmd/server/tools.go
@@ -18,6 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:build tools
// +build tools

package tools
5 changes: 5 additions & 0 deletions common/log/tag/tags.go
@@ -485,6 +485,11 @@ func ShardTimerAcks(shardTimerAcks interface{}) Tag {
return newObjectTag("shard-timer-acks", shardTimerAcks)
}

// ShardCrossClusterAcks returns tag for ShardCrossClusterAcks
func ShardCrossClusterAcks(shardCrossClusterAcks interface{}) Tag {
return newObjectTag("shard-cross-cluster-acks", shardCrossClusterAcks)
}

// task queue processor

// QueueLevel returns tag for QueueLevel
1 change: 1 addition & 0 deletions common/log/tag/values.go
@@ -128,6 +128,7 @@ var (
ComponentFailoverMarkerNotifier = component("failover-marker-notifier")
ComponentCrossClusterQueueProcessor = component("cross-cluster-queue-processor")
ComponentCrossClusterTaskProcessor = component("cross-cluster-task-processor")
ComponentCrossClusterTaskFetcher = component("cross-cluster-task-fetcher")
ComponentShardScanner = component("shardscanner-scanner")
ComponentShardFixer = component("shardscanner-fixer")
)
295 changes: 169 additions & 126 deletions common/metrics/defs.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion common/persistence/statsComputer.go
@@ -232,7 +232,6 @@ func (sc *statsComputer) computeWorkflowSnapshotStats(req *InternalWorkflowSnaps
requestCancelInfoCount := len(req.RequestCancelInfos)

transferTasksCount := len(req.TransferTasks)
// TODO: cross-region task count
crossClusterTasksCount := len(req.CrossClusterTasks)
timerTasksCount := len(req.TimerTasks)
replicationTasksCount := len(req.ReplicationTasks)
@@ -287,6 +286,7 @@ func mergeMutableStateUpdateSessionStats(stats ...*MutableStateUpdateSessionStat
result.DeleteRequestCancelInfoCount += s.DeleteRequestCancelInfoCount

result.TransferTasksCount += s.TransferTasksCount
result.CrossClusterTaskCount += s.CrossClusterTaskCount
result.TimerInfoCount += s.TimerInfoCount
result.ReplicationTasksCount += s.ReplicationTasksCount
}
14 changes: 14 additions & 0 deletions common/util.go
@@ -982,3 +982,17 @@ func ConvertErrToGetTaskFailedCause(err error) types.GetTaskFailedCause {
}
return types.GetTaskFailedCauseUncategorized
}

// ConvertGetTaskFailedCauseToErr converts GetTaskFailedCause to error
func ConvertGetTaskFailedCauseToErr(failedCause types.GetTaskFailedCause) error {
switch failedCause {
case types.GetTaskFailedCauseServiceBusy:
return &types.ServiceBusyError{}
case types.GetTaskFailedCauseTimeout:
return context.DeadlineExceeded
case types.GetTaskFailedCauseShardOwnershipLost:
return &types.ShardOwnershipLostError{}
default:
return &types.InternalServiceError{Message: "uncategorized error"}
}
}
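
Note: a minimal sketch of how the new helper pairs with the existing ConvertErrToGetTaskFailedCause (per the hunk header above, both live in the common package). The source cluster encodes a shard-level failure as a GetTaskFailedCause in its GetCrossClusterTasks response; the fetching side reconstructs an equivalent error. That the existing helper maps *types.ShardOwnershipLostError to GetTaskFailedCauseShardOwnershipLost is an assumption inferred from the inverse shown above.

package main

import (
	"fmt"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/types"
)

func main() {
	// Source side: classify the error into a transportable failed cause.
	cause := common.ConvertErrToGetTaskFailedCause(&types.ShardOwnershipLostError{})
	// Target side: rebuild an equivalent error for retry/backoff handling.
	err := common.ConvertGetTaskFailedCauseToErr(cause)
	fmt.Printf("%T\n", err) // *types.ShardOwnershipLostError
}
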
1 change: 1 addition & 0 deletions host/elastic_search_test.go
@@ -18,6 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:build esintegration
// +build esintegration

// to run locally, make sure kafka and es are running,
4 changes: 2 additions & 2 deletions host/xdc/elasticsearch_test.go
@@ -18,8 +18,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

// +build !race
// +build esintegration
//go:build !race && esintegration
// +build !race,esintegration

// to run locally, make sure kafka and es are running,
// then run cmd `go test -v ./host/xdc -run TestESCrossDCTestSuite -tags esintegration`
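
Note: the build-constraint churn in this and the neighboring test files is the Go 1.17 migration to the //go:build syntax. Stacked // +build lines AND together, which is why the two old lines above collapse into one boolean expression; keeping both forms lets pre-1.17 toolchains still enforce the constraint. A minimal illustration (package name assumed):

//go:build !race && esintegration
// +build !race,esintegration

// Go 1.17+ reads the //go:build line; older toolchains read // +build.
// gofmt keeps the two lines in sync automatically.
package xdc
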
2 changes: 2 additions & 0 deletions host/xdc/integration_failover_test.go
@@ -18,7 +18,9 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:build !race
// +build !race

// need to run xdc tests with the race detector off because of a ringpop bug causing data race issues

package xdc
1 change: 1 addition & 0 deletions service/history/execution/context_util.go
@@ -106,6 +106,7 @@ func emitSessionUpdateStats(
countScope.RecordTimer(metrics.DeleteRequestCancelInfoCount, time.Duration(stats.DeleteRequestCancelInfoCount))
countScope.RecordTimer(metrics.TransferTasksCount, time.Duration(stats.TransferTasksCount))
countScope.RecordTimer(metrics.TimerTasksCount, time.Duration(stats.TimerTasksCount))
countScope.RecordTimer(metrics.CrossClusterTasksCount, time.Duration(stats.CrossClusterTaskCount))
countScope.RecordTimer(metrics.ReplicationTasksCount, time.Duration(stats.ReplicationTasksCount))
}

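
Note: the new CrossClusterTasksCount line follows the count-as-timer idiom used throughout this function: an integer count is cast to time.Duration and emitted via RecordTimer, so the metrics backend yields per-update distributions (p50/p95/p99) rather than a single monotonic counter. A sketch of the idiom, assuming Cadence's metrics.Scope interface; the helper name is illustrative:

package execution

import (
	"time"

	"github.com/uber/cadence/common/metrics"
)

// emitCount records an integer count through a timer metric, one
// "duration" unit per counted item, mirroring emitSessionUpdateStats above.
func emitCount(scope metrics.Scope, timerIdx int, count int) {
	scope.RecordTimer(timerIdx, time.Duration(count))
}
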
4 changes: 3 additions & 1 deletion service/history/queue/cross_cluster_queue_processor_base.go
@@ -276,7 +276,9 @@ processorPumpLoop:
continue processorPumpLoop
}

if c.readyForPollTasks.Len() > c.options.MaxPendingTaskSize() {
numReadyForPoll := c.readyForPollTasks.Len()
c.metricsScope.RecordTimer(metrics.CrossClusterTaskPendingTimer, time.Duration(numReadyForPoll))
if numReadyForPoll > c.options.MaxPendingTaskSize() {
c.logger.Warn("too many outstanding ready for poll tasks in cross cluster queue.")
c.backoffAllQueueCollections()
continue processorPumpLoop
53 changes: 39 additions & 14 deletions service/history/shard/context.go
@@ -180,11 +180,14 @@ var (
)

const (
conditionalRetryCount = 5
logWarnTransferLevelDiff = 3000000 // 3 million
logWarnTimerLevelDiff = time.Duration(30 * time.Minute)
historySizeLogThreshold = 10 * 1024 * 1024
minContextTimeout = 1 * time.Second
conditionalRetryCount = 5
// transfer/cross cluster diff/lag is in terms of taskID, which is calculated based on shard rangeID
// on shard movement, taskID will increase by around 1 million
logWarnTransferLevelDiff = 3000000 // 3 million
logWarnCrossClusterLevelLag = 3000000 // 3 million
logWarnTimerLevelDiff = time.Duration(30 * time.Minute)
historySizeLogThreshold = 10 * 1024 * 1024
minContextTimeout = 1 * time.Second
)

func (s *contextImpl) GetShardID() int {
@@ -1183,33 +1186,55 @@ func (s *contextImpl) emitShardInfoMetricsLogsLocked() {
transferLag := s.transferMaxReadLevel - s.shardInfo.TransferAckLevel
timerLag := time.Since(s.shardInfo.TimerAckLevel)

minCrossClusterLevel := int64(math.MaxInt64)
crossClusterLevelByCluster := make(map[string]int64)
for cluster, pqs := range s.shardInfo.CrossClusterProcessingQueueStates.StatesByCluster {
crossClusterLevel := int64(math.MaxInt64)
for _, queueState := range pqs {
crossClusterLevel = common.MinInt64(crossClusterLevel, queueState.GetAckLevel())
}
crossClusterLevelByCluster[cluster] = crossClusterLevel
minCrossClusterLevel = common.MinInt64(minCrossClusterLevel, crossClusterLevel)
}
crossClusterLag := s.transferMaxReadLevel - minCrossClusterLevel
if minCrossClusterLevel == 0 || minCrossClusterLevel == int64(math.MaxInt64) {
crossClusterLag = 0
}

transferFailoverInProgress := len(s.transferFailoverLevels)
timerFailoverInProgress := len(s.timerFailoverLevels)

if s.config.EmitShardDiffLog() &&
(logWarnTransferLevelDiff < diffTransferLevel ||
logWarnTimerLevelDiff < diffTimerLevel ||
logWarnTransferLevelDiff < transferLag ||
logWarnTimerLevelDiff < timerLag) {
logWarnTimerLevelDiff < timerLag ||
logWarnCrossClusterLevelLag < crossClusterLag) {

logger := s.logger.WithTags(
tag.ShardTime(s.remoteClusterCurrentTime),
tag.ShardReplicationAck(s.shardInfo.ReplicationAckLevel),
tag.ShardTimerAcks(s.shardInfo.ClusterTimerAckLevel),
tag.ShardTransferAcks(s.shardInfo.ClusterTransferAckLevel))
tag.ShardTransferAcks(s.shardInfo.ClusterTransferAckLevel),
tag.ShardCrossClusterAcks(crossClusterLevelByCluster),
)

logger.Warn("Shard ack levels diff exceeds warn threshold.")
}

s.GetMetricsClient().RecordTimer(metrics.ShardInfoScope, metrics.ShardInfoTransferDiffTimer, time.Duration(diffTransferLevel))
s.GetMetricsClient().RecordTimer(metrics.ShardInfoScope, metrics.ShardInfoTimerDiffTimer, diffTimerLevel)
metricsScope := s.GetMetricsClient().Scope(metrics.ShardInfoScope)
metricsScope.RecordTimer(metrics.ShardInfoTransferDiffTimer, time.Duration(diffTransferLevel))
metricsScope.RecordTimer(metrics.ShardInfoTimerDiffTimer, diffTimerLevel)

s.GetMetricsClient().RecordTimer(metrics.ShardInfoScope, metrics.ShardInfoReplicationLagTimer, time.Duration(replicationLag))
s.GetMetricsClient().RecordTimer(metrics.ShardInfoScope, metrics.ShardInfoTransferLagTimer, time.Duration(transferLag))
s.GetMetricsClient().RecordTimer(metrics.ShardInfoScope, metrics.ShardInfoTimerLagTimer, timerLag)
metricsScope.RecordTimer(metrics.ShardInfoReplicationLagTimer, time.Duration(replicationLag))
metricsScope.RecordTimer(metrics.ShardInfoTransferLagTimer, time.Duration(transferLag))
metricsScope.RecordTimer(metrics.ShardInfoTimerLagTimer, timerLag)
for cluster, crossClusterLevel := range crossClusterLevelByCluster {
crossClusterLag := s.transferMaxReadLevel - crossClusterLevel
metricsScope.Tagged(metrics.TargetClusterTag(cluster)).RecordTimer(metrics.ShardInfoCrossClusterLagTimer, time.Duration(crossClusterLag))
}

s.GetMetricsClient().RecordTimer(metrics.ShardInfoScope, metrics.ShardInfoTransferFailoverInProgressTimer, time.Duration(transferFailoverInProgress))
s.GetMetricsClient().RecordTimer(metrics.ShardInfoScope, metrics.ShardInfoTimerFailoverInProgressTimer, time.Duration(timerFailoverInProgress))
metricsScope.RecordTimer(metrics.ShardInfoTransferFailoverInProgressTimer, time.Duration(transferFailoverInProgress))
metricsScope.RecordTimer(metrics.ShardInfoTimerFailoverInProgressTimer, time.Duration(timerFailoverInProgress))
}

func (s *contextImpl) allocateTaskIDsLocked(
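
Note: the lag arithmetic above is in task IDs, not time. Task IDs are derived from the shard rangeID and jump by roughly one million on every shard movement, so the 3,000,000 warn threshold corresponds to about three shard movements of unacked backlog. A standalone sketch of the per-cluster computation (function and parameter names are illustrative; the real code reads shardInfo under the shard lock):

package shard

import "math"

// crossClusterLags returns, per target cluster, the task-ID gap between the
// shard's max transfer read level and that cluster's slowest processing
// queue; a cluster's level is the minimum ack level across its queue states.
func crossClusterLags(maxReadLevel int64, ackLevelsByCluster map[string][]int64) map[string]int64 {
	lags := make(map[string]int64, len(ackLevelsByCluster))
	for cluster, ackLevels := range ackLevelsByCluster {
		minAck := int64(math.MaxInt64)
		for _, level := range ackLevels {
			if level < minAck {
				minAck = level
			}
		}
		lags[cluster] = maxReadLevel - minAck
	}
	return lags
}
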
16 changes: 16 additions & 0 deletions service/history/shard/contextTest.go
@@ -28,6 +28,7 @@ import (

"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/history/config"
"github.com/uber/cadence/service/history/events"
"github.com/uber/cadence/service/history/resource"
@@ -51,6 +52,21 @@ func NewTestContext(
) *TestContext {
resource := resource.NewTest(ctrl, metrics.History)
eventsCache := events.NewMockCache(ctrl)
if shardInfo.TransferProcessingQueueStates == nil {
shardInfo.TransferProcessingQueueStates = &types.ProcessingQueueStates{
StatesByCluster: make(map[string][]*types.ProcessingQueueState),
}
}
if shardInfo.TimerProcessingQueueStates == nil {
shardInfo.TimerProcessingQueueStates = &types.ProcessingQueueStates{
StatesByCluster: make(map[string][]*types.ProcessingQueueState),
}
}
if shardInfo.CrossClusterProcessingQueueStates == nil {
shardInfo.CrossClusterProcessingQueueStates = &types.ProcessingQueueStates{
StatesByCluster: make(map[string][]*types.ProcessingQueueState),
}
}
shard := &contextImpl{
Resource: resource,
shardID: shardInfo.ShardID,
2 changes: 1 addition & 1 deletion service/history/task/cross_cluster_target_task_executor.go
@@ -246,7 +246,7 @@ func (t *crossClusterTargetTaskExecutor) executeApplyParentClosePolicyTask(
return nil, err
}

scope := t.metricsClient.Scope(metrics.CrossClusterTaskTypeApplyParentClosePolicyScope)
scope := t.metricsClient.Scope(metrics.CrossClusterSourceTaskApplyParentClosePolicyScope)
err = applyParentClosePolicy(
ctx,
t.historyClient,
16 changes: 13 additions & 3 deletions service/history/task/cross_cluster_task.go
@@ -159,6 +159,7 @@ func NewCrossClusterSourceTask(
processingStateInitialized,
taskExecutor,
taskProcessor,
getCrossClusterTaskMetricsScope(taskInfo.GetTaskType(), true),
logger,
shard.GetTimeSource(),
redispatchFn,
@@ -211,8 +212,14 @@
info.TargetRunID = taskRequest.SignalExecutionAttributes.TargetRunID
info.ScheduleID = taskRequest.SignalExecutionAttributes.InitiatedEventID
info.TargetChildWorkflowOnly = taskRequest.SignalExecutionAttributes.ChildWorkflowOnly
// TODO: implement recordChildWorkflowExeuctionComplete
// TODO: implement applyParentClosePolicyComplete
case types.CrossClusterTaskTypeRecordChildWorkflowExeuctionComplete:
info.TaskType = persistence.CrossClusterTaskTypeRecordChildWorkflowExeuctionComplete
info.TargetDomainID = taskRequest.RecordChildWorkflowExecutionCompleteAttributes.TargetDomainID
info.TargetWorkflowID = taskRequest.RecordChildWorkflowExecutionCompleteAttributes.TargetWorkflowID
info.TargetRunID = taskRequest.RecordChildWorkflowExecutionCompleteAttributes.TargetRunID
info.ScheduleID = taskRequest.RecordChildWorkflowExecutionCompleteAttributes.InitiatedEventID
case types.CrossClusterTaskTypeApplyParentPolicy:
info.TaskType = persistence.CrossClusterTaskTypeApplyParentPolicy
default:
panic(fmt.Sprintf("unknown cross cluster task type: %v", taskRequest.TaskInfo.GetTaskType()))
}
@@ -225,6 +232,7 @@
processingState(taskRequest.TaskInfo.TaskState),
taskExecutor,
taskProcessor,
getCrossClusterTaskMetricsScope(info.GetTaskType(), false),
logger,
shard.GetTimeSource(),
redispatchFn,
@@ -241,6 +249,7 @@ func newCrossClusterTaskBase(
processingState processingState,
taskExecutor Executor,
taskProcessor Processor,
metricScopeIdx int,
logger log.Logger,
timeSource clock.TimeSource,
redispatchFn func(task Task),
@@ -266,7 +275,8 @@
eventLogger: eventLogger,
scope: getOrCreateDomainTaggedScope(
shard,
GetCrossClusterTaskMetricsScope(taskInfo.GetTaskType()), taskInfo.GetDomainID(),
metricScopeIdx,
taskInfo.GetDomainID(),
logger,
),
taskExecutor: taskExecutor,
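
Note: the scope index threaded into newCrossClusterTaskBase now distinguishes source-side from target-side execution of the same task type. A hypothetical shape of the selector is below; the real mapping lives in common/metrics/defs.go, whose large diff is not rendered above, and every scope constant here other than CrossClusterSourceTaskApplyParentClosePolicyScope is an assumption:

package task

import (
	"fmt"

	"github.com/uber/cadence/common/metrics"
	"github.com/uber/cadence/common/persistence"
)

// getCrossClusterTaskMetricsScope picks a metrics scope from the task type
// plus whether this cluster is the source or the target of the task.
func getCrossClusterTaskMetricsScope(taskType int, isSourceTask bool) int {
	switch taskType {
	case persistence.CrossClusterTaskTypeApplyParentPolicy:
		if isSourceTask {
			return metrics.CrossClusterSourceTaskApplyParentClosePolicyScope
		}
		return metrics.CrossClusterTargetTaskApplyParentClosePolicyScope // assumed name
	// ... remaining task types follow the same source/target split
	default:
		panic(fmt.Sprintf("unknown cross cluster task type: %v", taskType))
	}
}
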
26 changes: 23 additions & 3 deletions service/history/task/cross_cluster_task_processor.go
@@ -118,7 +118,10 @@ func newCrossClusterTaskProcessor(
tag.ComponentCrossClusterTaskProcessor,
tag.SourceCluster(taskFetcher.GetSourceCluster()),
)
metricsScope := shard.GetMetricsClient().Scope(metrics.CrossClusterTaskProcessorScope)
metricsScope := shard.GetMetricsClient().Scope(
metrics.CrossClusterTaskProcessorScope,
metrics.ActiveClusterTag(taskFetcher.GetSourceCluster()),
)
retryPolicy := backoff.NewExponentialRetryPolicy(time.Millisecond * 100)
retryPolicy.SetMaximumInterval(time.Second)
retryPolicy.SetExpirationInterval(options.TaskWaitInterval())
@@ -198,16 +201,22 @@ func (p *crossClusterTaskProcessor) processLoop() {
}

// this will submit the fetching request to the host level task fetcher for batching
fetchFuture := p.taskFetcher.Fetch(p.shard.GetShardID())
p.metricsScope.IncCounter(metrics.CrossClusterFetchRequests)
sw := p.metricsScope.StartTimer(metrics.CrossClusterFetchLatency)

var taskRequests []*types.CrossClusterTaskRequest
if err := fetchFuture.Get(context.Background(), &taskRequests); err != nil {
err := p.taskFetcher.Fetch(p.shard.GetShardID()).Get(context.Background(), &taskRequests)
sw.Stop()
if err != nil {
p.logger.Error("Unable to fetch cross cluster tasks", tag.Error(err))
if common.IsServiceBusyError(err) {
p.metricsScope.IncCounter(metrics.CrossClusterFetchServiceBusyFailures)
time.Sleep(backoff.JitDuration(
p.options.ServiceBusyBackoffInterval(),
p.options.TimerJitterCoefficient(),
))
} else {
p.metricsScope.IncCounter(metrics.CrossClusterFetchFailures)
}
continue
}
@@ -329,6 +338,7 @@ func (p *crossClusterTaskProcessor) respondPendingTaskLoop() {
p.options.TimerJitterCoefficient(),
))
p.taskLock.Lock()
p.metricsScope.RecordTimer(metrics.CrossClusterTaskPendingTimer, time.Duration(len(p.pendingTasks)))
respondRequest := &types.RespondCrossClusterTasksCompletedRequest{
ShardID: int32(p.shard.GetShardID()),
TargetCluster: p.shard.GetClusterMetadata().GetCurrentClusterName(),
@@ -378,6 +388,8 @@
func (p *crossClusterTaskProcessor) dedupTaskRequests(
taskRequests []*types.CrossClusterTaskRequest,
) []*types.CrossClusterTaskRequest {
p.metricsScope.RecordTimer(metrics.CrossClusterTaskFetchedTimer, time.Duration(len(taskRequests)))

// NOTE: this is only best effort dedup for reducing the number of unnecessary task executions.
// it's possible that a task is removed from the pendingTasks maps before this dedup logic
// is executed for that task. In that case, that task will be executed multiple times. This
@@ -401,6 +413,10 @@ func (p *crossClusterTaskProcessor) respondTaskCompletedWithRetry(
func (p *crossClusterTaskProcessor) respondTaskCompletedWithRetry(
request *types.RespondCrossClusterTasksCompletedRequest,
) (*types.RespondCrossClusterTasksCompletedResponse, error) {
p.metricsScope.IncCounter(metrics.CrossClusterTaskRespondRequests)
sw := p.metricsScope.StartTimer(metrics.CrossClusterTaskRespondLatency)
defer sw.Stop()

var response *types.RespondCrossClusterTasksCompletedResponse
err := backoff.Retry(
func() error {
@@ -409,6 +425,10 @@

var err error
response, err = p.shard.GetService().GetHistoryRawClient().RespondCrossClusterTasksCompleted(ctx, request)
if err != nil {
p.logger.Error("Failed to respond cross cluster tasks completed", tag.Error(err))
p.metricsScope.IncCounter(metrics.CrossClusterTaskRespondFailures)
}
return err
},
p.retryPolicy,
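
Note: the fetch path in processLoop now follows the request-counter / latency-timer / classified-failure pattern that the respond path also uses. Factored out as a sketch, the helper below is illustrative and not part of the commit, but it uses only metric names and helpers visible in this diff:

package task

import (
	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/metrics"
)

// instrumentedFetch wraps one fetch attempt with the request, latency, and
// failure-classification metrics added in processLoop.
func instrumentedFetch(scope metrics.Scope, fetch func() error) error {
	scope.IncCounter(metrics.CrossClusterFetchRequests)
	sw := scope.StartTimer(metrics.CrossClusterFetchLatency)
	err := fetch()
	sw.Stop()
	if err == nil {
		return nil
	}
	if common.IsServiceBusyError(err) {
		// service-busy gets its own counter so callers can back off on it
		scope.IncCounter(metrics.CrossClusterFetchServiceBusyFailures)
	} else {
		scope.IncCounter(metrics.CrossClusterFetchFailures)
	}
	return err
}
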