Skip to content

Commit

Permalink
Rename the inbox (corda#2360)
Browse files Browse the repository at this point in the history
Add changelog entry

Address review comments

Alternate solution to service queues

Fixup after merge
  • Loading branch information
Matthew Nesbit authored Jan 17, 2018
1 parent d0b62db commit 6edf955
Show file tree
Hide file tree
Showing 16 changed files with 157 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import net.corda.core.utilities.getOrThrow
import net.corda.nodeapi.ArtemisConsumer
import net.corda.nodeapi.ArtemisProducer
import net.corda.nodeapi.RPCApi
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
import org.apache.activemq.artemis.api.core.client.ClientMessage
Expand Down Expand Up @@ -194,7 +194,7 @@ class RPCClientProxyHandler(
TimeUnit.MILLISECONDS
)
sessionAndProducerPool.run {
it.session.createTemporaryQueue(clientAddress, ActiveMQDefaultConfiguration.getDefaultRoutingType(), clientAddress)
it.session.createTemporaryQueue(clientAddress, RoutingType.ANYCAST, clientAddress)
}
val sessionFactory = serverLocator.createSessionFactory()
val session = sessionFactory.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
Expand Down
7 changes: 7 additions & 0 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ UNRELEASED
* Peer-to-peer communications is now via AMQP 1.0 as default.
Although the legacy Artemis CORE bridging can still be used by setting the ``useAMQPBridges`` configuration property to false.

* The Artemis topics used for peer-to-peer communication have been changed to be more consistent with future cryptographic
agility and to open up the future possibility of sharing brokers between nodes. This is a breaking wire level change
as it means that nodes after this change will not be able to communicate correctly with nodes running the previous version.
Also, any pending enqueued messages in the Artemis message store will not be delivered correctly to their original target.
However, assuming a clean reset of the artemis data and that the nodes are consistent versions,
data persisted via the AMQP serializer will be forward compatible.

.. _changelog_v1:

Release 1.0
Expand Down
6 changes: 3 additions & 3 deletions docs/source/messaging.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ Message queues
The node makes use of various queues for its operation. The more important ones are described below. Others are used
for maintenance and other minor purposes.

:``p2p.inbound``:
:``p2p.inbound.$identity``:
The node listens for messages sent from other peer nodes on this queue. Only clients who are authenticated to be
nodes on the same network are given permission to send. Messages which are routed internally are also sent to this
queue (e.g. two flows on the same node communicating with each other).

:``internal.peers.$identity``:
These are a set of private queues only available to the node which it uses to route messages destined to other peers.
The queue name ends in the base 58 encoding of the peer's identity key. There is at most one queue per peer. The broker
creates a bridge from this queue to the peer's ``p2p.inbound`` queue, using the network map service to lookup the
creates a bridge from this queue to the peer's ``p2p.inbound.$identity`` queue, using the network map service to lookup the
peer's network address.

:``internal.services.$identity``:
Expand Down Expand Up @@ -86,7 +86,7 @@ Clients attempting to connect to the node's broker fall in one of four groups:
#. Anyone connecting with the username ``SystemUsers/Peer`` is treated as a peer on the same Corda network as the node. Their
TLS root CA must be the same as the node's root CA - the root CA is the doorman of the network and having the same root CA
implies we've been let in by the same doorman. If they are part of the same network then they are only given permission
to send to our ``p2p.inbound`` queue, otherwise they are rejected.
to send to our ``p2p.inbound.$identity`` queue, otherwise they are rejected.

#. Every other username is treated as a RPC user and authenticated against the node's list of valid RPC users. If that
is successful then they are only given sufficient permission to perform RPC, otherwise they are rejected.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package net.corda.nodeapi.internal

import net.corda.core.crypto.toStringShort
import net.corda.core.identity.Party
import net.corda.core.messaging.MessageRecipientGroup
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.toBase58String
import java.security.PublicKey

/**
Expand All @@ -23,7 +24,7 @@ class ArtemisMessagingComponent {
const val PEER_USER = "SystemUsers/Peer"
const val INTERNAL_PREFIX = "internal."
const val PEERS_PREFIX = "${INTERNAL_PREFIX}peers." //TODO Come up with better name for common peers/services queue
const val P2P_QUEUE = "p2p.inbound"
const val P2P_PREFIX = "p2p.inbound."
const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications"
}

Expand All @@ -49,7 +50,7 @@ class ArtemisMessagingComponent {
@CordaSerializable
data class NodeAddress(override val queueName: String, override val hostAndPort: NetworkHostAndPort) : ArtemisPeerAddress {
constructor(peerIdentity: PublicKey, hostAndPort: NetworkHostAndPort) :
this("$PEERS_PREFIX${peerIdentity.toBase58String()}", hostAndPort)
this("$PEERS_PREFIX${peerIdentity.toStringShort()}", hostAndPort)
}

/**
Expand All @@ -62,6 +63,30 @@ class ArtemisMessagingComponent {
* @param identity The service identity's owning key.
*/
data class ServiceAddress(val identity: PublicKey) : ArtemisAddress, MessageRecipientGroup {
override val queueName: String = "$PEERS_PREFIX${identity.toBase58String()}"
override val queueName: String = "$PEERS_PREFIX${identity.toStringShort()}"
}

/**
* [RemoteInboxAddress] implements [SingleMessageRecipient]. It represents the non-local address of a remote inbox.
* @param identity The Node public identity
*/
data class RemoteInboxAddress(val identity: PublicKey) : ArtemisAddress, SingleMessageRecipient {
constructor(party: Party) : this(party.owningKey)

companion object {
/**
* When transferring a message from the local holding queue to the remote inbox queue
* this method provides a simple translation of the address string.
* The topics are distinct so that proper segregation of internal
* and external access permissions can be made.
*/
fun translateLocalQueueToInboxAddress(address: String): String {
require(address.startsWith(PEERS_PREFIX)) { "Failed to map address: $address to a remote topic as it is not in the $PEERS_PREFIX namespace" }
return P2P_PREFIX + address.substring(PEERS_PREFIX.length)
}
}

override val queueName: String = "$P2P_PREFIX${identity.toStringShort()}"
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ package net.corda.node.amqp
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.crypto.toStringShort
import net.corda.core.internal.div
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.toBase58String
import net.corda.node.internal.protonwrapper.netty.AMQPServer
import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.config.*
import net.corda.node.services.messaging.ArtemisMessagingClient
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
import net.corda.nodeapi.internal.crypto.loadKeyStore
import net.corda.testing.*
import net.corda.testing.internal.rigorousMock
Expand Down Expand Up @@ -50,7 +50,7 @@ class AMQPBridgeTest {
@Test
fun `test acked and nacked messages`() {
// Create local queue
val sourceQueueName = "internal.peers." + BOB.publicKey.toBase58String()
val sourceQueueName = "internal.peers." + BOB.publicKey.toStringShort()
val (artemisServer, artemisClient) = createArtemis(sourceQueueName)

// Pre-populate local queue with 3 messages
Expand Down Expand Up @@ -130,11 +130,13 @@ class AMQPBridgeTest {
@Test
fun `Test legacy bridge still works`() {
// Create local queue
val sourceQueueName = "internal.peers." + ALICE.publicKey.toBase58String()
val sourceQueueName = "internal.peers." + BOB.publicKey.toStringShort()
val (artemisLegacyServer, artemisLegacyClient) = createLegacyArtemis(sourceQueueName)


val (artemisServer, artemisClient) = createArtemis(null)
val inbox = ArtemisMessagingComponent.RemoteInboxAddress(BOB.party).queueName
artemisClient.started!!.session.createQueue(inbox, RoutingType.ANYCAST, inbox, true)

val artemis = artemisLegacyClient.started!!
for (i in 0 until 3) {
Expand All @@ -147,8 +149,7 @@ class AMQPBridgeTest {
artemis.producer.send(sourceQueueName, artemisMessage)
}


val subs = artemisClient.started!!.session.createConsumer(P2P_QUEUE)
val subs = artemisClient.started!!.session.createConsumer(inbox)
for (i in 0 until 3) {
val msg = subs.receive()
val messageBody = ByteArray(msg.bodySize).apply { msg.bodyBuffer.readBytes(this) }
Expand All @@ -174,9 +175,9 @@ class AMQPBridgeTest {
doReturn(true).whenever(it).useAMQPBridges
}
artemisConfig.configureWithDevSSLCertificate()
val networkMap = rigorousMock<NetworkMapCache>().also {
val networkMap = rigorousMock<NetworkMapCacheInternal>().also {
doReturn(Observable.never<NetworkMapCache.MapChange>()).whenever(it).changed
doReturn(listOf(NodeInfo(listOf(amqpAddress), listOf(BOB.identity), 1, 1L))).whenever(it).getNodesByLegalIdentityKey(any())
doReturn(listOf(NodeInfo(listOf(amqpAddress), listOf(BOB.identity), 1, 1L))).whenever(it).getNodesByOwningKeyIndex(any())
}
val userService = rigorousMock<RPCSecurityManager>()
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort, null, networkMap, userService, MAX_MESSAGE_SIZE)
Expand All @@ -186,7 +187,7 @@ class AMQPBridgeTest {
val artemis = artemisClient.started!!
if (sourceQueueName != null) {
// Local queue for outgoing messages
artemis.session.createQueue(sourceQueueName, RoutingType.MULTICAST, sourceQueueName, true)
artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true)
}
return Pair(artemisServer, artemisClient)
}
Expand All @@ -203,9 +204,9 @@ class AMQPBridgeTest {
doReturn(ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0))).whenever(it).activeMQServer
}
artemisConfig.configureWithDevSSLCertificate()
val networkMap = rigorousMock<NetworkMapCache>().also {
val networkMap = rigorousMock<NetworkMapCacheInternal>().also {
doReturn(Observable.never<NetworkMapCache.MapChange>()).whenever(it).changed
doReturn(listOf(NodeInfo(listOf(artemisAddress), listOf(ALICE.identity), 1, 1L))).whenever(it).getNodesByLegalIdentityKey(any())
doReturn(listOf(NodeInfo(listOf(artemisAddress), listOf(ALICE.identity), 1, 1L))).whenever(it).getNodesByOwningKeyIndex(any())
}
val userService = rigorousMock<RPCSecurityManager>()
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort2, null, networkMap, userService, MAX_MESSAGE_SIZE)
Expand All @@ -214,7 +215,7 @@ class AMQPBridgeTest {
artemisClient.start()
val artemis = artemisClient.started!!
// Local queue for outgoing messages
artemis.session.createQueue(sourceQueueName, RoutingType.MULTICAST, sourceQueueName, true)
artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true)
return Pair(artemisServer, artemisClient)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ import net.corda.node.internal.protonwrapper.messages.MessageStatus
import net.corda.node.internal.protonwrapper.netty.AMQPClient
import net.corda.node.internal.protonwrapper.netty.AMQPServer
import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.messaging.ArtemisMessagingClient
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.internal.crypto.loadKeyStore
import net.corda.testing.*
Expand Down Expand Up @@ -48,7 +50,7 @@ class ProtonWrapperTests {
amqpServer.start()
val receiveSubs = amqpServer.onReceive.subscribe {
assertEquals(BOB_NAME.toString(), it.sourceLegalName)
assertEquals("p2p.inbound", it.topic)
assertEquals(P2P_PREFIX + "Test", it.topic)
assertEquals("Test", String(it.payload))
it.complete(true)
}
Expand All @@ -64,7 +66,7 @@ class ProtonWrapperTests {
assertEquals(true, clientConnect.connected)
assertEquals(ALICE_NAME, CordaX500Name.build(clientConnect.remoteCert!!.subjectX500Principal))
val msg = amqpClient.createMessage("Test".toByteArray(),
"p2p.inbound",
P2P_PREFIX + "Test",
ALICE_NAME.toString(),
emptyMap())
amqpClient.write(msg)
Expand Down Expand Up @@ -151,8 +153,8 @@ class ProtonWrapperTests {
assertEquals(true, clientConnected.get().connected)
assertEquals(CHARLIE_NAME, CordaX500Name.build(clientConnected.get().remoteCert!!.subjectX500Principal))
val artemis = artemisClient.started!!
val sendAddress = "p2p.inbound"
artemis.session.createQueue(sendAddress, RoutingType.MULTICAST, "queue", true)
val sendAddress = P2P_PREFIX + "Test"
artemis.session.createQueue(sendAddress, RoutingType.ANYCAST, "queue", true)
val consumer = artemis.session.createConsumer("queue")
val testData = "Test".toByteArray()
val testProperty = mutableMapOf<Any?, Any?>()
Expand Down Expand Up @@ -230,7 +232,7 @@ class ProtonWrapperTests {
}
artemisConfig.configureWithDevSSLCertificate()

val networkMap = rigorousMock<NetworkMapCache>().also {
val networkMap = rigorousMock<NetworkMapCacheInternal>().also {
doReturn(never<NetworkMapCache.MapChange>()).whenever(it).changed
}
val userService = rigorousMock<RPCSecurityManager>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCConnection
import net.corda.core.crypto.generateKeyPair
import net.corda.core.crypto.random63BitValue
import net.corda.core.crypto.toStringShort
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
Expand All @@ -14,14 +15,13 @@ import net.corda.core.identity.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.toBase58String
import net.corda.core.utilities.unwrap
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.config.SSLConfiguration
import net.corda.testing.ALICE_NAME
Expand Down Expand Up @@ -71,30 +71,30 @@ abstract class MQSecurityTest : NodeBasedTest() {

@Test
fun `consume message from P2P queue`() {
assertConsumeAttackFails(P2P_QUEUE)
assertConsumeAttackFails("$P2P_PREFIX${alice.info.chooseIdentity().owningKey.toStringShort()}")
}

@Test
fun `consume message from peer queue`() {
val bobParty = startBobAndCommunicateWithAlice()
assertConsumeAttackFails("$PEERS_PREFIX${bobParty.owningKey.toBase58String()}")
assertConsumeAttackFails("$PEERS_PREFIX${bobParty.owningKey.toStringShort()}")
}

@Test
fun `send message to address of peer which has been communicated with`() {
val bobParty = startBobAndCommunicateWithAlice()
assertSendAttackFails("$PEERS_PREFIX${bobParty.owningKey.toBase58String()}")
assertSendAttackFails("$PEERS_PREFIX${bobParty.owningKey.toStringShort()}")
}

@Test
fun `create queue for peer which has not been communicated with`() {
val bob = startNode(BOB_NAME)
assertAllQueueCreationAttacksFail("$PEERS_PREFIX${bob.info.chooseIdentity().owningKey.toBase58String()}")
assertAllQueueCreationAttacksFail("$PEERS_PREFIX${bob.info.chooseIdentity().owningKey.toStringShort()}")
}

@Test
fun `create queue for unknown peer`() {
val invalidPeerQueue = "$PEERS_PREFIX${generateKeyPair().public.toBase58String()}"
val invalidPeerQueue = "$PEERS_PREFIX${generateKeyPair().public.toStringShort()}"
assertAllQueueCreationAttacksFail(invalidPeerQueue)
}

Expand Down
4 changes: 4 additions & 0 deletions node/src/main/kotlin/net/corda/node/internal/Node.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory
import rx.Scheduler
import rx.schedulers.Schedulers
import java.security.PublicKey
import java.time.Clock
import java.util.concurrent.atomic.AtomicInteger
import javax.management.ObjectName
Expand Down Expand Up @@ -166,11 +167,14 @@ open class Node(configuration: NodeConfiguration,
VerifierType.OutOfProcess -> VerifierMessagingClient(configuration, serverAddress, services.monitoringService.metrics, networkParameters.maxMessageSize)
VerifierType.InMemory -> null
}
require(info.legalIdentities.size in 1..2) { "Currently nodes must have a primary address and optionally one serviced address" }
val serviceIdentity: PublicKey? = if (info.legalIdentities.size == 1) null else info.legalIdentities[1].owningKey
return P2PMessagingClient(
configuration,
versionInfo,
serverAddress,
info.legalIdentities[0].owningKey,
serviceIdentity,
serverThread,
database,
advertisedAddress,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ interface NetworkMapCacheBaseInternal : NetworkMapCacheBase {

fun getNodeByHash(nodeHash: SecureHash): NodeInfo?

/** Find nodes from the [PublicKey] toShortString representation.
* This is used for Artemis bridge lookup process. */
fun getNodesByOwningKeyIndex(identityKeyIndex: String): List<NodeInfo>

/** Adds a node to the local cache (generally only used for adding ourselves). */
fun addNode(node: NodeInfo)

Expand Down
Loading

0 comments on commit 6edf955

Please sign in to comment.