Skip to content

Commit

Permalink
Integrate failover into into describe domain response (cadence-workfl…
Browse files Browse the repository at this point in the history
…ow#4440)

* Integrate failover into describe domain response
  • Loading branch information
yux0 authored Sep 14, 2021
1 parent 5ac1940 commit 0c3db56
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 1 deletion.
8 changes: 8 additions & 0 deletions common/domain/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,14 @@ func (d *handlerImpl) DescribeDomain(
IsGlobalDomain: resp.IsGlobalDomain,
FailoverVersion: resp.FailoverVersion,
}
if resp.FailoverEndTime != nil {
response.FailoverInfo = &types.FailoverInfo{
FailoverVersion: resp.FailoverVersion,
// This reflects that last domain update time. If there is a domain config update, this won't be accurate.
FailoverStartTimestamp: resp.LastUpdatedTime,
FailoverExpireTimestamp: *resp.FailoverEndTime,
}
}
response.DomainInfo, response.Configuration, response.ReplicationConfiguration = d.createResponse(resp.Info, resp.Config, resp.ReplicationConfig)
return response, nil
}
Expand Down
33 changes: 32 additions & 1 deletion common/types/mapper/proto/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,9 @@ func FromDescribeDomainResponseDomain(t *types.DescribeDomainResponse) *apiv1.Do
domain.ActiveClusterName = repl.ActiveClusterName
domain.Clusters = FromClusterReplicationConfigurationArray(repl.Clusters)
}
if info := t.GetFailoverInfo(); info != nil {
domain.FailoverInfo = FromFailoverInfo(t.GetFailoverInfo())
}
return &domain
}

Expand Down Expand Up @@ -1119,14 +1122,42 @@ func ToDescribeDomainResponseDomain(t *apiv1.Domain) *types.DescribeDomainRespon
},
FailoverVersion: t.FailoverVersion,
IsGlobalDomain: t.IsGlobalDomain,
FailoverInfo: ToFailoverInfo(t.FailoverInfo),
}
}

func ToDescribeDomainResponse(t *apiv1.DescribeDomainResponse) *types.DescribeDomainResponse {
if t == nil {
return nil
}
return ToDescribeDomainResponseDomain(t.Domain)
response := ToDescribeDomainResponseDomain(t.Domain)
return response
}

func FromFailoverInfo(t *types.FailoverInfo) *apiv1.FailoverInfo {
if t == nil {
return nil
}
return &apiv1.FailoverInfo{
FailoverVersion: t.GetFailoverVersion(),
FailoverStartTimestamp: unixNanoToTime(&t.FailoverStartTimestamp),
FailoverExpireTimestamp: unixNanoToTime(&t.FailoverExpireTimestamp),
CompletedShardCount: t.GetCompletedShardCount(),
PendingShards: t.GetPendingShards(),
}
}

func ToFailoverInfo(t *apiv1.FailoverInfo) *types.FailoverInfo {
if t == nil {
return nil
}
return &types.FailoverInfo{
FailoverVersion: t.GetFailoverVersion(),
FailoverStartTimestamp: *timeToUnixNano(t.GetFailoverStartTimestamp()),
FailoverExpireTimestamp: *timeToUnixNano(t.GetFailoverExpireTimestamp()),
CompletedShardCount: t.GetCompletedShardCount(),
PendingShards: t.GetPendingShards(),
}
}

func FromDescribeTaskListRequest(t *types.DescribeTaskListRequest) *apiv1.DescribeTaskListRequest {
Expand Down
11 changes: 11 additions & 0 deletions common/types/mapper/proto/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,7 @@ func TestListClosedWorkflowExecutionsRequest(t *testing.T) {
assert.Equal(t, item, ToListClosedWorkflowExecutionsRequest(FromListClosedWorkflowExecutionsRequest(item)))
}
}

