Skip to content

Commit

Permalink
Merge pull request corda#6578 from corda/WillV/ENT-5395-Pause-and-Res…
Browse files Browse the repository at this point in the history
…ume-Flows

ENT-5395 Pause and Resume Flows
  • Loading branch information
rick-r3 authored Aug 6, 2020
2 parents a73dad0 + 7acc510 commit f280ec9
Show file tree
Hide file tree
Showing 11 changed files with 281 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ interface CheckpointStorage {
fun updateCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes<FlowState>?,
serializedCheckpointState: SerializedBytes<CheckpointState>)

/**
 * Update the status ([Checkpoint.status]) of an existing checkpoint.
 *
 * @param runId the id of the flow whose checkpoint should be updated.
 * @param flowStatus the new status to store for that checkpoint.
 */
fun updateStatus(runId: StateMachineRunId, flowStatus: Checkpoint.FlowStatus)

/**
 * Update the compatibility flag ([Checkpoint.compatible]) of an existing checkpoint.
 *
 * @param runId the id of the flow whose checkpoint should be updated.
 * @param compatible the new value of the compatibility flag.
 */
fun updateCompatible(runId: StateMachineRunId, compatible: Boolean)

/**
* Update all persisted checkpoints with status [Checkpoint.FlowStatus.RUNNABLE] or [Checkpoint.FlowStatus.HOSPITALIZED],
* changing the status to [Checkpoint.FlowStatus.PAUSED].
Expand Down Expand Up @@ -65,6 +75,4 @@ interface CheckpointStorage {
* This method does not fetch [Checkpoint.Serialized.serializedFlowState] to save memory.
*/
fun getPausedCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>>

fun updateStatus(runId: StateMachineRunId, flowStatus: Checkpoint.FlowStatus)
}
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,11 @@ class DBCheckpointStorage(
currentDBSession().createNativeQuery(update).executeUpdate()
}

/**
 * Sets the `compatible` column of the checkpoint row whose `flow_id` matches [runId] by running a
 * native update statement on the current database session.
 */
override fun updateCompatible(runId: StateMachineRunId, compatible: Boolean) {
    // NOTE(review): the statement is assembled by interpolation; the interpolated values are a Boolean
    // and runId.uuid (presumably a java.util.UUID - confirm), not free-form text.
    val checkpointsTable = "${NODE_DATABASE_PREFIX}checkpoints"
    val statement = "Update $checkpointsTable set compatible = $compatible where flow_id = '${runId.uuid}'"
    val session = currentDBSession()
    session.createNativeQuery(statement).executeUpdate()
}

