Skip to content

Commit ed3944c

Browse files
author
Thomas Schroeter
authored
Create Artemis p2p.inbound addresses before starting the broker (corda#3407)
1 parent 4ff5aa3 commit ed3944c

File tree

2 files changed

+47
-23
lines changed

2 files changed

+47
-23
lines changed

node/src/main/kotlin/net/corda/node/internal/Node.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ open class Node(configuration: NodeConfiguration,
192192

193193
if (!configuration.messagingServerExternal) {
194194
val brokerBindAddress = configuration.messagingServerAddress ?: NetworkHostAndPort("0.0.0.0", configuration.p2pAddress.port)
195-
messageBroker = ArtemisMessagingServer(configuration, brokerBindAddress, networkParameters.maxMessageSize)
195+
messageBroker = ArtemisMessagingServer(configuration, brokerBindAddress, networkParameters.maxMessageSize, info.legalIdentities.map { it.owningKey })
196196
}
197197

198198
val serverAddress = configuration.messagingServerAddress

node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt

+46-22
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,18 @@ import net.corda.node.services.config.NodeConfiguration
1313
import net.corda.nodeapi.ArtemisTcpTransport.Companion.p2pAcceptorTcpTransport
1414
import net.corda.nodeapi.internal.AmqpMessageSizeChecksInterceptor
1515
import net.corda.nodeapi.internal.ArtemisMessageSizeChecksInterceptor
16+
import net.corda.nodeapi.internal.ArtemisMessagingComponent
1617
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
1718
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.JOURNAL_HEADER_SIZE
1819
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
1920
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
2021
import net.corda.nodeapi.internal.requireOnDefaultFileSystem
22+
import org.apache.activemq.artemis.api.core.RoutingType
2123
import org.apache.activemq.artemis.api.core.SimpleString
2224
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
2325
import org.apache.activemq.artemis.core.config.Configuration
26+
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration
27+
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
2428
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
2529
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration
2630
import org.apache.activemq.artemis.core.security.Role
@@ -29,6 +33,7 @@ import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
2933
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
3034
import java.io.IOException
3135
import java.security.KeyStoreException
36+
import java.security.PublicKey
3237
import javax.annotation.concurrent.ThreadSafe
3338
import javax.security.auth.login.AppConfigurationEntry
3439
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED
@@ -49,7 +54,8 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.RE
4954
@ThreadSafe
5055
class ArtemisMessagingServer(private val config: NodeConfiguration,
5156
private val messagingServerAddress: NetworkHostAndPort,
52-
private val maxMessageSize: Int) : ArtemisBroker, SingletonSerializeAsToken() {
57+
private val maxMessageSize: Int,
58+
private val identities: List<PublicKey> = emptyList()) : ArtemisBroker, SingletonSerializeAsToken() {
5359
companion object {
5460
private val log = contextLogger()
5561
}
@@ -105,29 +111,47 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
105111
log.info("P2P messaging server listening on $messagingServerAddress")
106112
}
107113

108-
private fun createArtemisConfig() = SecureArtemisConfiguration().apply {
109-
val artemisDir = config.baseDirectory / "artemis"
110-
bindingsDirectory = (artemisDir / "bindings").toString()
111-
journalDirectory = (artemisDir / "journal").toString()
112-
largeMessagesDirectory = (artemisDir / "large-messages").toString()
113-
acceptorConfigurations = mutableSetOf(p2pAcceptorTcpTransport(NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port), config))
114-
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
115-
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
116-
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
117-
isPersistIDCache = true
118-
isPopulateValidatedUser = true
119-
journalBufferSize_NIO = maxMessageSize + JOURNAL_HEADER_SIZE // Artemis default is 490KiB - required to address IllegalArgumentException (when Artemis uses Java NIO): Record is too large to store.
120-
journalBufferSize_AIO = maxMessageSize + JOURNAL_HEADER_SIZE // Required to address IllegalArgumentException (when Artemis uses Linux Async IO): Record is too large to store.
121-
journalFileSize = maxMessageSize + JOURNAL_HEADER_SIZE// The size of each journal file in bytes. Artemis default is 10MiB.
122-
managementNotificationAddress = SimpleString(NOTIFICATIONS_ADDRESS)
123-
124-
// JMX enablement
125-
if (config.jmxMonitoringHttpPort != null) {
126-
isJMXManagementEnabled = true
127-
isJMXUseBrokerName = true
114+
private fun createArtemisConfig(): Configuration {
115+
val addressConfigs = identities.map {
116+
val queueName = ArtemisMessagingComponent.RemoteInboxAddress(it).queueName
117+
log.info("Configuring address $queueName")
118+
val queueConfig = CoreQueueConfiguration().apply {
119+
address = queueName
120+
name = queueName
121+
routingType = RoutingType.ANYCAST
122+
isExclusive = true
123+
}
124+
CoreAddressConfiguration().apply {
125+
name = queueName
126+
queueConfigurations = listOf(queueConfig)
127+
addRoutingType(RoutingType.ANYCAST)
128+
}
128129
}
130+
return SecureArtemisConfiguration().apply {
131+
val artemisDir = config.baseDirectory / "artemis"
132+
bindingsDirectory = (artemisDir / "bindings").toString()
133+
journalDirectory = (artemisDir / "journal").toString()
134+
largeMessagesDirectory = (artemisDir / "large-messages").toString()
135+
acceptorConfigurations = mutableSetOf(p2pAcceptorTcpTransport(NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port), config))
136+
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
137+
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
138+
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
139+
isPersistIDCache = true
140+
isPopulateValidatedUser = true
141+
journalBufferSize_NIO = maxMessageSize + JOURNAL_HEADER_SIZE // Artemis default is 490KiB - required to address IllegalArgumentException (when Artemis uses Java NIO): Record is too large to store.
142+
journalBufferSize_AIO = maxMessageSize + JOURNAL_HEADER_SIZE // Required to address IllegalArgumentException (when Artemis uses Linux Async IO): Record is too large to store.
143+
journalFileSize = maxMessageSize + JOURNAL_HEADER_SIZE// The size of each journal file in bytes. Artemis default is 10MiB.
144+
managementNotificationAddress = SimpleString(NOTIFICATIONS_ADDRESS)
145+
addressConfigurations = addressConfigs
146+
147+
// JMX enablement
148+
if (config.jmxMonitoringHttpPort != null) {
149+
isJMXManagementEnabled = true
150+
isJMXUseBrokerName = true
151+
}
129152

130-
}.configureAddressSecurity()
153+
}.configureAddressSecurity()
154+
}
131155

132156
/**
133157
* Authenticated clients connecting to us fall in one of the following groups:

0 commit comments

Comments
 (0)