func TestListOpenWorkflowExecutionsRequest(t *testing.T) {
for _, item := range []*types.ListOpenWorkflowExecutionsRequest{
nil,
Expand All @@ -1018,3 +1019,13 @@ func TestListOpenWorkflowExecutionsRequest(t *testing.T) {
assert.Equal(t, item, ToListOpenWorkflowExecutionsRequest(FromListOpenWorkflowExecutionsRequest(item)))
}
}

func TestFailoverInfo(t *testing.T) {
for _, item := range []*types.FailoverInfo{
nil,
{},
&testdata.FailoverInfo,
} {
assert.Equal(t, item, ToFailoverInfo(FromFailoverInfo(item)))
}
}
30 changes: 30 additions & 0 deletions common/types/mapper/thrift/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -1535,6 +1535,7 @@ func FromDescribeDomainResponse(t *types.DescribeDomainResponse) *shared.Describ
ReplicationConfiguration: FromDomainReplicationConfiguration(t.ReplicationConfiguration),
FailoverVersion: &t.FailoverVersion,
IsGlobalDomain: &t.IsGlobalDomain,
FailoverInfo: FromFailoverInfo(t.GetFailoverInfo()),
}
}

Expand All @@ -1549,6 +1550,7 @@ func ToDescribeDomainResponse(t *shared.DescribeDomainResponse) *types.DescribeD
ReplicationConfiguration: ToDomainReplicationConfiguration(t.ReplicationConfiguration),
FailoverVersion: t.GetFailoverVersion(),
IsGlobalDomain: t.GetIsGlobalDomain(),
FailoverInfo: ToFailoverInfo(t.FailoverInfo),
}
}

Expand Down Expand Up @@ -1894,6 +1896,34 @@ func ToDomainInfo(t *shared.DomainInfo) *types.DomainInfo {
}
}

// FromFailoverInfo converts internal FailoverInfo type to thrift
func FromFailoverInfo(t *types.FailoverInfo) *shared.FailoverInfo {
if t == nil {
return nil
}
return &shared.FailoverInfo{
FailoverVersion: &t.FailoverVersion,
FailoverStartTimestamp: &t.FailoverStartTimestamp,
FailoverExpireTimestamp: &t.FailoverExpireTimestamp,
CompletedShardCount: &t.CompletedShardCount,
PendingShards: t.GetPendingShards(),
}
}

// ToFailoverInfo converts thrift FailoverInfo type to internal
func ToFailoverInfo(t *shared.FailoverInfo) *types.FailoverInfo {
if t == nil {
return nil
}
return &types.FailoverInfo{
FailoverVersion: t.GetFailoverVersion(),
FailoverStartTimestamp: t.GetFailoverStartTimestamp(),
FailoverExpireTimestamp: t.GetFailoverExpireTimestamp(),
CompletedShardCount: t.GetCompletedShardCount(),
PendingShards: t.GetPendingShards(),
}
}

