Skip to content

Commit

Permalink
Add WorkflowInfo#getHistoryLength (temporalio#1498)
Browse files Browse the repository at this point in the history
  • Loading branch information
Spikhalskiy authored Oct 27, 2022
1 parent af7c2ca commit e023e04
Show file tree
Hide file tree
Showing 7 changed files with 320 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -296,4 +296,9 @@ void getVersion(
* @return workflow header
*/
Map<String, Payload> getHeader();

/**
* @return eventId of the last / currently active workflow task of this workflow
*/
long getCurrentWorkflowTaskStartedEventId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -355,4 +355,9 @@ public String getFullReplayDirectQueryName() {
public Map<String, Payload> getHeader() {
return basicWorkflowContext.getHeader();
}

@Override
public long getCurrentWorkflowTaskStartedEventId() {
return workflowStateMachines.getCurrentStartedEventId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ public String getCronSchedule() {
return context.getCronSchedule();
}

@Override
public long getHistoryLength() {
return context.getCurrentWorkflowTaskStartedEventId();
}

@Override
public String toString() {
return "WorkflowInfo{"
Expand Down Expand Up @@ -145,6 +150,8 @@ public String toString() {
+ getAttempt()
+ ", cronSchedule="
+ getCronSchedule()
+ ", historyLength="
+ getHistoryLength()
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,11 @@ public interface WorkflowInfo {
int getAttempt();

String getCronSchedule();

/**
* @return length of Workflow history up until the current moment of execution. This value changes
* during the lifetime of a Workflow Execution. You may use this information to decide when to
* call {@link Workflow#continueAsNew(Object...)}.
*/
long getHistoryLength();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow;

import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeFalse;

import io.temporal.activity.LocalActivityOptions;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.WorkflowStub;
import io.temporal.testing.WorkflowReplayer;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.shared.TestActivities;
import io.temporal.workflow.shared.TestWorkflows;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.junit.Rule;
import org.junit.Test;

public class GetHistoryLengthTest {
private static final TestActivities.VariousTestActivities activitiesImpl =
new TestActivities.TestActivitiesImpl();

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestWorkflowImpl.class)
.setActivityImplementations(activitiesImpl)
.build();

@Test(timeout = 20000)
public void getHistoryLength() {
TestWorkflows.TestWorkflowReturnString workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(
TestWorkflows.TestWorkflowReturnString.class);
assertEquals("done", workflowStub.execute());

WorkflowExecution execution = WorkflowStub.fromTyped(workflowStub).getExecution();
testWorkflowRule.regenerateHistoryForReplay(execution, "testGetHistoryLength");
}

@Test
public void replay() throws Exception {
// Avoid executing 4 times
assumeFalse("skipping for docker tests", SDKTestWorkflowRule.useExternalService);
WorkflowReplayer.replayWorkflowExecutionFromResource(
"testGetHistoryLength.json", TestWorkflowImpl.class);
}

public static class TestWorkflowImpl implements TestWorkflows.TestWorkflowReturnString {

@Override
public String execute() {
LocalActivityOptions options =
LocalActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(30))
.build();

TestActivities.VariousTestActivities activities =
Workflow.newLocalActivityStub(TestActivities.VariousTestActivities.class, options);

assertEquals(3, Workflow.getInfo().getHistoryLength());

// Force WFT heartbeat
activities.sleepActivity(TimeUnit.SECONDS.toMillis(10), 1);

assertEquals(9, Workflow.getInfo().getHistoryLength());

return "done";
}
}
}
201 changes: 201 additions & 0 deletions temporal-sdk/src/test/resources/testGetHistoryLength.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
{
"events": [
{
"eventId": "1",
"eventTime": "2022-10-27T16:14:17.231Z",
"eventType": "WorkflowExecutionStarted",
"workflowExecutionStartedEventAttributes": {
"workflowType": {
"name": "TestWorkflowReturnString"
},
"taskQueue": {
"name": "WorkflowTest-getHistoryLength-c032d750-51b9-4b9a-8a24-bb9883d9bb85"
},
"input": {},
"workflowExecutionTimeout": "315360000s",
"workflowRunTimeout": "200s",
"workflowTaskTimeout": "5s",
"originalExecutionRunId": "1e7398be-bd30-4e26-8c6f-c3dbeed77856",
"identity": "[email protected]",
"firstExecutionRunId": "1e7398be-bd30-4e26-8c6f-c3dbeed77856",
"attempt": 1,
"header": {}
}
},
{
"eventId": "2",
"eventTime": "2022-10-27T16:14:17.231Z",
"eventType": "WorkflowTaskScheduled",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "WorkflowTest-getHistoryLength-c032d750-51b9-4b9a-8a24-bb9883d9bb85"
},
"startToCloseTimeout": "5s",
"attempt": 1
}
},
{
"eventId": "3",
"eventTime": "2022-10-27T16:14:17.252Z",
"eventType": "WorkflowTaskStarted",
"workflowTaskStartedEventAttributes": {
"scheduledEventId": "2",
"identity": "[email protected]"
}
},
{
"eventId": "4",
"eventTime": "2022-10-27T16:14:21.361Z",
"eventType": "WorkflowTaskCompleted",
"workflowTaskCompletedEventAttributes": {
"scheduledEventId": "2",
"identity": "[email protected]"
}
},
{
"eventId": "5",
"eventTime": "2022-10-27T16:14:21.361Z",
"eventType": "WorkflowTaskScheduled",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "WorkflowTest-getHistoryLength-c032d750-51b9-4b9a-8a24-bb9883d9bb85"
},
"startToCloseTimeout": "5s",
"attempt": 2
}
},
{
"eventId": "6",
"eventTime": "2022-10-27T16:14:21.361Z",
"eventType": "WorkflowTaskStarted",
"workflowTaskStartedEventAttributes": {
"scheduledEventId": "5",
"identity": "[email protected]"
}
},
{
"eventId": "7",
"eventTime": "2022-10-27T16:14:25.368Z",
"eventType": "WorkflowTaskCompleted",
"workflowTaskCompletedEventAttributes": {
"scheduledEventId": "5",
"identity": "[email protected]"
}
},
{
"eventId": "8",
"eventTime": "2022-10-27T16:14:25.368Z",
"eventType": "WorkflowTaskScheduled",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "WorkflowTest-getHistoryLength-c032d750-51b9-4b9a-8a24-bb9883d9bb85"
},
"startToCloseTimeout": "5s",
"attempt": 1
}
},
{
"eventId": "9",
"eventTime": "2022-10-27T16:14:25.368Z",
"eventType": "WorkflowTaskStarted",
"workflowTaskStartedEventAttributes": {
"scheduledEventId": "8",
"identity": "[email protected]"
}
},
{
"eventId": "10",
"eventTime": "2022-10-27T16:14:27.507Z",
"eventType": "WorkflowTaskCompleted",
"workflowTaskCompletedEventAttributes": {
"scheduledEventId": "8",
"identity": "[email protected]"
}
},
{
"eventId": "11",
"eventTime": "2022-10-27T16:14:27.507Z",
"eventType": "MarkerRecorded",
"markerRecordedEventAttributes": {
"markerName": "LocalActivity",
"details": {
"result": {
"payloads": [
{
"metadata": {
"encoding": "anNvbi9wbGFpbg\u003d\u003d"
},
"data": "InNsZWVwQWN0aXZpdHkxIg\u003d\u003d"
}
]
},
"activityId": {
"payloads": [
{
"metadata": {
"encoding": "anNvbi9wbGFpbg\u003d\u003d"
},
"data": "ImEzYmE4OTY0LWIyMWMtMzUzZi04ZmQ2LTQ5Njk1M2RiYjBmNSI\u003d"
}
]
},
"input": {
"payloads": [
{
"metadata": {
"encoding": "anNvbi9wbGFpbg\u003d\u003d"
},
"data": "MTAwMDA\u003d"
},
{
"metadata": {
"encoding": "anNvbi9wbGFpbg\u003d\u003d"
},
"data": "MQ\u003d\u003d"
}
]
},
"time": {
"payloads": [
{
"metadata": {
"encoding": "anNvbi9wbGFpbg\u003d\u003d"
},
"data": "MTY2Njg4NzI2NzI5MA\u003d\u003d"
}
]
},
"type": {
"payloads": [
{
"metadata": {
"encoding": "anNvbi9wbGFpbg\u003d\u003d"
},
"data": "IlNsZWVwQWN0aXZpdHki"
}
]
}
},
"workflowTaskCompletedEventId": "9"
}
},
{
"eventId": "12",
"eventTime": "2022-10-27T16:14:27.507Z",
"eventType": "WorkflowExecutionCompleted",
"workflowExecutionCompletedEventAttributes": {
"result": {
"payloads": [
{
"metadata": {
"encoding": "anNvbi9wbGFpbg\u003d\u003d"
},
"data": "ImRvbmUi"
}
]
},
"workflowTaskCompletedEventId": "9"
}
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -289,5 +289,10 @@ public String getFullReplayDirectQueryName() {
public Map<String, Payload> getHeader() {
return null;
}

@Override
public long getCurrentWorkflowTaskStartedEventId() {
return 0;
}
}
}

0 comments on commit e023e04

Please sign in to comment.