Skip to content

Commit

Permalink
Implement Admin Get Raw History API For NDC (cadence-workflow#2625)
Browse files Browse the repository at this point in the history
* Implement re-replication API v2. This API is going to use in re-replication case. It takes in start event id, start event version and end event id, end event version exclusive-exclusive.
  • Loading branch information
yux0 authored Oct 10, 2019
1 parent 0211497 commit 189b1e1
Show file tree
Hide file tree
Showing 17 changed files with 3,222 additions and 597 deletions.
2,463 changes: 1,888 additions & 575 deletions .gen/go/admin/admin.go

Large diffs are not rendered by default.

29 changes: 29 additions & 0 deletions .gen/go/admin/adminserviceclient/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 36 additions & 1 deletion .gen/go/admin/adminserviceserver/server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions .gen/go/admin/adminservicetest/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions client/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,22 @@ func (c *clientImpl) GetWorkflowExecutionRawHistory(
return client.GetWorkflowExecutionRawHistory(ctx, request, opts...)
}

func (c *clientImpl) GetWorkflowExecutionRawHistoryV2(
ctx context.Context,
request *admin.GetWorkflowExecutionRawHistoryV2Request,
opts ...yarpc.CallOption,
) (*admin.GetWorkflowExecutionRawHistoryV2Response, error) {

opts = common.AggregateYarpcOptions(ctx, opts...)
client, err := c.getRandomClient()
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(ctx)
defer cancel()
return client.GetWorkflowExecutionRawHistoryV2(ctx, request, opts...)
}

func (c *clientImpl) createContext(parent context.Context) (context.Context, context.CancelFunc) {
if parent == nil {
return context.WithTimeout(context.Background(), c.timeout)
Expand Down
18 changes: 18 additions & 0 deletions client/admin/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,21 @@ func (c *metricClient) GetWorkflowExecutionRawHistory(
}
return resp, err
}

func (c *metricClient) GetWorkflowExecutionRawHistoryV2(
ctx context.Context,
request *admin.GetWorkflowExecutionRawHistoryV2Request,
opts ...yarpc.CallOption,
) (*admin.GetWorkflowExecutionRawHistoryV2Response, error) {

c.metricsClient.IncCounter(metrics.AdminClientGetWorkflowExecutionRawHistoryV2Scope, metrics.CadenceClientRequests)

sw := c.metricsClient.StartTimer(metrics.AdminClientGetWorkflowExecutionRawHistoryV2Scope, metrics.CadenceClientLatency)
resp, err := c.client.GetWorkflowExecutionRawHistoryV2(ctx, request, opts...)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.AdminClientGetWorkflowExecutionRawHistoryV2Scope, metrics.CadenceClientFailures)
}
return resp, err
}
16 changes: 16 additions & 0 deletions client/admin/retryableClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,19 @@ func (c *retryableClient) GetWorkflowExecutionRawHistory(
err := backoff.Retry(op, c.policy, c.isRetryable)
return resp, err
}

