Skip to content

Commit

Permalink
Forward activity responses and heartbeats on failover as well (cadenc…
Browse files Browse the repository at this point in the history
…e-workflow#4823)

We have a user desiring this, and in general it seems like a good idea.
Activities are generally assumed to be "high cost" to lose, or at least potentially.

Longer term, we should probably consider making this a per-domain config,
rather than something that is hardcoded for a whole cluster.  Nothing
about this seems like it would be cluster-bound.
  • Loading branch information
Groxx authored May 19, 2022
1 parent fbfafb9 commit 20329a2
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 12 deletions.
11 changes: 7 additions & 4 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,15 @@ type (
// 6. QueryWorkflow
// 7. ResetWorkflow
//
// Both "selected-apis-forwarding" and "all-domain-apis-forwarding" can work with EnableDomainNotActiveAutoForwarding dynamicconfig to select certain domains using the policy.
// 4) "selected-apis-forwarding-v2" will forward all of "selected-apis-forwarding", and also activity responses
// and heartbeats, but not other worker APIs.
//
// Usage recommendation: when enabling XDC(global domain) feature, either "all-domain-apis-forwarding" or "selected-apis-forwarding" should be used to ensure seamless domain failover(high availability)
// Depending on the cost of cross cluster calls :
// "selected-apis-forwarding(-v2)" and "all-domain-apis-forwarding" can work with EnableDomainNotActiveAutoForwarding dynamicconfig to select certain domains using the policy.
//
// 1) If the network communication overhead is high(e.g., clusters are in remote datacenters of different region), then should use "selected-apis-forwarding".
// Usage recommendation: when enabling XDC(global domain) feature, either "all-domain-apis-forwarding" or "selected-apis-forwarding(-v2)" should be used to ensure seamless domain failover(high availability)
// Depending on the cost of cross cluster calls:
//
// 1) If the network communication overhead is high(e.g., clusters are in remote datacenters of different region), then should use "selected-apis-forwarding(-v2)".
// But you must ensure a different set of workers with the same workflow & activity code are connected to each Cadence cluster.
//
// 2) If the network communication overhead is low (e.g. in the same datacenter, mostly for cluster migration usage), then you can use "all-domain-apis-forwarding". Then only one set of
Expand Down
8 changes: 8 additions & 0 deletions service/frontend/clusterRedirectionHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ type (
}
)

func TestForwardingPolicyV2ContainsV1(t *testing.T) {
require.NotEqual(t, selectedAPIsForwardingRedirectionPolicyAPIAllowlistV2, selectedAPIsForwardingRedirectionPolicyAPIAllowlist)
for k := range selectedAPIsForwardingRedirectionPolicyAPIAllowlist {
_, ok := selectedAPIsForwardingRedirectionPolicyAPIAllowlistV2[k]
require.True(t, ok, "v2 does not contain a key that is in v1: %v", k)
}
}

func TestClusterRedirectionHandlerSuite(t *testing.T) {
s := new(clusterRedirectionHandlerSuite)
suite.Run(t, s)
Expand Down
72 changes: 64 additions & 8 deletions service/frontend/clusterRedirectionPolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,34 @@ const (
// 5. TerminateWorkflowExecution
// 6. QueryWorkflowStrongConsistency
// 7. ResetWorkflow
// please also reference selectedAPIsForwardingRedirectionPolicyAPIAllowlist
// please also reference selectedAPIsForwardingRedirectionPolicyAPIAllowlist and DCRedirectionPolicySelectedAPIsForwardingV2
DCRedirectionPolicySelectedAPIsForwarding = "selected-apis-forwarding"
// DCRedirectionPolicyAllDomainAPIsForwarding means forwarding all the worker and non-worker APIs based domain
// DCRedirectionPolicySelectedAPIsForwardingV2 forwards everything in DCRedirectionPolicySelectedAPIsForwarding,
// as well as activity completions (sync and async) and heartbeats.
// This is done because activity results are generally deemed "useful" and relatively costly to re-do (when it is
// even possible to redo), but activity workers themselves may be datacenter-specific.
//
// This will likely replace DCRedirectionPolicySelectedAPIsForwarding soon.
//
// 1-7. from DCRedirectionPolicySelectedAPIsForwarding
// 8. RecordActivityTaskHeartbeat
// 9. RecordActivityTaskHeartbeatByID
// 10. RespondActivityTaskCanceled
// 11. RespondActivityTaskCanceledByID
// 12. RespondActivityTaskCompleted
// 13. RespondActivityTaskCompletedByID
// 14. RespondActivityTaskFailed
// 15. RespondActivityTaskFailedByID
// please also reference selectedAPIsForwardingRedirectionPolicyAPIAllowlistV2
DCRedirectionPolicySelectedAPIsForwardingV2 = "selected-apis-forwarding-v2"
// DCRedirectionPolicyAllDomainAPIsForwarding means forwarding all the worker and non-worker APIs based domain,
// and falling back to DCRedirectionPolicySelectedAPIsForwarding when the current active cluster is not the
// cluster migration target.
DCRedirectionPolicyAllDomainAPIsForwarding = "all-domain-apis-forwarding"
// DCRedirectionPolicyAllDomainAPIsForwardingV2 means forwarding all the worker and non-worker APIs based domain,
// and falling back to DCRedirectionPolicySelectedAPIsForwardingV2 when the current active cluster is not the
// cluster migration target.
DCRedirectionPolicyAllDomainAPIsForwardingV2 = "all-domain-apis-forwarding-v2"
)

type (
Expand All @@ -68,11 +92,13 @@ type (
config *Config
domainCache cache.DomainCache
allDomainAPIs bool
selectedAPIs map[string]struct{}
targetCluster string
}
)

// selectedAPIsForwardingRedirectionPolicyAPIAllowlist contains a list of non-worker APIs which can be redirected
// selectedAPIsForwardingRedirectionPolicyAPIAllowlist contains a list of non-worker APIs which can be redirected.
// This is paired with DCRedirectionPolicySelectedAPIsForwarding - keep both lists up to date.
var selectedAPIsForwardingRedirectionPolicyAPIAllowlist = map[string]struct{}{
"StartWorkflowExecution": {},
"SignalWithStartWorkflowExecution": {},
Expand All @@ -83,6 +109,28 @@ var selectedAPIsForwardingRedirectionPolicyAPIAllowlist = map[string]struct{}{
"ResetWorkflowExecution": {},
}

// selectedAPIsForwardingRedirectionPolicyAPIAllowlistV2 contains a list of non-worker APIs which can be redirected.
// This is paired with DCRedirectionPolicySelectedAPIsForwardingV2 - keep both lists up to date.
var selectedAPIsForwardingRedirectionPolicyAPIAllowlistV2 = map[string]struct{}{
// from selectedAPIsForwardingRedirectionPolicyAPIAllowlist
"StartWorkflowExecution": {},
"SignalWithStartWorkflowExecution": {},
"SignalWorkflowExecution": {},
"RequestCancelWorkflowExecution": {},
"TerminateWorkflowExecution": {},
"QueryWorkflowStrongConsistency": {},
"ResetWorkflowExecution": {},
// additional endpoints
"RecordActivityTaskHeartbeat": {},
"RecordActivityTaskHeartbeatByID": {},
"RespondActivityTaskCanceled": {},
"RespondActivityTaskCanceledByID": {},
"RespondActivityTaskCompleted": {},
"RespondActivityTaskCompletedByID": {},
"RespondActivityTaskFailed": {},
"RespondActivityTaskFailedByID": {},
}

// RedirectionPolicyGenerator generate corresponding redirection policy
func RedirectionPolicyGenerator(clusterMetadata cluster.Metadata, config *Config,
domainCache cache.DomainCache, policy config.ClusterRedirectionPolicy) ClusterRedirectionPolicy {
Expand All @@ -94,10 +142,17 @@ func RedirectionPolicyGenerator(clusterMetadata cluster.Metadata, config *Config
return newNoopRedirectionPolicy(clusterMetadata.GetCurrentClusterName())
case DCRedirectionPolicySelectedAPIsForwarding:
currentClusterName := clusterMetadata.GetCurrentClusterName()
return newSelectedOrAllAPIsForwardingPolicy(currentClusterName, config, domainCache, false, "")
return newSelectedOrAllAPIsForwardingPolicy(currentClusterName, config, domainCache, false, selectedAPIsForwardingRedirectionPolicyAPIAllowlist, "")
case DCRedirectionPolicySelectedAPIsForwardingV2:
currentClusterName := clusterMetadata.GetCurrentClusterName()
return newSelectedOrAllAPIsForwardingPolicy(currentClusterName, config, domainCache, false, selectedAPIsForwardingRedirectionPolicyAPIAllowlistV2, "")
case DCRedirectionPolicyAllDomainAPIsForwarding:
currentClusterName := clusterMetadata.GetCurrentClusterName()
return newSelectedOrAllAPIsForwardingPolicy(currentClusterName, config, domainCache, true, policy.AllDomainApisForwardingTargetCluster)
return newSelectedOrAllAPIsForwardingPolicy(currentClusterName, config, domainCache, true, selectedAPIsForwardingRedirectionPolicyAPIAllowlist, policy.AllDomainApisForwardingTargetCluster)
case DCRedirectionPolicyAllDomainAPIsForwardingV2:
currentClusterName := clusterMetadata.GetCurrentClusterName()
return newSelectedOrAllAPIsForwardingPolicy(currentClusterName, config, domainCache, true, selectedAPIsForwardingRedirectionPolicyAPIAllowlistV2, policy.AllDomainApisForwardingTargetCluster)

default:
panic(fmt.Sprintf("Unknown DC redirection policy %v", policy.Policy))
}
Expand All @@ -121,12 +176,13 @@ func (policy *noopRedirectionPolicy) WithDomainNameRedirect(ctx context.Context,
}

// newSelectedOrAllAPIsForwardingPolicy creates a forwarding policy for selected APIs based on domain
func newSelectedOrAllAPIsForwardingPolicy(currentClusterName string, config *Config, domainCache cache.DomainCache, allDoaminAPIs bool, targetCluster string) *selectedOrAllAPIsForwardingRedirectionPolicy {
func newSelectedOrAllAPIsForwardingPolicy(currentClusterName string, config *Config, domainCache cache.DomainCache, allDoaminAPIs bool, selectedAPIs map[string]struct{}, targetCluster string) *selectedOrAllAPIsForwardingRedirectionPolicy {
return &selectedOrAllAPIsForwardingRedirectionPolicy{
currentClusterName: currentClusterName,
config: config,
domainCache: domainCache,
allDomainAPIs: allDoaminAPIs,
selectedAPIs: selectedAPIs,
targetCluster: targetCluster,
}
}
Expand Down Expand Up @@ -190,11 +246,11 @@ func (policy *selectedOrAllAPIsForwardingRedirectionPolicy) getTargetClusterAndI
if policy.targetCluster == currentActiveCluster {
return currentActiveCluster, true
}
// fallback to selectedAPIsForwardingRedirectionPolicy if targetCluster is not empty and not the same as currentActiveCluster
// fallback to selected APIs if targetCluster is not empty and not the same as currentActiveCluster
}
}

_, ok := selectedAPIsForwardingRedirectionPolicyAPIAllowlist[apiName]
_, ok := policy.selectedAPIs[apiName]
if !ok {
// do not do dc redirection if API is not whitelisted
return policy.currentClusterName, false
Expand Down
1 change: 1 addition & 0 deletions service/frontend/clusterRedirectionPolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) SetupTest() {
s.mockConfig,
s.mockDomainCache,
false,
selectedAPIsForwardingRedirectionPolicyAPIAllowlist,
"",
)
}
Expand Down

0 comments on commit 20329a2

Please sign in to comment.