private fun createDBFlowMetadata(flowId: String, checkpoint: Checkpoint): DBFlowMetadata {
val context = checkpoint.checkpointState.invocationContext
val flowInfo = checkpoint.checkpointState.subFlowStack.first()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ sealed class Action {
*/
data class PersistCheckpoint(val id: StateMachineRunId, val checkpoint: Checkpoint, val isCheckpointUpdate: Boolean) : Action()

/**
 * Change only the stored [status] of the checkpoint identified by [id], leaving the rest of the
 * checkpoint untouched.
 */
data class UpdateFlowStatus(
    val id: StateMachineRunId,
    val status: Checkpoint.FlowStatus
) : Action()

/**
* Remove the checkpoint corresponding to [id].
*/
Expand Down Expand Up @@ -106,6 +111,11 @@ sealed class Action {
val lastState: StateMachineState
) : Action()

/**
 * Move the flow described by [currentState] to paused.
 */
data class MoveFlowToPaused(
    val currentState: StateMachineState
) : Action()

/**
* Schedule [event] to self.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ internal class ActionExecutorImpl(
is Action.RetryFlowFromSafePoint -> executeRetryFlowFromSafePoint(action)
is Action.ScheduleFlowTimeout -> scheduleFlowTimeout(action)
is Action.CancelFlowTimeout -> cancelFlowTimeout(action)
is Action.MoveFlowToPaused -> executeMoveFlowToPaused(action)
is Action.UpdateFlowStatus -> executeUpdateFlowStatus(action)
}
}
private fun executeReleaseSoftLocks(action: Action.ReleaseSoftLocks) {
Expand Down Expand Up @@ -99,6 +101,11 @@ internal class ActionExecutorImpl(
}
}

/**
 * Executes [Action.UpdateFlowStatus] by asking the checkpoint storage to update the status of the
 * checkpoint named by the action.
 */
@Suspendable
private fun executeUpdateFlowStatus(action: Action.UpdateFlowStatus) {
    val flowId = action.id
    val newStatus = action.status
    checkpointStorage.updateStatus(flowId, newStatus)
}

@Suspendable
private fun executePersistDeduplicationIds(action: Action.PersistDeduplicationFacts) {
for (handle in action.deduplicationHandlers) {
Expand Down Expand Up @@ -191,6 +198,11 @@ internal class ActionExecutorImpl(
stateMachineManager.removeFlow(action.flowId, action.removalReason, action.lastState)
}

/**
 * Executes [Action.MoveFlowToPaused] by handing the action's current state to the state machine manager.
 */
@Suspendable
private fun executeMoveFlowToPaused(action: Action.MoveFlowToPaused) {
    val stateToPause = action.currentState
    stateMachineManager.moveFlowToPaused(stateToPause)
}

@Suspendable
@Throws(SQLException::class)
private fun executeCreateTransaction() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,13 @@ sealed class Event {
override fun toString() = "WakeUpSleepyFlow"
}

/**
 * Event requesting that the flow be paused. It carries no data.
 */
object Pause : Event() {
    override fun toString(): String {
        return "Pause"
    }
}

/**
* Terminate the specified [sessions], removing them from in-memory data structures.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import net.corda.core.utilities.contextLogger
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.currentStateMachine
import net.corda.node.services.statemachine.transitions.StateMachine
import net.corda.node.utilities.isEnabledTimedFlow
import net.corda.nodeapi.internal.persistence.CordaPersistence
Expand All @@ -29,11 +28,16 @@ import java.util.concurrent.Semaphore

class Flow<A>(val fiber: FlowStateMachineImpl<A>, val resultFuture: OpenFuture<Any?>)

class NonResidentFlow(val runId: StateMachineRunId, val checkpoint: Checkpoint) {
val externalEvents = mutableListOf<Event.DeliverSessionMessage>()
data class NonResidentFlow(
val runId: StateMachineRunId,
var checkpoint: Checkpoint,
val resultFuture: OpenFuture<Any?> = openFuture(),
val resumable: Boolean = true
) {
val events = mutableListOf<ExternalEvent>()

fun addExternalEvent(message: Event.DeliverSessionMessage) {
externalEvents.add(message)
fun addExternalEvent(message: ExternalEvent) {
events.add(message)
}
}

Expand Down Expand Up @@ -66,18 +70,29 @@ class FlowCreator(
}
else -> nonResidentFlow.checkpoint
}
return createFlowFromCheckpoint(nonResidentFlow.runId, checkpoint)
return createFlowFromCheckpoint(nonResidentFlow.runId, checkpoint, resultFuture = nonResidentFlow.resultFuture)
}

@Suppress("LongParameterList")
fun createFlowFromCheckpoint(
runId: StateMachineRunId,
oldCheckpoint: Checkpoint,
reloadCheckpointAfterSuspendCount: Int? = null,
lock: Semaphore = Semaphore(1)
lock: Semaphore = Semaphore(1),
resultFuture: OpenFuture<Any?> = openFuture(),
firstRestore: Boolean = true
): Flow<*>? {
val checkpoint = oldCheckpoint.copy(status = Checkpoint.FlowStatus.RUNNABLE)
val fiber = checkpoint.getFiberFromCheckpoint(runId) ?: return null
val resultFuture = openFuture<Any?>()
val fiber = oldCheckpoint.getFiberFromCheckpoint(runId, firstRestore)
var checkpoint = oldCheckpoint
if (fiber == null) {
updateCompatibleInDb(runId, false)
return null
} else if (!oldCheckpoint.compatible) {
updateCompatibleInDb(runId, true)
checkpoint = checkpoint.copy(compatible = true)
}
checkpoint = checkpoint.copy(status = Checkpoint.FlowStatus.RUNNABLE)

fiber.logic.stateMachine = fiber
verifyFlowLogicIsSuspendable(fiber.logic)
fiber.transientValues = createTransientValues(runId, resultFuture)
Expand All @@ -92,6 +107,12 @@ class FlowCreator(
return Flow(fiber, resultFuture)
}

/**
 * Persists the [compatible] flag for the checkpoint of flow [runId], wrapping the storage call in its
 * own database transaction.
 */
private fun updateCompatibleInDb(runId: StateMachineRunId, compatible: Boolean) {
    val storage = checkpointStorage
    database.transaction {
        storage.updateCompatible(runId, compatible)
    }
}

@Suppress("LongParameterList")
fun <A> createFlowFromLogic(
flowId: StateMachineRunId,
Expand Down Expand Up @@ -135,36 +156,45 @@ class FlowCreator(
return Flow(flowStateMachineImpl, resultFuture)
}

private fun Checkpoint.getFiberFromCheckpoint(runId: StateMachineRunId): FlowStateMachineImpl<*>? {
return when (this.flowState) {
is FlowState.Unstarted -> {
val logic = tryCheckpointDeserialize(this.flowState.frozenFlowLogic, runId) ?: return null
FlowStateMachineImpl(runId, logic, scheduler)
}
is FlowState.Started -> tryCheckpointDeserialize(this.flowState.frozenFiber, runId) ?: return null
// Callers of this function rely on it returning null if the flow cannot be created from the checkpoint.
else -> null
}
}

@Suppress("TooGenericExceptionCaught")
private inline fun <reified T : Any> tryCheckpointDeserialize(bytes: SerializedBytes<T>, flowId: StateMachineRunId): T? {
return try {
bytes.checkpointDeserialize(context = checkpointSerializationContext)
private fun Checkpoint.getFiberFromCheckpoint(runId: StateMachineRunId, firstRestore: Boolean): FlowStateMachineImpl<*>? {
try {
return when(flowState) {
is FlowState.Unstarted -> {
val logic = deserializeFlowState(flowState.frozenFlowLogic)
FlowStateMachineImpl(runId, logic, scheduler)
}
is FlowState.Started -> deserializeFlowState(flowState.frozenFiber)
// Callers of this function rely on it returning null if the flow cannot be created from the checkpoint.
else -> return null
}
} catch (e: Exception) {
if (reloadCheckpointAfterSuspend && currentStateMachine() != null) {
if (reloadCheckpointAfterSuspend && FlowStateMachineImpl.currentStateMachine() != null) {
logger.error(
"Unable to deserialize checkpoint for flow $flowId. [reloadCheckpointAfterSuspend] is turned on, throwing exception",
e
"Unable to deserialize checkpoint for flow $runId. [reloadCheckpointAfterSuspend] is turned on, throwing exception",
e
)
throw ReloadFlowFromCheckpointException(e)
} else {
logger.error("Unable to deserialize checkpoint for flow $flowId. Something is very wrong and this flow will be ignored.", e)
null
logSerializationError(firstRestore, runId, e)
return null
}
}
}

/**
 * Deserializes [bytes] using the checkpoint serialization context held by this class.
 * Any exception thrown by the deserializer propagates to the caller.
 */
private inline fun <reified T : Any> deserializeFlowState(bytes: SerializedBytes<T>): T =
    bytes.checkpointDeserialize(context = checkpointSerializationContext)

/**
 * Logs a failure to restore the flow [flowId] from its checkpoint.
 *
 * When [firstRestore] is true the failure is logged as a warning with a hint about CorDapp upgrades;
 * otherwise it is logged as an error. [exception] is attached to the log entry in both cases.
 */
private fun logSerializationError(firstRestore: Boolean, flowId: StateMachineRunId, exception: Exception) {
    if (!firstRestore) {
        logger.error("Unable to deserialize fiber for flow $flowId. Something is very wrong and this flow will be ignored.", exception)
        return
    }
    logger.warn("Flow with id $flowId could not be restored from its checkpoint. Normally this means that a CorDapp has been" +
            " upgraded without draining the node. To run this flow restart the node after downgrading the CorDapp.", exception)
}

private fun verifyFlowLogicIsSuspendable(logic: FlowLogic<Any?>) {
// Quasar requires (in Java 8) that at least the call method be annotated suspendable. Unfortunately, it's
// easy to forget to add this when creating a new flow, so we check here to give the user a better error.
Expand Down Expand Up @@ -219,4 +249,4 @@ class FlowCreator(
lock = lock
)
}
}
}
Loading

0 comments on commit f280ec9

Please sign in to comment.