Skip to content

Commit

Permalink
Made FlowCheckpointVersionNodeStartupCheckTest leaner (corda#4640)
Browse files Browse the repository at this point in the history
* Removed `restart node successfully with suspended flow` as it duplicates `TraderDemoTest#Test restart node during flow works properly`
* Removed the need for a notary
  • Loading branch information
shamsasari authored Jan 28, 2019
1 parent 49e23bc commit e20278f
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 106 deletions.
22 changes: 22 additions & 0 deletions .idea/compiler.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions core/src/main/kotlin/net/corda/core/internal/PathUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ fun Path.copyTo(target: Path, vararg options: CopyOption): Path = Files.copy(thi
/** @see Files.move */
fun Path.moveTo(target: Path, vararg options: CopyOption): Path = Files.move(this, target, *options)

/** @see Files.move */
fun Path.renameTo(fileName: String, vararg options: CopyOption): Path = moveTo(parent / fileName, *options)

/** See overload of [Files.copy] which takes in an [InputStream]. */
fun Path.copyTo(out: OutputStream): Long = Files.copy(this, out)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,133 +1,87 @@
package net.corda.node.flows

import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.startTrackedFlow
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.internal.CheckpointIncompatibleException
import net.corda.node.internal.NodeStartup
import net.corda.testMessage.*
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverDSL
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeParameters
import net.corda.testing.driver.driver
import net.corda.testing.node.TestCordapp
import net.corda.testing.node.internal.*
import net.test.cordapp.v1.Record
import net.test.cordapp.v1.SendMessageFlow
import org.assertj.core.api.Assertions.assertThat
import net.corda.testing.node.internal.CustomCordapp
import net.corda.testing.node.internal.ListenProcessDeathException
import net.corda.testing.node.internal.assertCheckpoints
import net.corda.testing.node.internal.enclosedCordapp
import org.junit.Test
import java.nio.file.Path
import java.nio.file.StandardCopyOption.REPLACE_EXISTING
import java.util.*
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNotNull

// TraderDemoTest already has a test which checks the node can resume a flow from a checkpoint
class FlowCheckpointVersionNodeStartupCheckTest {
companion object {
val message = Message("Hello world!")
val defaultCordapp = cordappWithPackages(
MessageState::class.packageName, SendMessageFlow::class.packageName
)
}

@Test
fun `restart node successfully with suspended flow`() {
return driver(parametersForRestartingNodes()) {
createSuspendedFlowInBob(setOf(defaultCordapp))
// Bob will resume the flow
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
startNode(providedName = BOB_NAME).getOrThrow()
val page = alice.rpc.vaultTrack(MessageState::class.java)
val result = if (page.snapshot.states.isNotEmpty()) {
page.snapshot.states.first()
} else {
val r = page.updates.timeout(30, TimeUnit.SECONDS).take(1).toBlocking().single()
if (r.consumed.isNotEmpty()) r.consumed.first() else r.produced.first()
}
assertNotNull(result)
assertEquals(message, result.state.data.message)
}
val defaultCordapp = enclosedCordapp()
}

@Test
fun `restart node with incompatible version of suspended flow due to different jar name`() {
driver(parametersForRestartingNodes()) {
val uniqueName = "different-jar-name-test-${UUID.randomUUID()}"
val cordapp = defaultCordapp.copy(name = uniqueName)

val bobBaseDir = createSuspendedFlowInBob(setOf(cordapp))

val cordappsDir = bobBaseDir / "cordapps"
val cordappJar = cordappsDir.list().single { it.toString().endsWith(".jar") }
// Make sure we're dealing with right jar
assertThat(cordappJar.fileName.toString()).contains(uniqueName)
// Rename the jar file.
cordappJar.moveTo(cordappsDir / "renamed-${cordappJar.fileName}")
val defaultCordappJar = createSuspendedFlowInBob()
defaultCordappJar.renameTo("renamed-${defaultCordappJar.fileName}")

assertBobFailsToStartWithLogMessage(
emptyList(),
CheckpointIncompatibleException.FlowNotInstalledException(SendMessageFlow::class.java).message
CheckpointIncompatibleException.FlowNotInstalledException(ReceiverFlow::class.java).message
)
}
}

@Test
fun `restart node with incompatible version of suspended flow due to different jar hash`() {
driver(parametersForRestartingNodes()) {
val uniqueWorkflowJarName = "different-jar-hash-test-${UUID.randomUUID()}"
val uniqueContractJarName = "contract-$uniqueWorkflowJarName"
val defaultWorkflowJar = cordappWithPackages(SendMessageFlow::class.packageName)
val defaultContractJar = cordappWithPackages(MessageState::class.packageName)
val contractJar = defaultContractJar.copy(name = uniqueContractJarName)
val workflowJar = defaultWorkflowJar.copy(name = uniqueWorkflowJarName)

val bobBaseDir = createSuspendedFlowInBob(setOf(workflowJar, contractJar))

val cordappsDir = bobBaseDir / "cordapps"
val cordappJar = cordappsDir.list().single {
! it.toString().contains(uniqueContractJarName) && it.toString().endsWith(".jar")
}
// Make sure we're dealing with right jar
assertThat(cordappJar.fileName.toString()).contains(uniqueWorkflowJarName)
val defaultCordappJar = createSuspendedFlowInBob()

// The name is part of the MANIFEST so changing it is sufficient to change the jar hash
val modifiedCordapp = workflowJar.copy(name = "${workflowJar.name}-modified")
val modifiedCordapp = defaultCordapp.copy(name = "${defaultCordapp.name}-modified")
val modifiedCordappJar = CustomCordapp.getJarFile(modifiedCordapp)
modifiedCordappJar.moveTo(cordappJar, REPLACE_EXISTING)
modifiedCordappJar.moveTo(defaultCordappJar, REPLACE_EXISTING)

assertBobFailsToStartWithLogMessage(
emptyList(),
// The part of the log message generated by CheckpointIncompatibleException.FlowVersionIncompatibleException
"that is incompatible with the current installed version of"
)
}
}

private fun DriverDSL.createSuspendedFlowInBob(cordapps: Set<TestCordapp>): Path {
val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
.map { startNode(NodeParameters(providedName = it, additionalCordapps = cordapps)) }
.transpose()
.getOrThrow()
alice.stop()
val flowTracker = bob.rpc.startTrackedFlow(::SendMessageFlow, message, defaultNotaryIdentity, alice.nodeInfo.singleIdentity()).progress
// Wait until Bob progresses as far as possible because Alice node is offline
flowTracker.takeFirst { it == SendMessageFlow.Companion.FINALISING_TRANSACTION.label }.toBlocking().single()
private fun DriverDSL.createSuspendedFlowInBob(): Path {
val (alice, bob) = listOf(
startNode(providedName = ALICE_NAME),
startNode(NodeParameters(providedName = BOB_NAME, additionalCordapps = listOf(defaultCordapp)))
).map { it.getOrThrow() }

alice.stop() // Stop Alice so that Bob never receives the message

bob.rpc.startFlow(::ReceiverFlow, alice.nodeInfo.singleIdentity())
// Wait until Bob's flow has started
bob.rpc.stateMachinesFeed().let { it.updates.map { it.id }.startWith(it.snapshot.map { it.id }) }.toBlocking().first()
bob.stop()
return bob.baseDirectory
assertCheckpoints(BOB_NAME, 1)

return (bob.baseDirectory / "cordapps").list().single { it.toString().endsWith(".jar") }
}

private fun DriverDSL.assertBobFailsToStartWithLogMessage(cordapps: Collection<TestCordapp>, logMessage: String) {
private fun DriverDSL.assertBobFailsToStartWithLogMessage(logMessage: String) {
assertFailsWith(ListenProcessDeathException::class) {
startNode(NodeParameters(
providedName = BOB_NAME,
customOverrides = mapOf("devMode" to false),
additionalCordapps = cordapps
customOverrides = mapOf("devMode" to false)
)).getOrThrow()
}

Expand All @@ -141,7 +95,21 @@ class FlowCheckpointVersionNodeStartupCheckTest {
return DriverParameters(
startNodesInProcess = false, // Start nodes in separate processes to ensure CordappLoader is not shared between restarts
inMemoryDB = false, // Ensure database is persisted between node restarts so we can keep suspended flows
cordappsForAllNodes = emptyList()
cordappsForAllNodes = emptyList(),
notarySpecs = emptyList()
)
}

@InitiatingFlow
@StartableByRPC
class ReceiverFlow(private val otherParty: Party) : FlowLogic<String>() {
@Suspendable
override fun call(): String = initiateFlow(otherParty).receive<String>().unwrap { it }
}

@InitiatedBy(ReceiverFlow::class)
class SenderFlow(private val otherSide: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() = otherSide.send("Hello!")
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,33 @@
package net.corda.traderdemo

import net.corda.client.rpc.CordaRPCClient
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.div
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.millis
import net.corda.finance.DOLLARS
import net.corda.finance.USD
import net.corda.finance.contracts.getCashBalance
import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import net.corda.node.services.Permissions.Companion.all
import net.corda.node.services.Permissions.Companion.startFlow
import net.corda.testing.core.*
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.InProcess
import net.corda.testing.driver.OutOfProcess
import net.corda.testing.driver.driver
import net.corda.testing.core.BOC_NAME
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.DUMMY_BANK_B_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.*
import net.corda.testing.node.TestCordapp
import net.corda.testing.node.User
import net.corda.testing.node.internal.FINANCE_CORDAPPS
import net.corda.testing.node.internal.assertCheckpoints
import net.corda.testing.node.internal.poll
import net.corda.traderdemo.flow.CommercialPaperIssueFlow
import net.corda.traderdemo.flow.SellerFlow
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
import java.sql.DriverManager
import java.util.concurrent.Executors

class TraderDemoTest {
Expand Down Expand Up @@ -85,27 +91,20 @@ class TraderDemoTest {
inMemoryDB = false,
cordappsForAllNodes = FINANCE_CORDAPPS + TestCordapp.findCordapp("net.corda.traderdemo")
)) {
val demoUser = User("demo", "demo", setOf(startFlow<SellerFlow>(), all()))
val bankUser = User("user1", "test", permissions = setOf(all()))
val (nodeA, nodeB, bankNode) = listOf(
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser)),
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)),
startNode(providedName = BOC_NAME, rpcUsers = listOf(bankUser))
).map { (it.getOrThrow() as OutOfProcess) }

val nodeBRpc = CordaRPCClient(nodeB.rpcAddress).start(demoUser.username, demoUser.password).proxy
val nodeARpc = CordaRPCClient(nodeA.rpcAddress).start(demoUser.username, demoUser.password).proxy
val nodeBankRpc = let {
val client = CordaRPCClient(bankNode.rpcAddress)
client.start(bankUser.username, bankUser.password).proxy
}

TraderDemoClientApi(nodeBankRpc).runIssuer(amount = 100.DOLLARS, buyerName = nodeA.nodeInfo.singleIdentity().name, sellerName = nodeB.nodeInfo.singleIdentity().name)
val stxFuture = nodeBRpc.startFlow(::SellerFlow, nodeA.nodeInfo.singleIdentity(), 5.DOLLARS).returnValue
nodeARpc.stateMachinesFeed().updates.toBlocking().first() // wait until initiated flow starts
nodeA.stop()
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to nodeA.p2pAddress.toString()))
stxFuture.getOrThrow()
val (buyer, seller, bank) = listOf(
startNode(providedName = DUMMY_BANK_A_NAME),
startNode(providedName = DUMMY_BANK_B_NAME),
startNode(providedName = BOC_NAME)
).map { it.getOrThrow() }
TraderDemoClientApi(bank.rpc).runIssuer(amount = 100.DOLLARS, buyerName = DUMMY_BANK_A_NAME, sellerName = DUMMY_BANK_B_NAME)
val saleFuture = seller.rpc.startFlow(::SellerFlow, buyer.nodeInfo.singleIdentity(), 5.DOLLARS).returnValue
buyer.rpc.stateMachinesFeed().updates.toBlocking().first() // wait until initiated flow starts
buyer.stop()
assertCheckpoints(DUMMY_BANK_A_NAME, 1)
val buyer2 = startNode(providedName = DUMMY_BANK_A_NAME, customOverrides = mapOf("p2pAddress" to buyer.p2pAddress.toString())).getOrThrow()
saleFuture.getOrThrow()
assertThat(buyer2.rpc.getCashBalance(USD)).isEqualTo(95.DOLLARS)
assertThat(seller.rpc.getCashBalance(USD)).isEqualTo(5.DOLLARS)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import net.corda.core.CordaException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.div
import net.corda.core.internal.times
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.serialization.internal.SerializationEnvironment
Expand All @@ -20,6 +22,7 @@ import net.corda.core.utilities.millis
import net.corda.core.utilities.seconds
import net.corda.node.services.api.StartedNodeServices
import net.corda.node.services.messaging.Message
import net.corda.testing.driver.DriverDSL
import net.corda.testing.driver.NodeHandle
import net.corda.testing.internal.chooseIdentity
import net.corda.testing.internal.createTestSerializationEnv
Expand All @@ -29,12 +32,14 @@ import net.corda.testing.node.TestCordapp
import net.corda.testing.node.User
import net.corda.testing.node.testContext
import org.apache.commons.lang.ClassUtils
import org.assertj.core.api.Assertions.assertThat
import org.slf4j.LoggerFactory
import rx.Observable
import rx.subjects.AsyncSubject
import java.io.InputStream
import java.net.Socket
import java.net.SocketException
import java.sql.DriverManager
import java.time.Duration
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -239,6 +244,15 @@ fun CordaRPCOps.waitForShutdown(): Observable<Unit> {
return completable
}

fun DriverDSL.assertCheckpoints(name: CordaX500Name, expected: Long) {
DriverManager.getConnection("jdbc:h2:file:${baseDirectory(name) / "persistence"}", "sa", "").use { connection ->
connection.createStatement().executeQuery("select count(*) from NODE_CHECKPOINTS").use { rs ->
rs.next()
assertThat(rs.getLong(1)).isEqualTo(expected)
}
}
}

/**
* Should only be used by Driver and MockNode.
*/
Expand Down

0 comments on commit e20278f

Please sign in to comment.