Skip to content

Commit

Permalink
CORDA-2520: Add FetchParametersFlow (corda#4674)
Browse files Browse the repository at this point in the history
* CORDA-2520: Add FetchParametersFlow

* Address comments, add test
  • Loading branch information
kasiastreich authored Jan 29, 2019
1 parent 88e4b85 commit 6efd54f
Show file tree
Hide file tree
Showing 13 changed files with 246 additions and 44 deletions.
7 changes: 2 additions & 5 deletions core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@ import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
import net.corda.core.identity.Party
import net.corda.core.internal.BackpressureAwareTimedFlow
import net.corda.core.internal.FetchDataFlow
import net.corda.core.internal.NetworkParametersServiceInternal
import net.corda.core.internal.*
import net.corda.core.internal.notary.generateSignature
import net.corda.core.internal.notary.validateSignatures
import net.corda.core.internal.pushToLoggingContext
import net.corda.core.transactions.*
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.UntrustworthyData
Expand Down Expand Up @@ -107,7 +104,7 @@ class NotaryFlow {
check(stx.coreTransaction is NotaryChangeWireTransaction) {
"Notary $notaryParty is not on the network parameter whitelist. A non-whitelisted notary can only be used for notary change transactions"
}
val historicNotary = (serviceHub.networkParametersService as NetworkParametersServiceInternal).getHistoricNotary(notaryParty)
val historicNotary = (serviceHub.networkParametersService as NetworkParametersStorage).getHistoricNotary(notaryParty)
?: throw IllegalStateException("The notary party $notaryParty specified by transaction ${stx.id}, is not recognised as a current or historic notary.")
historicNotary.validating

Expand Down
10 changes: 6 additions & 4 deletions core/src/main/kotlin/net/corda/core/flows/SendTransactionFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package net.corda.core.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.StateAndRef
import net.corda.core.crypto.SecureHash
import net.corda.core.internal.FetchDataFlow
import net.corda.core.internal.RetrieveAnyTransactionPayload
import net.corda.core.internal.readFully
import net.corda.core.internal.*
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.unwrap

Expand Down Expand Up @@ -42,7 +40,7 @@ open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any)

@Suspendable
override fun call(): Void? {
// The first payload will be the transaction data, subsequent payload will be the transaction/attachment data.
// The first payload will be the transaction data, subsequent payload will be the transaction/attachment/network parameters data.
var payload = payload

// Depending on who called this flow, the type of the initial payload is different.
Expand Down Expand Up @@ -93,6 +91,10 @@ open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any)
serviceHub.attachments.openAttachment(it)?.open()?.readFully()
?: throw FetchDataFlow.HashNotFound(it)
}
FetchDataFlow.DataType.PARAMETERS -> dataRequest.hashes.map {
(serviceHub.networkParametersService as NetworkParametersStorage).lookupSigned(it)
?: throw FetchDataFlow.MissingNetworkParameters(it)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package net.corda.core.internal

import net.corda.core.contracts.NamedByHash
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignedData
import net.corda.core.crypto.verify
import net.corda.core.serialization.CordaSerializable
Expand Down Expand Up @@ -43,7 +45,9 @@ Cert path: $fullCertPath

/** Similar to [SignedData] but instead of just attaching the public key, the certificate for the key is attached instead. */
@CordaSerializable
class SignedDataWithCert<T : Any>(val raw: SerializedBytes<T>, val sig: DigitalSignatureWithCert) {
class SignedDataWithCert<T : Any>(val raw: SerializedBytes<T>, val sig: DigitalSignatureWithCert): NamedByHash {
override val id: SecureHash get () = raw.hash

fun verified(): T {
sig.verify(raw)
return uncheckedCast(raw.deserialize<Any>())
Expand Down
38 changes: 33 additions & 5 deletions core/src/main/kotlin/net/corda/core/internal/FetchDataFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.internal.FetchDataFlow.DownloadedVsRequestedDataMismatch
import net.corda.core.internal.FetchDataFlow.HashNotFound
import net.corda.core.node.NetworkParameters
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializationToken
import net.corda.core.serialization.SerializeAsToken
Expand Down Expand Up @@ -51,6 +52,8 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(

class HashNotFound(val requested: SecureHash) : FlowException()

class MissingNetworkParameters(val requested: SecureHash) : FlowException("Failed to fetch network parameters with hash: $requested")

class IllegalTransactionRequest(val requested: SecureHash) : FlowException("Illegal attempt to request a transaction (${requested}) that is not in the transitive dependency graph of the sent transaction.")

@CordaSerializable
Expand All @@ -64,11 +67,11 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(

@CordaSerializable
enum class DataType {
TRANSACTION, ATTACHMENT
TRANSACTION, ATTACHMENT, PARAMETERS
}

@Suspendable
@Throws(HashNotFound::class)
@Throws(HashNotFound::class, MissingNetworkParameters::class)
override fun call(): Result<T> {
// Load the items we have from disk and figure out which we're missing.
val (fromDisk, toFetch) = loadWhatWeHave()
Expand Down Expand Up @@ -139,7 +142,7 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
}

/**
* Given a set of hashes either loads from from local storage or requests them from the other peer. Downloaded
* Given a set of hashes either loads from local storage or requests them from the other peer. Downloaded
* attachments are saved to local storage automatically.
*/
class FetchAttachmentsFlow(requests: Set<SecureHash>,
Expand All @@ -158,10 +161,10 @@ class FetchAttachmentsFlow(requests: Set<SecureHash>,
} catch (e: FileAlreadyExistsException) {
// This can happen when another transaction will insert the same attachment during this transaction.
// The outcome is the same (the attachment is imported), so we can ignore this exception.
logger.debug("Attachment ${attachment.id} already inserted.")
logger.debug { "Attachment ${attachment.id} already inserted." }
}
} else {
logger.debug("Attachment ${attachment.id} already exists, skipping.")
logger.debug { "Attachment ${attachment.id} already exists, skipping." }
}
}
}
Expand Down Expand Up @@ -193,3 +196,28 @@ class FetchTransactionsFlow(requests: Set<SecureHash>, otherSide: FlowSession) :

override fun load(txid: SecureHash): SignedTransaction? = serviceHub.validatedTransactions.getTransaction(txid)
}

/**
* Given a set of hashes either loads from local network parameters storage or requests them from the other peer. Downloaded
* network parameters are saved to local parameters storage automatically. This flow can be used only if the minimumPlatformVersion is >= 4.
* Nodes on lower versions won't respond to this flow.
*/
class FetchNetworkParametersFlow(requests: Set<SecureHash>,
otherSide: FlowSession) : FetchDataFlow<SignedDataWithCert<NetworkParameters>, SignedDataWithCert<NetworkParameters>>(requests, otherSide, DataType.PARAMETERS) {
override fun load(txid: SecureHash): SignedDataWithCert<NetworkParameters>? {
return (serviceHub.networkParametersService as NetworkParametersStorage).lookupSigned(txid)
}

override fun maybeWriteToDisk(downloaded: List<SignedDataWithCert<NetworkParameters>>) {
for (parameters in downloaded) {
with(serviceHub.networkParametersService as NetworkParametersStorage) {
if (!hasParameters(parameters.id)) {
// This will perform the signature check too and throws SignatureVerificationException
saveParameters(parameters)
} else {
logger.debug { "Network parameters ${parameters.id} already exists in storage, skipping." }
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,44 @@
package net.corda.core.internal

import net.corda.core.crypto.SecureHash
import net.corda.core.identity.Party
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NotaryInfo
import net.corda.core.node.services.NetworkParametersService
import java.security.cert.X509Certificate

interface NetworkParametersStorage : NetworkParametersService {
/**
* Return parameters epoch for the given parameters hash. Null if there are no parameters for this hash in the storage and we are unable to
* get them from network map.
*/
fun getEpochFromHash(hash: SecureHash): Int?

/**
* Return signed network parameters with certificate for the given hash. Null if there are no parameters for this hash in the storage.
* (No fallback to network map.)
*/
fun lookupSigned(hash: SecureHash): SignedDataWithCert<NetworkParameters>?

/**
* Checks if parameters with given hash are in the storage.
*/
fun hasParameters(hash: SecureHash): Boolean

interface NetworkParametersServiceInternal : NetworkParametersService {
/**
* Returns the [NotaryInfo] for a notary [party] in the current or any historic network parameter whitelist, or null if not found.
*/
fun getHistoricNotary(party: Party): NotaryInfo?

/**
* Save signed network parameters data. Internally network parameters bytes should be stored with the signature.
* It's because of ability of older nodes to function in network where parameters were extended with new fields.
* Hash should always be calculated over the serialized bytes.
*/
fun saveParameters(signedNetworkParameters: SignedDataWithCert<NetworkParameters>)

/**
* Set information that given parameters are current parameters for the network.
*/
fun setCurrentParameters(currentSignedParameters: SignedDataWithCert<NetworkParameters>, trustRoot: X509Certificate)
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class ResolveTransactionsFlow(txHashesArg: Set<SecureHash>,
@Suspendable
@Throws(FetchDataFlow.HashNotFound::class, FetchDataFlow.IllegalTransactionRequest::class)
override fun call() {
val counterpartyPlatformVersion = serviceHub.networkMapCache.getNodeByLegalIdentity(otherSide.counterparty)?.platformVersion ?:
throw FlowException("Couldn't retrieve party's ${otherSide.counterparty} platform version from NetworkMapCache")
val newTxns = ArrayList<SignedTransaction>(txHashes.size)
// Start fetching data.
for (pageNumber in 0..(txHashes.size - 1) / RESOLUTION_PAGE_SIZE) {
Expand All @@ -85,6 +87,11 @@ class ResolveTransactionsFlow(txHashesArg: Set<SecureHash>,
newTxns += downloadDependencies(page)
val txsWithMissingAttachments = if (pageNumber == 0) signedTransaction?.let { newTxns + it } ?: newTxns else newTxns
fetchMissingAttachments(txsWithMissingAttachments)
// Fetch missing parameters flow was added in version 4. This check is needed so we don't end up with node V4 sending parameters
// request to node V3 that doesn't know about this protocol.
if (counterpartyPlatformVersion >= 4) {
fetchMissingParameters(txsWithMissingAttachments)
}
}
otherSide.send(FetchDataFlow.Request.End)
// Finish fetching data.
Expand Down Expand Up @@ -180,4 +187,13 @@ class ResolveTransactionsFlow(txHashesArg: Set<SecureHash>,
if (missingAttachments.isNotEmpty())
subFlow(FetchAttachmentsFlow(missingAttachments.toSet(), otherSide))
}

// TODO This can also be done in parallel. See comment to [fetchMissingAttachments] above.
@Suspendable
private fun fetchMissingParameters(downloads: List<SignedTransaction>) {
val parameters = downloads.mapNotNull { it.networkParametersHash }
val missingParameters = parameters.filter { !(serviceHub.networkParametersService as NetworkParametersStorage).hasParameters(it) }
if (missingParameters.isNotEmpty())
subFlow(FetchNetworkParametersFlow(missingParameters.toSet(), otherSide))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package net.corda.core.internal

import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SignableData
import net.corda.core.crypto.SignatureMetadata
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NotaryInfo
import net.corda.core.node.ServiceHub
import net.corda.core.serialization.SerializationFactory
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.getOrThrow
import net.corda.nodeapi.internal.createDevNetworkMapCa
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.contracts.DummyContract
import net.corda.testing.core.singleIdentity
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetworkParameters
import net.corda.testing.node.StartedMockNode
import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP
import net.corda.testing.node.internal.cordappForClasses
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Test

class NetworkParametersResolutionTest {
private lateinit var params2: NetworkParameters
private val certKeyPair: CertificateAndKeyPair = createDevNetworkMapCa()
private lateinit var mockNet: MockNetwork
private lateinit var notaryNode: StartedMockNode
private lateinit var megaCorpNode: StartedMockNode
private lateinit var miniCorpNode: StartedMockNode
private lateinit var megaCorpParty: Party
private lateinit var miniCorpParty: Party
private lateinit var notaryParty: Party

@Before
fun setup() {
mockNet = MockNetwork(MockNetworkParameters(
cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP, cordappForClasses(ResolveTransactionsFlowTest.TestFlow::class.java, ResolveTransactionsFlowTest.TestResponseFlow::class.java))))
notaryNode = mockNet.defaultNotaryNode
megaCorpNode = mockNet.createPartyNode(CordaX500Name("MegaCorp", "London", "GB"))
miniCorpNode = mockNet.createPartyNode(CordaX500Name("MiniCorp", "London", "GB"))
notaryParty = mockNet.defaultNotaryIdentity
megaCorpParty = megaCorpNode.info.singleIdentity()
miniCorpParty = miniCorpNode.info.singleIdentity()
params2 = testNetworkParameters(epoch = 2, minimumPlatformVersion = 3, notaries = listOf((NotaryInfo(notaryParty, true))))
}

@After
fun tearDown() {
mockNet.stopNodes()
}

// This function is resolving and signing WireTransaction with special parameters.
private fun TransactionBuilder.toSignedTransactionWithParameters(parameters: NetworkParameters?, services: ServiceHub): SignedTransaction {
val wtx = toWireTransaction(services)
val wtxWithHash = SerializationFactory.defaultFactory.withCurrentContext(null) {
WireTransaction(
createComponentGroups(
wtx.inputs,
wtx.outputs,
wtx.commands,
wtx.attachments,
wtx.notary,
wtx.timeWindow,
wtx.references,
parameters?.serialize()?.hash),
wtx.privacySalt
)
}
val publicKey = services.myInfo.singleIdentity().owningKey
val signatureMetadata = SignatureMetadata(services.myInfo.platformVersion, Crypto.findSignatureScheme(publicKey).schemeNumberID)
val signableData = SignableData(wtxWithHash.id, signatureMetadata)
val sig = services.keyManagementService.sign(signableData, publicKey)
return SignedTransaction(wtxWithHash, listOf(sig))
}

// Similar to ResolveTransactionsFlowTest but creates transactions with given network parameters
// First transaction in pair is dependency of the second one.
private fun makeTransactions(parameters1: NetworkParameters?, parameters2: NetworkParameters?): Pair<SignedTransaction, SignedTransaction> {
// Make a chain of custody of dummy states and insert into node A.
val dummy1: SignedTransaction = DummyContract.generateInitial(0, notaryParty, megaCorpParty.ref(1)).let {
val ptx = it.toSignedTransactionWithParameters(parameters1, megaCorpNode.services)
notaryNode.services.addSignature(ptx, notaryParty.owningKey)
}
val dummy2: SignedTransaction = DummyContract.move(dummy1.tx.outRef(0), miniCorpParty).let {
val ptx = it.toSignedTransactionWithParameters(parameters2, megaCorpNode.services)
notaryNode.services.addSignature(ptx, notaryParty.owningKey)
}
megaCorpNode.transaction {
megaCorpNode.services.recordTransactions(dummy1, dummy2)
// Record parameters too.
with(megaCorpNode.services.networkParametersService as NetworkParametersStorage) {
parameters1?.let { saveParameters(certKeyPair.sign(it)) }
parameters2?.let { saveParameters(certKeyPair.sign(it)) }
}
}
return Pair(dummy1, dummy2)
}

@Test
fun `request parameters that are not in the storage`() {
val params1 = miniCorpNode.services.networkParameters
val hash1 = params1.serialize().hash
val hash2 = params2.serialize().hash
// Create two transactions on megaCorpNode
val (stx1, stx2) = makeTransactions(params1, params2)
assertThat(stx1.networkParametersHash).isEqualTo(hash1)
assertThat(stx2.networkParametersHash).isEqualTo(hash2)
miniCorpNode.transaction {
assertThat(miniCorpNode.services.networkParametersService.lookup(hash1)).isNotNull()
assertThat(miniCorpNode.services.networkParametersService.lookup(hash2)).isNull()
}
// miniCorpNode resolves the stx2 from megaCorpParty
val p = ResolveTransactionsFlowTest.TestFlow(setOf(stx2.id), megaCorpParty)
val future = miniCorpNode.startFlow(p)
mockNet.runNetwork()
future.getOrThrow()
miniCorpNode.transaction {
// Check that parameters were downloaded to the storage.
assertThat(miniCorpNode.services.networkParametersService.lookup(hash1)).isEqualTo(params1)
assertThat(miniCorpNode.services.networkParametersService.lookup(hash2)).isEqualTo(params2)
}
}
}
Loading

0 comments on commit 6efd54f

Please sign in to comment.