Skip to content

Commit

Permalink
CORDA-3677 FlowExternalOperation serialising reference to FlowLogic (c…
Browse files Browse the repository at this point in the history
…orda#6094)

* Stop capturing 'FlowLogic' references in flowAsyncOperation;

Creating concrete classes removes the implicit reference to FlowLogic (as this) being included in the anonymous object

* Modify test code so that lambdas no longer get implicit references to their enclosing 'FlowLogic'
  • Loading branch information
kyriathar-r3 authored and lankydan committed Mar 26, 2020
1 parent 668748b commit f695296
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,14 @@ class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() {
FlowWithExternalProcess(party) {

@Suspendable
override fun testCode(): Any =
await(ExternalAsyncOperation(serviceHub) { _, _ ->
override fun testCode(): Any {
val e = createException()
return await(ExternalAsyncOperation(serviceHub) { _, _ ->
CompletableFuture<Any>().apply {
completeExceptionally(createException())
completeExceptionally(e)
}
})
}

private fun createException() = when (exceptionType) {
HospitalizeFlowException::class.java -> HospitalizeFlowException("keep it around")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,10 @@ class FlowExternalOperationTest : AbstractFlowExternalOperationTest() {
FlowWithExternalProcess(party) {

@Suspendable
override fun testCode(): Any = await(ExternalOperation(serviceHub) { _, _ -> throw createException() })
override fun testCode() {
val e = createException()
await(ExternalOperation(serviceHub) { _, _ -> throw e })
}

private fun createException() = when (exceptionType) {
HospitalizeFlowException::class.java -> HospitalizeFlowException("keep it around")
Expand Down
41 changes: 18 additions & 23 deletions core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt
Original file line number Diff line number Diff line change
Expand Up @@ -531,12 +531,7 @@ abstract class FlowLogic<out T> {
@Suspendable
fun <R : Any> await(operation: FlowExternalAsyncOperation<R>): R {
// Wraps the passed in [FlowExternalAsyncOperation] so its [CompletableFuture] can be converted into a [CordaFuture]
val flowAsyncOperation = object : FlowAsyncOperation<R>, WrappedFlowExternalAsyncOperation<R> {
override val operation = operation
override fun execute(deduplicationId: String): CordaFuture<R> {
return this.operation.execute(deduplicationId).asCordaFuture()
}
}
val flowAsyncOperation = WrappedFlowExternalAsyncOperation(operation)
val request = FlowIORequest.ExecuteAsyncOperation(flowAsyncOperation)
return stateMachine.suspend(request, false)
}
Expand All @@ -550,18 +545,7 @@ abstract class FlowLogic<out T> {
*/
@Suspendable
fun <R : Any> await(operation: FlowExternalOperation<R>): R {
val flowAsyncOperation = object : FlowAsyncOperation<R>, WrappedFlowExternalOperation<R> {
override val serviceHub = this@FlowLogic.serviceHub as ServiceHubCoreInternal
override val operation = operation
override fun execute(deduplicationId: String): CordaFuture<R> {
// Using a [CompletableFuture] allows unhandled exceptions to be thrown inside the background operation
// the exceptions will be set on the future by [CompletableFuture.AsyncSupply.run]
return CompletableFuture.supplyAsync(
Supplier { this.operation.execute(deduplicationId) },
serviceHub.externalOperationExecutor
).asCordaFuture()
}
}
val flowAsyncOperation = WrappedFlowExternalOperation(serviceHub as ServiceHubCoreInternal, operation)
val request = FlowIORequest.ExecuteAsyncOperation(flowAsyncOperation)
return stateMachine.suspend(request, false)
}
Expand All @@ -571,21 +555,32 @@ abstract class FlowLogic<out T> {
* [WrappedFlowExternalAsyncOperation] is added to allow jackson to properly reference the data stored within the wrapped
* [FlowExternalAsyncOperation].
*/
private interface WrappedFlowExternalAsyncOperation<R : Any> {
val operation: FlowExternalAsyncOperation<R>
private class WrappedFlowExternalAsyncOperation<R : Any>(val operation: FlowExternalAsyncOperation<R>) : FlowAsyncOperation<R> {
override fun execute(deduplicationId: String): CordaFuture<R> {
return operation.execute(deduplicationId).asCordaFuture()
}
}

/**
* [WrappedFlowExternalOperation] is added to allow jackson to properly reference the data stored within the wrapped
* [FlowExternalOperation].
*
* The reference to [ServiceHub] is is also needed by Kryo to properly keep a reference to [ServiceHub] so that
* The reference to [ServiceHub] is also needed by Kryo to properly keep a reference to [ServiceHub] so that
* [FlowExternalOperation] can be run from the [ServiceHubCoreInternal.externalOperationExecutor] without causing errors when retrying a
* flow. A [NullPointerException] is thrown if [FlowLogic.serviceHub] is accessed from [FlowLogic.await] when retrying a flow.
*/
private interface WrappedFlowExternalOperation<R : Any> {
val serviceHub: ServiceHub
private class WrappedFlowExternalOperation<R : Any>(
val serviceHub: ServiceHubCoreInternal,
val operation: FlowExternalOperation<R>
) : FlowAsyncOperation<R> {
override fun execute(deduplicationId: String): CordaFuture<R> {
// Using a [CompletableFuture] allows unhandled exceptions to be thrown inside the background operation
// the exceptions will be set on the future by [CompletableFuture.AsyncSupply.run]
return CompletableFuture.supplyAsync(
Supplier { this.operation.execute(deduplicationId) },
serviceHub.externalOperationExecutor
).asCordaFuture()
}
}

/**
Expand Down

0 comments on commit f695296

Please sign in to comment.