Skip to content

Commit

Permalink
[CORDA-760]: Propagate invocation context across the codebase. (corda…
Browse files Browse the repository at this point in the history
  • Loading branch information
sollecitom authored Nov 15, 2017
1 parent f0a5ea9 commit 92c8861
Show file tree
Hide file tree
Showing 70 changed files with 1,176 additions and 312 deletions.
269 changes: 268 additions & 1 deletion .ci/api-current.txt

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import net.corda.core.contracts.Amount
import net.corda.core.contracts.ContractState
import net.corda.core.crypto.isFulfilledBy
import net.corda.core.crypto.keys
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.context.Origin
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.core.messaging.StateMachineUpdate
Expand Down Expand Up @@ -148,17 +148,17 @@ class NodeMonitorModelTest {
// ISSUE
expect { add: StateMachineUpdate.Added ->
issueSmId = add.id
val initiator = add.stateMachineInfo.initiator
require(initiator is FlowInitiator.RPC && initiator.username == "user1")
val context = add.stateMachineInfo.context()
require(context.origin is Origin.RPC && context.principal().name == "user1")
},
expect { remove: StateMachineUpdate.Removed ->
require(remove.id == issueSmId)
},
// MOVE - N.B. There are other framework flows that happen in parallel for the remote resolve transactions flow
expect(match = { it.stateMachineInfo.flowLogicClassName == CashPaymentFlow::class.java.name }) { add: StateMachineUpdate.Added ->
moveSmId = add.id
val initiator = add.stateMachineInfo.initiator
require(initiator is FlowInitiator.RPC && initiator.username == "user1")
val context = add.stateMachineInfo.context()
require(context.origin is Origin.RPC && context.principal().name == "user1")
},
expect(match = { it is StateMachineUpdate.Removed && it.id == moveSmId }) {
}
Expand All @@ -169,8 +169,8 @@ class NodeMonitorModelTest {
sequence(
// MOVE
expect { add: StateMachineUpdate.Added ->
val initiator = add.stateMachineInfo.initiator
require(initiator is FlowInitiator.Peer && aliceNode.isLegalIdentity(initiator.party))
val context = add.stateMachineInfo.context()
require(context.origin is Origin.Peer && aliceNode.isLegalIdentity(aliceNode.identityFromX500Name((context.origin as Origin.Peer).party)))
}
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package net.corda.client.rpc

import net.corda.core.context.*
import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.FlowInitiator
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.packageName
import net.corda.core.messaging.*
Expand All @@ -20,14 +20,15 @@ import net.corda.node.internal.StartedNode
import net.corda.node.services.Permissions.Companion.invokeRpc
import net.corda.node.services.Permissions.Companion.startFlow
import net.corda.nodeapi.User
import net.corda.testing.ALICE
import net.corda.testing.chooseIdentity
import net.corda.testing.*
import net.corda.testing.internal.NodeBasedTest
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After
import org.junit.Before
import org.junit.Test
import kotlin.reflect.KClass
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
Expand All @@ -44,8 +45,8 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C
private lateinit var client: CordaRPCClient
private var connection: CordaRPCConnection? = null

private fun login(username: String, password: String) {
connection = client.start(username, password)
private fun login(username: String, password: String, externalTrace: Trace? = null, impersonatedActor: Actor? = null) {
connection = client.start(username, password, externalTrace, impersonatedActor)
}

@Before
Expand Down Expand Up @@ -131,31 +132,51 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C

@Test
fun `flow initiator via RPC`() {
login(rpcUser.username, rpcUser.password)
val externalTrace = Trace.newInstance()
val impersonatedActor = Actor(Actor.Id("Mark Dadada"), AuthServiceId("Test"), owningLegalIdentity = BOB.name)
login(rpcUser.username, rpcUser.password, externalTrace, impersonatedActor)
val proxy = connection!!.proxy
var countRpcFlows = 0
var countShellFlows = 0
proxy.stateMachinesFeed().updates.subscribe {
if (it is StateMachineUpdate.Added) {
val initiator = it.stateMachineInfo.initiator
if (initiator is FlowInitiator.RPC)
countRpcFlows++
if (initiator is FlowInitiator.Shell)
countShellFlows++
}
}
val nodeIdentity = node.info.chooseIdentity()
node.services.startFlow(CashIssueFlow(2000.DOLLARS, OpaqueBytes.of(0), nodeIdentity), FlowInitiator.Shell).flatMap { it.resultFuture }.getOrThrow()
proxy.startFlow(::CashIssueFlow,
123.DOLLARS,
OpaqueBytes.of(0),
nodeIdentity
).returnValue.getOrThrow()
proxy.startFlowDynamic(CashIssueFlow::class.java,
1000.DOLLARS,
OpaqueBytes.of(0),
nodeIdentity).returnValue.getOrThrow()
assertEquals(2, countRpcFlows)
assertEquals(1, countShellFlows)

val updates = proxy.stateMachinesFeed().updates

node.services.startFlow(CashIssueFlow(2000.DOLLARS, OpaqueBytes.of(0), nodeIdentity), InvocationContext.shell()).flatMap { it.resultFuture }.getOrThrow()
proxy.startFlow(::CashIssueFlow, 123.DOLLARS, OpaqueBytes.of(0), nodeIdentity).returnValue.getOrThrow()
proxy.startFlowDynamic(CashIssueFlow::class.java, 1000.DOLLARS, OpaqueBytes.of(0), nodeIdentity).returnValue.getOrThrow()

val historicalIds = mutableSetOf<Trace.InvocationId>()
var sessionId: Trace.SessionId? = null
updates.expectEvents(isStrict = false) {
sequence(
expect { update: StateMachineUpdate.Added ->
checkShellNotification(update.stateMachineInfo)
},
expect { update: StateMachineUpdate.Added ->
checkRpcNotification(update.stateMachineInfo, rpcUser.username, historicalIds, externalTrace, impersonatedActor)
sessionId = update.stateMachineInfo.context().trace.sessionId
},
expect { update: StateMachineUpdate.Added ->
checkRpcNotification(update.stateMachineInfo, rpcUser.username, historicalIds, externalTrace, impersonatedActor)
assertThat(update.stateMachineInfo.context().trace.sessionId).isEqualTo(sessionId)
}
)
}
}
}

private fun checkShellNotification(info: StateMachineInfo) {

val context = info.context()
assertThat(context.origin).isInstanceOf(Origin.Shell::class.java)
}

private fun checkRpcNotification(info: StateMachineInfo, rpcUsername: String, historicalIds: MutableSet<Trace.InvocationId>, externalTrace: Trace?, impersonatedActor: Actor?) {

val context = info.context()
assertThat(context.origin).isInstanceOf(Origin.RPC::class.java)
assertThat(context.externalTrace).isEqualTo(externalTrace)
assertThat(context.impersonatedActor).isEqualTo(impersonatedActor)
assertThat(context.actor?.id?.value).isEqualTo(rpcUsername)
assertThat(historicalIds).doesNotContain(context.trace.invocationId)
historicalIds.add(context.trace.invocationId)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package net.corda.client.rpc

import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.context.Trace
import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.transpose
Expand Down Expand Up @@ -348,9 +349,10 @@ class RPCStabilityTests {
val message = session.createMessage(false)
val request = RPCApi.ClientToServer.RpcRequest(
clientAddress = SimpleString(myQueue),
id = RPCApi.RpcRequestId(random63BitValue()),
methodName = SlowConsumerRPCOps::streamAtInterval.name,
serialisedArguments = listOf(10.millis, 123456).serialize(context = SerializationDefaults.RPC_SERVER_CONTEXT).bytes
serialisedArguments = listOf(10.millis, 123456).serialize(context = SerializationDefaults.RPC_SERVER_CONTEXT).bytes,
replyId = Trace.InvocationId.newInstance(),
sessionId = Trace.SessionId.newInstance()
)
request.writeToClientMessage(message)
producer.send(message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package net.corda.client.rpc
import net.corda.client.rpc.internal.KryoClientSerializationScheme
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.context.Actor
import net.corda.core.context.Trace
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.serialization.internal.effectiveSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
Expand Down Expand Up @@ -99,7 +101,22 @@ class CordaRPCClient @JvmOverloads constructor(
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/
fun start(username: String, password: String): CordaRPCConnection {
return CordaRPCConnection(rpcClient.start(CordaRPCOps::class.java, username, password))
return start(username, password, null, null)
}

/**
* Logs in to the target server and returns an active connection. The returned connection is a [java.io.Closeable]
* and can be used with a try-with-resources statement. If you don't use that, you should use the
* [RPCConnection.notifyServerAndClose] or [RPCConnection.forceClose] methods to dispose of the connection object
* when done.
*
* @param username The username to authenticate with.
* @param password The password to authenticate with.
* @param externalTrace external [Trace] for correlation.
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/
fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?): CordaRPCConnection {
return CordaRPCConnection(rpcClient.start(CordaRPCOps::class.java, username, password, externalTrace, impersonatedActor))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package net.corda.client.rpc.internal

import net.corda.client.rpc.RPCConnection
import net.corda.client.rpc.RPCException
import net.corda.core.context.Actor
import net.corda.core.context.Trace
import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.logElapsedTime
import net.corda.core.internal.uncheckedCast
Expand Down Expand Up @@ -100,7 +102,9 @@ class RPCClient<I : RPCOps>(
fun start(
rpcOpsClass: Class<I>,
username: String,
password: String
password: String,
externalTrace: Trace? = null,
impersonatedActor: Actor? = null
): RPCConnection<I> {
return log.logElapsedTime("Startup") {
val clientAddress = SimpleString("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username.${random63BitValue()}")
Expand All @@ -113,8 +117,8 @@ class RPCClient<I : RPCOps>(
minLargeMessageSize = rpcConfiguration.maxFileSize
isUseGlobalPools = nodeSerializationEnv != null
}

val proxyHandler = RPCClientProxyHandler(rpcConfiguration, username, password, serverLocator, clientAddress, rpcOpsClass, serializationContext)
val sessionId = Trace.SessionId.newInstance()
val proxyHandler = RPCClientProxyHandler(rpcConfiguration, username, password, serverLocator, clientAddress, rpcOpsClass, serializationContext, sessionId, externalTrace, impersonatedActor)
try {
proxyHandler.start()
val ops: I = uncheckedCast(Proxy.newProxyInstance(rpcOpsClass.classLoader, arrayOf(rpcOpsClass), proxyHandler))
Expand Down
Loading

0 comments on commit 92c8861

Please sign in to comment.