
Add Metric Emitter, which emits a metric once a minute for true replication lag in nanoseconds. (cadence-workflow#4979)
ZackLK authored Aug 31, 2022
1 parent 86f645d commit 5202eb3
Showing 6 changed files with 310 additions and 2 deletions.
4 changes: 3 additions & 1 deletion common/log/panic.go
@@ -45,6 +45,8 @@ func CapturePanic(errPanic interface{}, logger Logger, retError *error) {
 		logger.Error("Panic is captured", tag.SysStackTrace(st), tag.Error(err))
 
-		*retError = err
+		if retError != nil {
+			*retError = err
+		}
 	}
 }
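
The nil check matters because some callers capture panics only to log them, with no error to propagate; the metrics emitter added below does exactly that, passing a nil retError from its emitMetricsLoop. A minimal sketch of both call styles, not part of the diff (the package and function names here are hypothetical):

package example

import "github.com/uber/cadence/common/log"

// doWork propagates a recovered panic to its caller as an error.
func doWork(logger log.Logger) (retErr error) {
	defer func() { log.CapturePanic(recover(), logger, &retErr) }()
	// ... work that may panic ...
	return nil
}

// backgroundLoop only logs panics; passing nil here would have
// dereferenced a nil pointer before the change above.
func backgroundLoop(logger log.Logger) {
	defer func() { log.CapturePanic(recover(), logger, nil) }()
	// ... loop body ...
}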
4 changes: 4 additions & 0 deletions common/log/tag/tags.go
@@ -668,6 +668,10 @@ func SourceCluster(sourceCluster string) Tag {
	return newStringTag("xdc-source-cluster", sourceCluster)
}

// RemoteCluster returns tag for RemoteCluster
func RemoteCluster(remoteCluster string) Tag {
	return newStringTag("xdc-remote-cluster", remoteCluster)
}

// PrevActiveCluster returns tag for PrevActiveCluster
func PrevActiveCluster(prevActiveCluster string) Tag {
	return newStringTag("xdc-prev-active-cluster", prevActiveCluster)
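
The new tag composes with the logger like any existing tag; the emitter below uses it to annotate per-remote-cluster log lines. A small sketch, assuming a logger variable of type log.Logger:

logger = logger.WithTags(
	tag.ClusterName("cluster1"),   // current cluster
	tag.RemoteCluster("cluster2"), // renders as xdc-remote-cluster: cluster2
)
logger.Info("replication status checked")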
5 changes: 5 additions & 0 deletions common/metrics/defs.go
@@ -1097,6 +1097,8 @@ const (
	ReplicatorTaskSyncActivityScope
	// ReplicateHistoryEventsScope is the scope used by historyReplicator API for applying events
	ReplicateHistoryEventsScope
	// ReplicationMetricEmitterScope is the scope used by all metrics emitted by the replication metric emitter
	ReplicationMetricEmitterScope
	// ShardInfoScope is the scope used when updating shard info
	ShardInfoScope
	// WorkflowContextScope is the scope used by WorkflowContext component
@@ -1711,6 +1713,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
	ReplicatorTaskHistoryScope:      {operation: "ReplicatorTaskHistory"},
	ReplicatorTaskSyncActivityScope: {operation: "ReplicatorTaskSyncActivity"},
	ReplicateHistoryEventsScope:     {operation: "ReplicateHistoryEvents"},
	ReplicationMetricEmitterScope:   {operation: "ReplicationMetricEmitter"},
	ShardInfoScope:                  {operation: "ShardInfo"},
	WorkflowContextScope:            {operation: "WorkflowContext"},
	HistoryCacheGetAndCreateScope:   {operation: "HistoryCacheGetAndCreate", tags: map[string]string{CacheTypeTagName: MutableStateCacheTypeTagValue}},
@@ -2142,6 +2145,7 @@ const (
	ArchiverClientVisibilityInlineArchiveThrottledCount
	LastRetrievedMessageID
	LastProcessedMessageID
	ReplicationLatency
	ReplicationTasksApplied
	ReplicationTasksFailed
	ReplicationTasksLag
@@ -2693,6 +2697,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
	ArchiverClientVisibilityInlineArchiveThrottledCount: {metricName: "archiver_client_visibility_inline_archive_throttled", metricType: Counter},
	LastRetrievedMessageID:  {metricName: "last_retrieved_message_id", metricType: Gauge},
	LastProcessedMessageID:  {metricName: "last_processed_message_id", metricType: Gauge},
	ReplicationLatency:      {metricName: "replication_latency", metricType: Gauge},
	ReplicationTasksApplied: {metricName: "replication_tasks_applied", metricType: Counter},
	ReplicationTasksFailed:  {metricName: "replication_tasks_failed", metricType: Counter},
	ReplicationTasksLag:     {metricName: "replication_tasks_lag", metricType: Timer},
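
With the scope and the gauge defined, recording a value is a single call through the metrics client. A sketch mirroring how the emitter below writes the new metric (the metricsClient and latency variables are assumed):

scope := metricsClient.Scope(
	metrics.ReplicationMetricEmitterScope,
	metrics.ActiveClusterTag("cluster1"),
)
// Gauges keep the latest value; here, replication lag in nanoseconds.
scope.UpdateGauge(metrics.ReplicationLatency, float64(latency.Nanoseconds()))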
10 changes: 9 additions & 1 deletion service/history/historyEngine.go
@@ -115,6 +115,7 @@ type (
		replicationAckManager     replication.TaskAckManager
		replicationTaskStore      *replication.TaskStore
		replicationHydrator       replication.TaskHydrator
		replicationMetricsEmitter *replication.MetricsEmitterImpl
		publicClient              workflowserviceclient.Interface
		eventsReapplier           ndc.EventsReapplier
		matchingClient            matching.Client
@@ -168,6 +169,8 @@ func NewEngineWithShardContext(
		shard.GetLogger(),
		replicationHydrator,
	)
	replicationReader := replication.NewDynamicTaskReader(shard.GetShardID(), executionManager, shard.GetTimeSource(), config)

	historyEngImpl := &historyEngineImpl{
		currentClusterName: currentClusterName,
		shard:              shard,
@@ -221,10 +224,12 @@
 			shard,
 			shard.GetMetricsClient(),
 			shard.GetLogger(),
-			replication.NewDynamicTaskReader(shard.GetShardID(), executionManager, shard.GetTimeSource(), config),
+			replicationReader,
 			replicationTaskStore,
 		),
 		replicationTaskStore: replicationTaskStore,
+		replicationMetricsEmitter: replication.NewMetricsEmitter(
+			shard.GetShardID(), shard, replicationReader, shard.GetMetricsClient()),
 	}
	historyEngImpl.decisionHandler = decision.NewHandler(
		shard,
@@ -360,6 +365,7 @@ func (e *historyEngineImpl) Start() {
	e.timerProcessor.Start()
	e.crossClusterProcessor.Start()
	e.replicationDLQHandler.Start()
	e.replicationMetricsEmitter.Start()

	// failover callback will try to create a failover queue processor to scan all inflight tasks
	// if domain needs to be failovered. However, in the multicursor queue logic, the scan range
@@ -376,6 +382,7 @@
	if e.config.EnableGracefulFailover() {
		e.failoverMarkerNotifier.Start()
	}

}

// Stop the service.
@@ -387,6 +394,7 @@ func (e *historyEngineImpl) Stop() {
	e.timerProcessor.Stop()
	e.crossClusterProcessor.Stop()
	e.replicationDLQHandler.Stop()
	e.replicationMetricsEmitter.Stop()

	e.crossClusterTaskProcessors.Stop()
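
Condensed, the wiring above amounts to: build one dynamic task reader per shard, hand it to both the replication ack manager and the metrics emitter so they observe the same task stream, and tie the emitter to the engine lifecycle. An illustrative sketch with the surrounding engine code elided:

replicationReader := replication.NewDynamicTaskReader(
	shard.GetShardID(), executionManager, shard.GetTimeSource(), config)

emitter := replication.NewMetricsEmitter(
	shard.GetShardID(), shard, replicationReader, shard.GetMetricsClient())

emitter.Start() // called from historyEngineImpl.Start
// ...
emitter.Stop() // called from historyEngineImpl.Stop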
170 changes: 170 additions & 0 deletions service/history/replication/metrics_emitter.go
@@ -0,0 +1,170 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package replication

import (
	ctx "context"
	"fmt"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/clock"
	"github.com/uber/cadence/common/cluster"
	"github.com/uber/cadence/common/config"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/log/tag"
	"github.com/uber/cadence/common/metrics"
)

const (
	metricsEmissionInterval = time.Minute
)

type (
	// MetricsEmitterImpl emits source-side replication metrics for each remote cluster once per interval.
	MetricsEmitterImpl struct {
		shardID        int
		currentCluster string
		remoteClusters map[string]config.ClusterInformation
		shardData      metricsEmitterShardData
		reader         taskReader
		scope          metrics.Scope
		logger         log.Logger
		status         int32
		done           chan struct{}
	}

	// metricsEmitterShardData is the subset of shard data the emitter needs, kept as an interface so tests can fake it.
	metricsEmitterShardData interface {
		GetLogger() log.Logger
		GetClusterMetadata() cluster.Metadata
		GetClusterReplicationLevel(cluster string) int64
		GetTimeSource() clock.TimeSource
	}
)

// NewMetricsEmitter creates a metrics emitter; call Start to launch the goroutine
// that emits replication metrics once per metricsEmissionInterval.
func NewMetricsEmitter(
	shardID int,
	shardData metricsEmitterShardData,
	reader taskReader,
	metricsClient metrics.Client,
) *MetricsEmitterImpl {
	currentCluster := shardData.GetClusterMetadata().GetCurrentClusterName()
	remoteClusters := shardData.GetClusterMetadata().GetRemoteClusterInfo()

	scope := metricsClient.Scope(
		metrics.ReplicationMetricEmitterScope,
		metrics.ActiveClusterTag(currentCluster),
		metrics.InstanceTag(strconv.Itoa(shardID)),
	)
	logger := shardData.GetLogger().WithTags(
		tag.ClusterName(currentCluster),
		tag.ShardID(shardID))

	return &MetricsEmitterImpl{
		shardID:        shardID,
		currentCluster: currentCluster,
		remoteClusters: remoteClusters,
		status:         common.DaemonStatusInitialized,
		shardData:      shardData,
		reader:         reader,
		scope:          scope,
		logger:         logger,
		done:           make(chan struct{}),
	}
}

func (m *MetricsEmitterImpl) Start() {
	if !atomic.CompareAndSwapInt32(&m.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
		return
	}

	go m.emitMetricsLoop()
	m.logger.Info("ReplicationMetricsEmitter started.")
}

func (m *MetricsEmitterImpl) Stop() {
	if !atomic.CompareAndSwapInt32(&m.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
		return
	}

	m.logger.Info("ReplicationMetricsEmitter shutting down.")
	close(m.done)
}

func (m *MetricsEmitterImpl) emitMetricsLoop() {
	ticker := time.NewTicker(metricsEmissionInterval)
	defer ticker.Stop()
	// Log-only panic capture; the nil retError relies on the nil check added to CapturePanic above.
	defer func() { log.CapturePanic(recover(), m.logger, nil) }()

	for {
		select {
		case <-m.done:
			return
		case <-ticker.C:
			m.emitMetrics()
		}
	}
}

func (m *MetricsEmitterImpl) emitMetrics() {
	for remoteClusterName := range m.remoteClusters {
		logger := m.logger.WithTags(tag.RemoteCluster(remoteClusterName))
		scope := m.scope.Tagged(metrics.TargetClusterTag(remoteClusterName))

		replicationLatency, err := m.determineReplicationLatency(remoteClusterName)
		if err != nil {
			// Abandon this round on a read error; all clusters are retried on the next tick.
			return
		}

		scope.UpdateGauge(metrics.ReplicationLatency, float64(replicationLatency.Nanoseconds()))
		logger.Debug(fmt.Sprintf("ReplicationLatency metric emitted: %v", float64(replicationLatency.Nanoseconds())))
	}
}

func (m *MetricsEmitterImpl) determineReplicationLatency(remoteClusterName string) (time.Duration, error) {
	logger := m.logger.WithTags(tag.RemoteCluster(remoteClusterName))
	lastReadTaskID := m.shardData.GetClusterReplicationLevel(remoteClusterName)

	tasks, _, err := m.reader.Read(ctx.Background(), lastReadTaskID, lastReadTaskID+1)
	if err != nil {
		logger.Error(fmt.Sprintf(
			"Error reading when determining replication latency, lastReadTaskID=%v", lastReadTaskID),
			tag.Error(err))
		return 0, err
	}
	logger.Debug("Number of tasks retrieved", tag.Number(int64(len(tasks))))

	var replicationLatency time.Duration
	if len(tasks) > 0 {
		// Latency is the age of the oldest task the remote cluster has not yet acked.
		// No pending tasks means the remote cluster is fully caught up: latency stays zero.
		creationTime := time.Unix(0, tasks[0].CreationTime)
		replicationLatency = m.shardData.GetTimeSource().Now().Sub(creationTime)
	}

	return replicationLatency, nil
}
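
To make the emitted value concrete: the gauge is the age, in nanoseconds, of the oldest replication task the remote cluster has not yet acked. A runnable worked example with illustrative values (not part of the diff):

package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Now()
	// Suppose the oldest unacked task was created an hour ago.
	creationTime := now.Add(-time.Hour)
	latency := now.Sub(creationTime)
	// Prints 3.6e+12: the value a dashboard sees for replication_latency.
	fmt.Println(float64(latency.Nanoseconds()))
}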
119 changes: 119 additions & 0 deletions service/history/replication/metrics_emitter_test.go
@@ -0,0 +1,119 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package replication

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"

	"github.com/uber/cadence/common/clock"
	"github.com/uber/cadence/common/cluster"
	"github.com/uber/cadence/common/config"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/metrics"
	"github.com/uber/cadence/common/persistence"
)

var (
	cluster1 = "cluster1"
	cluster2 = "cluster2"
	cluster3 = "cluster3"
)

func TestMetricsEmitter(t *testing.T) {
	timeSource := clock.NewEventTimeSource()
	metadata := cluster.NewMetadata(0, cluster1, cluster1, map[string]config.ClusterInformation{
		cluster1: {Enabled: true},
		cluster2: {Enabled: true},
		cluster3: {Enabled: true},
	})
	testShardData := newTestShardData(timeSource, metadata)
	timeSource.Update(time.Unix(10000, 0))

	task1 := persistence.ReplicationTaskInfo{TaskID: 1, CreationTime: timeSource.Now().Add(-time.Hour).UnixNano()}
	task2 := persistence.ReplicationTaskInfo{TaskID: 2, CreationTime: timeSource.Now().Add(-time.Minute).UnixNano()}
	reader := fakeTaskReader{&task1, &task2}

	metricsEmitter := NewMetricsEmitter(1, testShardData, reader, metrics.NewNoopMetricsClient())
	latency, err := metricsEmitter.determineReplicationLatency(cluster2)
	assert.NoError(t, err)
	assert.Equal(t, time.Hour, latency)

	// Move the replication level up for cluster2 and the latency shortens.
	testShardData.clusterReplicationLevel[cluster2] = 2
	latency, err = metricsEmitter.determineReplicationLatency(cluster2)
	assert.NoError(t, err)
	assert.Equal(t, time.Minute, latency)

	// Move the replication level up again and cluster2 has no latency left.
	testShardData.clusterReplicationLevel[cluster2] = 3
	latency, err = metricsEmitter.determineReplicationLatency(cluster2)
	assert.NoError(t, err)
	assert.Equal(t, time.Duration(0), latency)

	// cluster3 still lags by an hour.
	latency, err = metricsEmitter.determineReplicationLatency(cluster3)
	assert.NoError(t, err)
	assert.Equal(t, time.Hour, latency)
}

type testShardData struct {
	shardID                 int
	logger                  log.Logger
	maxReadLevel            int64
	clusterReplicationLevel map[string]int64
	timeSource              clock.TimeSource
	metadata                cluster.Metadata
}

func newTestShardData(timeSource clock.TimeSource, metadata cluster.Metadata) testShardData {
	remotes := metadata.GetRemoteClusterInfo()
	clusterReplicationLevels := make(map[string]int64, len(remotes))
	for remote := range remotes {
		clusterReplicationLevels[remote] = 1
	}
	return testShardData{
		logger:                  log.NewNoop(),
		timeSource:              timeSource,
		metadata:                metadata,
		clusterReplicationLevel: clusterReplicationLevels,
	}
}

func (t testShardData) GetLogger() log.Logger {
	return t.logger
}

func (t testShardData) GetClusterReplicationLevel(cluster string) int64 {
	return t.clusterReplicationLevel[cluster]
}

func (t testShardData) GetTimeSource() clock.TimeSource {
	return t.timeSource
}

func (t testShardData) GetClusterMetadata() cluster.Metadata {
	return t.metadata
}
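
fakeTaskReader is not part of this diff; it is defined elsewhere in the package. For the assertions above to hold, its Read must return the stored tasks whose IDs fall within the requested range. A plausible reconstruction, assuming a "context" import (the real helper may differ):

// fakeTaskReader serves stored tasks with IDs in [readLevel, maxReadLevel);
// this is a hypothetical sketch, not the helper shipped with the package.
type fakeTaskReader []*persistence.ReplicationTaskInfo

func (r fakeTaskReader) Read(
	ctx context.Context,
	readLevel int64,
	maxReadLevel int64,
) ([]*persistence.ReplicationTaskInfo, bool, error) {
	var result []*persistence.ReplicationTaskInfo
	for _, task := range r {
		if task.TaskID >= readLevel && task.TaskID < maxReadLevel {
			result = append(result, task)
		}
	}
	return result, false, nil
}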
