Skip to content

Commit

Permalink
Improve Archival logic in TimerQueueProcessor (cadence-workflow#2323)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Jul 31, 2019
1 parent 209e527 commit b9271fb
Show file tree
Hide file tree
Showing 12 changed files with 244 additions and 181 deletions.
3 changes: 3 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,9 +648,9 @@ func ArchivalCallerServiceName(callerServiceName string) Tag {
return newStringTag("archival-caller-service-name", callerServiceName)
}

// ArchivalArchiveInline returns tag for whether archival is done inline or a signal is sent
func ArchivalArchiveInline(archiveInline bool) Tag {
return newBoolTag("archival-archive-inline", archiveInline)
// ArchivalArchiveAttemptedInline returns tag for whether archival is attempted inline before signal is sent.
func ArchivalArchiveAttemptedInline(archiveInline bool) Tag {
return newBoolTag("archival-archive-attempted-inline", archiveInline)
}

// ArchivalRequestDomainID returns tag for RequestDomainID
Expand Down
100 changes: 48 additions & 52 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1461,8 +1461,6 @@ const (
ArchiverDeleteWithRetriesLatency
ArchiverUploadFailedAllRetriesCount
ArchiverUploadSuccessCount
ArchiverDeleteLocalFailedAllRetriesCount
ArchiverDeleteLocalSuccessCount
ArchiverDeleteFailedAllRetriesCount
ArchiverDeleteSuccessCount
ArchiverBacklogSizeGauge
Expand Down Expand Up @@ -1707,56 +1705,54 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
RemoteToRemoteMatchCounter: {metricName: "remote_to_remote_matches"},
},
Worker: {
ReplicatorMessages: {metricName: "replicator_messages"},
ReplicatorFailures: {metricName: "replicator_errors"},
ReplicatorMessagesDropped: {metricName: "replicator_messages_dropped"},
ReplicatorLatency: {metricName: "replicator_latency"},
ESProcessorRequests: {metricName: "es_processor_requests"},
ESProcessorRetries: {metricName: "es_processor_retries"},
ESProcessorFailures: {metricName: "es_processor_errors"},
ESProcessorCorruptedData: {metricName: "es_processor_corrupted_data"},
ESProcessorProcessMsgLatency: {metricName: "es_processor_process_msg_latency", metricType: Timer},
IndexProcessorCorruptedData: {metricName: "index_processor_corrupted_data"},
IndexProcessorProcessMsgLatency: {metricName: "index_processor_process_msg_latency", metricType: Timer},
ArchiverNonRetryableErrorCount: {metricName: "archiver_non_retryable_error"},
ArchiverStartedCount: {metricName: "archiver_started"},
ArchiverStoppedCount: {metricName: "archiver_stopped"},
ArchiverCoroutineStartedCount: {metricName: "archiver_coroutine_started"},
ArchiverCoroutineStoppedCount: {metricName: "archiver_coroutine_stopped"},
ArchiverHandleRequestLatency: {metricName: "archiver_handle_request_latency"},
ArchiverUploadWithRetriesLatency: {metricName: "archiver_upload_with_retries_latency"},
ArchiverDeleteWithRetriesLatency: {metricName: "archiver_delete_with_retries_latency"},
ArchiverUploadFailedAllRetriesCount: {metricName: "archiver_upload_failed_all_retries"},
ArchiverUploadSuccessCount: {metricName: "archiver_upload_success"},
ArchiverDeleteLocalFailedAllRetriesCount: {metricName: "archiver_delete_local_failed_all_retries"},
ArchiverDeleteLocalSuccessCount: {metricName: "archiver_delete_local_success"},
ArchiverDeleteFailedAllRetriesCount: {metricName: "archiver_delete_failed_all_retries"},
ArchiverDeleteSuccessCount: {metricName: "archiver_delete_success"},
ArchiverBacklogSizeGauge: {metricName: "archiver_backlog_size"},
ArchiverPumpTimeoutCount: {metricName: "archiver_pump_timeout"},
ArchiverPumpSignalThresholdCount: {metricName: "archiver_pump_signal_threshold"},
ArchiverPumpTimeoutWithoutSignalsCount: {metricName: "archiver_pump_timeout_without_signals"},
ArchiverPumpSignalChannelClosedCount: {metricName: "archiver_pump_signal_channel_closed"},
ArchiverWorkflowStartedCount: {metricName: "archiver_workflow_started"},
ArchiverNumPumpedRequestsCount: {metricName: "archiver_num_pumped_requests"},
ArchiverNumHandledRequestsCount: {metricName: "archiver_num_handled_requests"},
ArchiverPumpedNotEqualHandledCount: {metricName: "archiver_pumped_not_equal_handled"},
ArchiverHandleAllRequestsLatency: {metricName: "archiver_handle_all_requests_latency"},
ArchiverWorkflowStoppingCount: {metricName: "archiver_workflow_stopping"},
ArchiverClientSendSignalFailureCount: {metricName: "archiver_client_send_signal_error"},
ArchiverClientInlineArchiveAttemptCount: {metricName: "archiver_client_inline_archive_attempt"},
ArchiverClientInlineArchiveFailureCount: {metricName: "archiver_client_inline_archive_failure"},
TaskProcessedCount: {metricName: "task_processed", metricType: Gauge},
TaskDeletedCount: {metricName: "task_deleted", metricType: Gauge},
TaskListProcessedCount: {metricName: "tasklist_processed", metricType: Gauge},
TaskListDeletedCount: {metricName: "tasklist_deleted", metricType: Gauge},
TaskListOutstandingCount: {metricName: "tasklist_outstanding", metricType: Gauge},
StartedCount: {metricName: "started", metricType: Counter},
StoppedCount: {metricName: "stopped", metricType: Counter},
ExecutorTasksDeferredCount: {metricName: "executor_deferred", metricType: Counter},
ExecutorTasksDroppedCount: {metricName: "executor_dropped", metricType: Counter},
BatcherProcessorSuccess: {metricName: "batcher_processor_requests", metricType: Counter},
BatcherProcessorFailures: {metricName: "batcher_processor_errors", metricType: Counter},
ReplicatorMessages: {metricName: "replicator_messages"},
ReplicatorFailures: {metricName: "replicator_errors"},
ReplicatorMessagesDropped: {metricName: "replicator_messages_dropped"},
ReplicatorLatency: {metricName: "replicator_latency"},
ESProcessorRequests: {metricName: "es_processor_requests"},
ESProcessorRetries: {metricName: "es_processor_retries"},
ESProcessorFailures: {metricName: "es_processor_errors"},
ESProcessorCorruptedData: {metricName: "es_processor_corrupted_data"},
ESProcessorProcessMsgLatency: {metricName: "es_processor_process_msg_latency", metricType: Timer},
IndexProcessorCorruptedData: {metricName: "index_processor_corrupted_data"},
IndexProcessorProcessMsgLatency: {metricName: "index_processor_process_msg_latency", metricType: Timer},
ArchiverNonRetryableErrorCount: {metricName: "archiver_non_retryable_error"},
ArchiverStartedCount: {metricName: "archiver_started"},
ArchiverStoppedCount: {metricName: "archiver_stopped"},
ArchiverCoroutineStartedCount: {metricName: "archiver_coroutine_started"},
ArchiverCoroutineStoppedCount: {metricName: "archiver_coroutine_stopped"},
ArchiverHandleRequestLatency: {metricName: "archiver_handle_request_latency"},
ArchiverUploadWithRetriesLatency: {metricName: "archiver_upload_with_retries_latency"},
ArchiverDeleteWithRetriesLatency: {metricName: "archiver_delete_with_retries_latency"},
ArchiverUploadFailedAllRetriesCount: {metricName: "archiver_upload_failed_all_retries"},
ArchiverUploadSuccessCount: {metricName: "archiver_upload_success"},
ArchiverDeleteFailedAllRetriesCount: {metricName: "archiver_delete_failed_all_retries"},
ArchiverDeleteSuccessCount: {metricName: "archiver_delete_success"},
ArchiverBacklogSizeGauge: {metricName: "archiver_backlog_size"},
ArchiverPumpTimeoutCount: {metricName: "archiver_pump_timeout"},
ArchiverPumpSignalThresholdCount: {metricName: "archiver_pump_signal_threshold"},
ArchiverPumpTimeoutWithoutSignalsCount: {metricName: "archiver_pump_timeout_without_signals"},
ArchiverPumpSignalChannelClosedCount: {metricName: "archiver_pump_signal_channel_closed"},
ArchiverWorkflowStartedCount: {metricName: "archiver_workflow_started"},
ArchiverNumPumpedRequestsCount: {metricName: "archiver_num_pumped_requests"},
ArchiverNumHandledRequestsCount: {metricName: "archiver_num_handled_requests"},
ArchiverPumpedNotEqualHandledCount: {metricName: "archiver_pumped_not_equal_handled"},
ArchiverHandleAllRequestsLatency: {metricName: "archiver_handle_all_requests_latency"},
ArchiverWorkflowStoppingCount: {metricName: "archiver_workflow_stopping"},
ArchiverClientSendSignalFailureCount: {metricName: "archiver_client_send_signal_error"},
ArchiverClientInlineArchiveAttemptCount: {metricName: "archiver_client_inline_archive_attempt"},
ArchiverClientInlineArchiveFailureCount: {metricName: "archiver_client_inline_archive_failure"},
TaskProcessedCount: {metricName: "task_processed", metricType: Gauge},
TaskDeletedCount: {metricName: "task_deleted", metricType: Gauge},
TaskListProcessedCount: {metricName: "tasklist_processed", metricType: Gauge},
TaskListDeletedCount: {metricName: "tasklist_deleted", metricType: Gauge},
TaskListOutstandingCount: {metricName: "tasklist_outstanding", metricType: Gauge},
StartedCount: {metricName: "started", metricType: Counter},
StoppedCount: {metricName: "stopped", metricType: Counter},
ExecutorTasksDeferredCount: {metricName: "executor_deferred", metricType: Counter},
ExecutorTasksDroppedCount: {metricName: "executor_dropped", metricType: Counter},
BatcherProcessorSuccess: {metricName: "batcher_processor_requests", metricType: Counter},
BatcherProcessorFailures: {metricName: "batcher_processor_errors", metricType: Counter},
},
}

Expand Down
3 changes: 1 addition & 2 deletions service/history/MockWorkflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ import (
"context"
"time"

"github.com/uber/cadence/common/log"

"github.com/stretchr/testify/mock"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
)

Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func NewEngineWithShardContext(
config: config,
archivalClient: warchiver.NewClient(
shard.GetMetricsClient(),
shard.GetLogger(),
logger,
publicClient,
shard.GetConfig().NumArchiveSystemWorkflows,
shard.GetConfig().ArchiveRequestRPS,
Expand Down
22 changes: 9 additions & 13 deletions service/history/timerQueueProcessorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,11 +624,11 @@ func (t *timerQueueProcessorBase) processDeleteHistoryEvent(
return nil
}

clusterArchivalStatus := t.shard.GetService().GetArchivalMetadata().GetHistoryConfig().GetClusterStatus()
domainCacheEntry, err := t.historyService.shard.GetDomainCache().GetDomainByID(task.DomainID)
if err != nil {
return err
}
clusterArchivalStatus := t.shard.GetService().GetArchivalMetadata().GetHistoryConfig().GetClusterStatus()
domainArchivalStatus := domainCacheEntry.GetConfig().HistoryArchivalStatus
switch clusterArchivalStatus {
case carchiver.ArchivalDisabled:
Expand All @@ -644,7 +644,7 @@ func (t *timerQueueProcessorBase) processDeleteHistoryEvent(
return t.deleteWorkflow(task, context, msBuilder)
}
t.metricsClient.IncCounter(metrics.HistoryProcessDeleteHistoryEventScope, metrics.WorkflowCleanupArchiveCount)
return t.archiveWorkflow(task, context, msBuilder)
return t.archiveWorkflow(task, context, msBuilder, domainCacheEntry)
}
return nil
}
Expand Down Expand Up @@ -680,18 +680,13 @@ func (t *timerQueueProcessorBase) archiveWorkflow(
task *persistence.TimerTaskInfo,
workflowContext workflowExecutionContext,
msBuilder mutableState,
domainCacheEntry *cache.DomainCacheEntry,
) error {

domainCacheEntry, err := t.historyService.shard.GetDomainCache().GetDomainByID(task.DomainID)
if err != nil {
return err
}

executionStats, err := workflowContext.loadExecutionStats()
if err != nil {
return err
}
archiveInline := executionStats.HistorySize < int64(t.config.TimerProcessorHistoryArchivalSizeLimit())
attemptArchiveInline := executionStats.HistorySize < int64(t.config.TimerProcessorHistoryArchivalSizeLimit())
ctx, cancel := context.WithTimeout(context.Background(), t.config.TimerProcessorHistoryArchivalTimeLimit())
defer cancel()
req := &archiver.ClientRequest{
Expand All @@ -707,10 +702,11 @@ func (t *timerQueueProcessorBase) archiveWorkflow(
CloseFailoverVersion: msBuilder.GetLastWriteVersion(),
URI: domainCacheEntry.GetConfig().HistoryArchivalURI,
},
CallerService: common.HistoryServiceName,
ArchiveInline: archiveInline,
CallerService: common.HistoryServiceName,
AttemptArchiveInline: attemptArchiveInline,
}
if err := t.historyService.archivalClient.Archive(ctx, req); err != nil {
resp, err := t.historyService.archivalClient.Archive(ctx, req)
if err != nil {
return err
}

Expand All @@ -720,7 +716,7 @@ func (t *timerQueueProcessorBase) archiveWorkflow(
if err := t.deleteWorkflowExecution(task); err != nil {
return err
}
if archiveInline {
if resp.ArchivedInline {
if err := t.deleteWorkflowHistory(task, msBuilder); err != nil {
return err
}
Expand Down
Loading

0 comments on commit b9271fb

Please sign in to comment.