@@ -11,13 +11,13 @@ import net.corda.nodeapi.internal.ArtemisMessagingClient
11
11
import net.corda.nodeapi.internal.ArtemisMessagingComponent
12
12
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
13
13
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
14
- import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
15
14
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
16
15
import net.corda.nodeapi.internal.ArtemisSessionProvider
17
16
import net.corda.nodeapi.internal.bridging.AMQPBridgeManager.AMQPBridge.Companion.getBridgeName
18
17
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
19
18
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
20
19
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
20
+ import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
21
21
import org.apache.activemq.artemis.api.core.SimpleString
22
22
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
23
23
import org.apache.activemq.artemis.api.core.client.ClientConsumer
@@ -37,16 +37,24 @@ import kotlin.concurrent.withLock
37
37
* The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager.
38
38
*/
39
39
@VisibleForTesting
40
- class AMQPBridgeManager (config : NodeSSLConfiguration , private val maxMessageSize : Int , private val artemisMessageClientFactory : () -> ArtemisSessionProvider ) : BridgeManager {
40
+ class AMQPBridgeManager (config : NodeSSLConfiguration , maxMessageSize : Int , private val artemisMessageClientFactory : () -> ArtemisSessionProvider ) : BridgeManager {
41
41
42
42
private val lock = ReentrantLock ()
43
43
private val bridgeNameToBridgeMap = mutableMapOf<String , AMQPBridge >()
44
+
45
+ private class AMQPConfigurationImpl private constructor(override val keyStore : KeyStore ,
46
+ override val keyStorePrivateKeyPassword : CharArray ,
47
+ override val trustStore : KeyStore ,
48
+ override val maxMessageSize : Int ) : AMQPConfiguration {
49
+ constructor (config: NodeSSLConfiguration , maxMessageSize: Int ) : this (config.loadSslKeyStore().internal,
50
+ config.keyStorePassword.toCharArray(),
51
+ config.loadTrustStore().internal,
52
+ maxMessageSize)
53
+ }
54
+
55
+ private val amqpConfig: AMQPConfiguration = AMQPConfigurationImpl (config, maxMessageSize)
44
56
private var sharedEventLoopGroup: EventLoopGroup ? = null
45
- private val keyStore = config.loadSslKeyStore().internal
46
- private val keyStorePrivateKeyPassword: String = config.keyStorePassword
47
- private val trustStore = config.loadTrustStore().internal
48
57
private var artemis: ArtemisSessionProvider ? = null
49
- private val crlCheckSoftFail: Boolean = config.crlCheckSoftFail
50
58
51
59
constructor (config: NodeSSLConfiguration , p2pAddress: NetworkHostAndPort , maxMessageSize: Int ) : this (config, maxMessageSize, { ArtemisMessagingClient (config, p2pAddress, maxMessageSize) })
52
60
@@ -65,26 +73,26 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val maxMessageSize
65
73
private class AMQPBridge (private val queueName : String ,
66
74
private val target : NetworkHostAndPort ,
67
75
private val legalNames : Set <CordaX500Name >,
68
- keyStore : KeyStore ,
69
- keyStorePrivateKeyPassword : String ,
70
- trustStore : KeyStore ,
71
- crlCheckSoftFail : Boolean ,
76
+ private val amqpConfig : AMQPConfiguration ,
72
77
sharedEventGroup : EventLoopGroup ,
73
- private val artemis : ArtemisSessionProvider ,
74
- private val maxMessageSize : Int ) {
78
+ private val artemis : ArtemisSessionProvider ) {
75
79
companion object {
76
80
fun getBridgeName (queueName : String , hostAndPort : NetworkHostAndPort ): String = " $queueName -> $hostAndPort "
77
81
private val log = contextLogger()
78
82
}
79
83
80
84
private fun withMDC (block : () -> Unit ) {
81
- MDC .put(" queueName" , queueName)
82
- MDC .put(" target" , target.toString())
83
- MDC .put(" bridgeName" , bridgeName)
84
- MDC .put(" legalNames" , legalNames.joinToString(separator = " ;" ) { it.toString() })
85
- MDC .put(" maxMessageSize" , maxMessageSize.toString())
86
- block()
87
- MDC .clear()
85
+ val oldMDC = MDC .getCopyOfContextMap()
86
+ try {
87
+ MDC .put(" queueName" , queueName)
88
+ MDC .put(" target" , target.toString())
89
+ MDC .put(" bridgeName" , bridgeName)
90
+ MDC .put(" legalNames" , legalNames.joinToString(separator = " ;" ) { it.toString() })
91
+ MDC .put(" maxMessageSize" , amqpConfig.maxMessageSize.toString())
92
+ block()
93
+ } finally {
94
+ MDC .setContextMap(oldMDC)
95
+ }
88
96
}
89
97
90
98
private fun logDebugWithMDC (msg : () -> String ) {
@@ -97,7 +105,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val maxMessageSize
97
105
98
106
private fun logWarnWithMDC (msg : String ) = withMDC { log.warn(msg) }
99
107
100
- val amqpClient = AMQPClient (listOf (target), legalNames, PEER_USER , PEER_USER , keyStore, keyStorePrivateKeyPassword, trustStore, crlCheckSoftFail, sharedThreadPool = sharedEventGroup, maxMessageSize = maxMessageSize )
108
+ val amqpClient = AMQPClient (listOf (target), legalNames, amqpConfig, sharedThreadPool = sharedEventGroup)
101
109
val bridgeName: String get() = getBridgeName(queueName, target)
102
110
private val lock = ReentrantLock () // lock to serialise session level access
103
111
private var session: ClientSession ? = null
@@ -149,8 +157,8 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val maxMessageSize
149
157
}
150
158
151
159
private fun clientArtemisMessageHandler (artemisMessage : ClientMessage ) {
152
- if (artemisMessage.bodySize > maxMessageSize) {
153
- logWarnWithMDC(" Message exceeds maxMessageSize network parameter, maxMessageSize: [$maxMessageSize ], message size: [${artemisMessage.bodySize} ], " +
160
+ if (artemisMessage.bodySize > amqpConfig. maxMessageSize) {
161
+ logWarnWithMDC(" Message exceeds maxMessageSize network parameter, maxMessageSize: [${amqpConfig. maxMessageSize} ], message size: [${artemisMessage.bodySize} ], " +
154
162
" dropping message, uuid: ${artemisMessage.getObjectProperty(" _AMQ_DUPL_ID" )} " )
155
163
// Ack the message to prevent same message being sent to us again.
156
164
artemisMessage.acknowledge()
@@ -198,7 +206,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val maxMessageSize
198
206
if (bridgeExists(getBridgeName(queueName, target))) {
199
207
return
200
208
}
201
- val newBridge = AMQPBridge (queueName, target, legalNames, keyStore, keyStorePrivateKeyPassword, trustStore, crlCheckSoftFail, sharedEventLoopGroup!! , artemis!! , maxMessageSize )
209
+ val newBridge = AMQPBridge (queueName, target, legalNames, amqpConfig, sharedEventLoopGroup!! , artemis!! )
202
210
lock.withLock {
203
211
bridgeNameToBridgeMap[newBridge.bridgeName] = newBridge
204
212
}
0 commit comments