// FromDomainNotActiveError converts internal DomainNotActiveError type to thrift
func FromDomainNotActiveError(t *types.DomainNotActiveError) *shared.DomainNotActiveError {
if t == nil {
Expand Down
6 changes: 6 additions & 0 deletions common/types/mapper/thrift/thrift-tests/shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,9 @@ func TestGetFailoverInfoResponse(t *testing.T) {
assert.Equal(t, item, thrift.ToGetFailoverInfoResponse(thrift.FromGetFailoverInfoResponse(item)))
}
}

func TestFailoverInfo(t *testing.T) {
for _, item := range []*types.FailoverInfo{nil, {}, &testdata.FailoverInfo} {
assert.Equal(t, item, thrift.ToFailoverInfo(thrift.FromFailoverInfo(item)))
}
}
58 changes: 58 additions & 0 deletions common/types/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -2314,6 +2314,7 @@ type DescribeDomainResponse struct {
ReplicationConfiguration *DomainReplicationConfiguration `json:"replicationConfiguration,omitempty"`
FailoverVersion int64 `json:"failoverVersion,omitempty"`
IsGlobalDomain bool `json:"isGlobalDomain,omitempty"`
FailoverInfo *FailoverInfo `json:"failoverInfo,omitempty"`
}

// GetDomainInfo is an internal getter (TBD...)
Expand Down Expand Up @@ -2356,6 +2357,14 @@ func (v *DescribeDomainResponse) GetIsGlobalDomain() (o bool) {
return
}

// GetFailoverInfo is an internal getter (TBD...)
func (v *DescribeDomainResponse) GetFailoverInfo() (o *FailoverInfo) {
if v != nil {
return v.FailoverInfo
}
return
}

// DescribeHistoryHostRequest is an internal type (TBD...)
type DescribeHistoryHostRequest struct {
HostAddress *string `json:"hostAddress,omitempty"`
Expand Down Expand Up @@ -3560,6 +3569,55 @@ func (v *GetWorkflowExecutionHistoryResponse) GetArchived() (o bool) {
return
}

// FailoverInfo is an internal type (TBD...)
type FailoverInfo struct {
FailoverVersion int64 `json:"failoverVersion,omitempty"`
FailoverStartTimestamp int64 `json:"failoverStartTimestamp,omitempty"`
FailoverExpireTimestamp int64 `json:"failoverExpireTimestamp,omitempty"`
CompletedShardCount int32 `json:"completedShardCount,omitempty"`
PendingShards []int32 `json:"pendingShards,omitempty"`
}

// GetFailoverVersion is an internal getter (TBD...)
func (v *FailoverInfo) GetFailoverVersion() (o int64) {
if v != nil {
return v.FailoverVersion
}
return
}

// GetFailoverStartTimestamp is an internal getter (TBD...)
func (v *FailoverInfo) GetFailoverStartTimestamp() (o int64) {
if v != nil {
return v.FailoverStartTimestamp
}
return
}

// GetFailoverExpireTimestamp is an internal getter (TBD...)
func (v *FailoverInfo) GetFailoverExpireTimestamp() (o int64) {
if v != nil {
return v.FailoverExpireTimestamp
}
return
}

// GetCompletedShardCount is an internal getter (TBD...)
func (v *FailoverInfo) GetCompletedShardCount() (o int32) {
if v != nil {
return v.CompletedShardCount
}
return
}

// GetPendingShards is an internal getter (TBD...)
func (v *FailoverInfo) GetPendingShards() (o []int32) {
if v != nil {
return v.PendingShards
}
return
}

// Header is an internal type (TBD...)
type Header struct {
Fields map[string][]byte `json:"fields,omitempty"`
Expand Down
7 changes: 7 additions & 0 deletions common/types/testdata/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,11 @@ var (
ClusterReplicationConfigurationArray = []*types.ClusterReplicationConfiguration{
&ClusterReplicationConfiguration,
}
FailoverInfo = types.FailoverInfo{
FailoverVersion: 1,
FailoverStartTimestamp: 1,
FailoverExpireTimestamp: 10,
CompletedShardCount: 10,
PendingShards: []int32{1, 2, 3},
}
)
17 changes: 17 additions & 0 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,23 @@ func (wh *WorkflowHandler) DescribeDomain(
if err != nil {
return resp, wh.error(err, scope)
}

if resp.GetFailoverInfo() != nil && resp.GetFailoverInfo().GetFailoverExpireTimestamp() > 0 {
// fetch ongoing failover info from history service
failoverResp, err := wh.GetHistoryClient().GetFailoverInfo(ctx, &types.GetFailoverInfoRequest{
DomainID: resp.GetDomainInfo().UUID,
})
if err != nil {
// despite the error from history, return describe domain response
wh.GetLogger().Error(
fmt.Sprintf("Failed to get failover info for domain %s", resp.DomainInfo.GetName()),
tag.Error(err),
)
return resp, nil
}
resp.FailoverInfo.CompletedShardCount = failoverResp.GetCompletedShardCount()
resp.FailoverInfo.PendingShards = failoverResp.GetPendingShards()
}
return resp, err
}

Expand Down
17 changes: 17 additions & 0 deletions tools/cli/domainCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,23 @@ func (d *domainCLIImpl) DescribeDomain(c *cli.Context) {
}
table.Render()
}
if resp.GetFailoverInfo() != nil {
info := resp.GetFailoverInfo()
fmt.Println("Graceful failover info:")
table := tablewriter.NewWriter(os.Stdout)
table.SetBorder(true)
table.SetColumnSeparator("|")
header := []string{"Failover Version", "Start Time", "Expire Time", "Completed Shard Count", "Pending Shard"}
table.SetHeader(header)
row := []string{}
row = append(row, fmt.Sprintf("%v", info.GetFailoverVersion()))
row = append(row, time.Unix(0, info.GetFailoverStartTimestamp()).String())
row = append(row, time.Unix(0, info.GetFailoverExpireTimestamp()).String())
row = append(row, fmt.Sprintf("%v", info.GetCompletedShardCount()))
row = append(row, fmt.Sprintf("%v", info.GetPendingShards()))
table.Append(row)
table.Render()
}
}

func (d *domainCLIImpl) ListDomains(c *cli.Context) {
Expand Down

0 comments on commit 0c3db56

Please sign in to comment.