func (c *retryableClient) GetWorkflowExecutionRawHistoryV2(
ctx context.Context,
request *admin.GetWorkflowExecutionRawHistoryV2Request,
opts ...yarpc.CallOption,
) (*admin.GetWorkflowExecutionRawHistoryV2Response, error) {

var resp *admin.GetWorkflowExecutionRawHistoryV2Response
op := func() error {
var err error
resp, err = c.client.GetWorkflowExecutionRawHistoryV2(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
return resp, err
}
16 changes: 11 additions & 5 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ const (
AdminClientDescribeWorkflowExecutionScope
// AdminClientGetWorkflowExecutionRawHistoryScope tracks RPC calls to admin service
AdminClientGetWorkflowExecutionRawHistoryScope
// AdminClientGetWorkflowExecutionRawHistoryV2Scope tracks RPC calls to admin service
AdminClientGetWorkflowExecutionRawHistoryV2Scope
// DCRedirectionDeprecateDomainScope tracks RPC calls for dc redirection
DCRedirectionDeprecateDomainScope
// DCRedirectionDescribeDomainScope tracks RPC calls for dc redirection
Expand Down Expand Up @@ -580,6 +582,8 @@ const (
AdminDescribeWorkflowExecutionScope
// AdminGetWorkflowExecutionRawHistoryScope is the metric scope for admin.GetWorkflowExecutionRawHistoryScope
AdminGetWorkflowExecutionRawHistoryScope
// AdminGetWorkflowExecutionRawHistoryV2Scope is the metric scope for admin.GetWorkflowExecutionRawHistoryScope
AdminGetWorkflowExecutionRawHistoryV2Scope
// AdminRemoveTaskScope is the metric scope for admin.AdminRemoveTaskScope
AdminRemoveTaskScope
//AdminCloseShardTaskScope is the metric scope for admin.AdminRemoveTaskScope
Expand Down Expand Up @@ -1080,6 +1084,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
AdminClientDescribeHistoryHostScope: {operation: "AdminClientDescribeHistoryHost", tags: map[string]string{CadenceRoleTagName: AdminRoleTagValue}},
AdminClientDescribeWorkflowExecutionScope: {operation: "AdminClientDescribeWorkflowExecution", tags: map[string]string{CadenceRoleTagName: AdminRoleTagValue}},
AdminClientGetWorkflowExecutionRawHistoryScope: {operation: "AdminClientGetWorkflowExecutionRawHistory", tags: map[string]string{CadenceRoleTagName: AdminRoleTagValue}},
AdminClientGetWorkflowExecutionRawHistoryV2Scope: {operation: "AdminClientGetWorkflowExecutionRawHistoryV2", tags: map[string]string{CadenceRoleTagName: AdminRoleTagValue}},
AdminClientCloseShardScope: {operation: "AdminClientCloseShard", tags: map[string]string{CadenceRoleTagName: AdminRoleTagValue}},
DCRedirectionDeprecateDomainScope: {operation: "DCRedirectionDeprecateDomain", tags: map[string]string{CadenceRoleTagName: DCRedirectionRoleTagValue}},
DCRedirectionDescribeDomainScope: {operation: "DCRedirectionDescribeDomain", tags: map[string]string{CadenceRoleTagName: DCRedirectionRoleTagValue}},
Expand Down Expand Up @@ -1158,11 +1163,12 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
// Frontend Scope Names
Frontend: {
// Admin API scope co-locates with with frontend
AdminRemoveTaskScope: {operation: "AdminRemoveTask"},
AdminCloseShardTaskScope: {operation: "AdminCloseShardTask"},
AdminDescribeHistoryHostScope: {operation: "DescribeHistoryHost"},
AdminDescribeWorkflowExecutionScope: {operation: "DescribeWorkflowExecution"},
AdminGetWorkflowExecutionRawHistoryScope: {operation: "GetWorkflowExecutionRawHistory"},
AdminRemoveTaskScope: {operation: "AdminRemoveTask"},
AdminCloseShardTaskScope: {operation: "AdminCloseShardTask"},
AdminDescribeHistoryHostScope: {operation: "DescribeHistoryHost"},
AdminDescribeWorkflowExecutionScope: {operation: "DescribeWorkflowExecution"},
AdminGetWorkflowExecutionRawHistoryScope: {operation: "GetWorkflowExecutionRawHistory"},
AdminGetWorkflowExecutionRawHistoryV2Scope: {operation: "GetWorkflowExecutionRawHistoryV2"},

FrontendStartWorkflowExecutionScope: {operation: "StartWorkflowExecution"},
FrontendPollForDecisionTaskScope: {operation: "PollForDecisionTask"},
Expand Down
8 changes: 6 additions & 2 deletions common/persistence/versionHistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (item *VersionHistoryItem) Equals(input *VersionHistoryItem) bool {
// NewVersionHistory create a new version history
func NewVersionHistory(
inputToken []byte,
inputitems []*VersionHistoryItem,
inputItems []*VersionHistoryItem,
) *VersionHistory {

token := make([]byte, len(inputToken))
Expand All @@ -100,7 +100,7 @@ func NewVersionHistory(
items: nil,
}

for _, item := range inputitems {
for _, item := range inputItems {
if err := versionHistory.AddOrUpdateItem(item.Duplicate()); err != nil {
panic(fmt.Sprintf("unable to initialize version history: %v", err))
}
Expand Down Expand Up @@ -243,6 +243,10 @@ func (v *VersionHistory) ContainsItem(
prevEventID := common.FirstEventID - 1
for _, currentItem := range v.items {
if item.GetVersion() == currentItem.GetVersion() {
// this is a special handling for event id = 0
if (item.GetEventID() == common.FirstEventID-1) && item.GetEventID() <= currentItem.GetEventID() {
return true
}
if prevEventID < item.GetEventID() && item.GetEventID() <= currentItem.GetEventID() {
return true
}
Expand Down
11 changes: 11 additions & 0 deletions common/persistence/versionHistory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,17 @@ func (s *versionHistoriesSuite) TestContainsItem_False() {
s.False(history.ContainsItem(NewVersionHistoryItem(6, 5)))
}

func (s *versionHistoriesSuite) TestContainsItem_True_WhenFirstEventIDIsZero() {
branchToken := []byte("some random branch token")
items := []*VersionHistoryItem{
{eventID: 3, version: 0},
{eventID: 6, version: 4},
}
history := NewVersionHistory(branchToken, items)

s.True(history.ContainsItem(NewVersionHistoryItem(common.FirstEventID-1, 0)))
}

func (s *versionHistorySuite) TestIsLCAAppendable_True() {
branchToken := []byte("some random branch token")
items := []*VersionHistoryItem{
Expand Down
11 changes: 9 additions & 2 deletions common/xdc/historyRereplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,15 @@ func (c *historyRereplicationContext) handleEmptyHistory(domainID string, workfl
return err
}

func (c *historyRereplicationContext) getHistory(domainID string, workflowID string, runID string,
firstEventID int64, nextEventID int64, token []byte, pageSize int32) (*admin.GetWorkflowExecutionRawHistoryResponse, error) {
func (c *historyRereplicationContext) getHistory(
domainID string,
workflowID string,
runID string,
firstEventID int64,
nextEventID int64,
token []byte,
pageSize int32,
) (*admin.GetWorkflowExecutionRawHistoryResponse, error) {

logger := c.logger.WithTags(tag.WorkflowRunID(runID), tag.WorkflowFirstEventID(firstEventID), tag.WorkflowNextEventID(nextEventID))

Expand Down
Loading

0 comments on commit 189b1e1

Please sign in to comment.