Skip to content

Commit

Permalink
ENT-2053: add exclusive flag to created P2P queues (except service on…
Browse files Browse the repository at this point in the history
…es); this is a sync with ENT (corda#3592)
  • Loading branch information
bpaunescu authored Jul 13, 2018
1 parent 2b4865f commit ce787df
Showing 1 changed file with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import net.corda.nodeapi.internal.bridging.BridgeControl
import net.corda.nodeapi.internal.bridging.BridgeEntry
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.requireMessageSize
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
Expand Down Expand Up @@ -168,7 +169,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
inboxes += RemoteInboxAddress(it).queueName
}

inboxes.forEach { createQueueIfAbsent(it, producerSession!!) }
inboxes.forEach { createQueueIfAbsent(it, producerSession!!, exclusive = true) }

p2pConsumer = P2PMessagingConsumer(inboxes, createNewSession, isDrainingModeOn, drainingModeWasChangedEvents)

messagingExecutor = MessagingExecutor(
Expand Down Expand Up @@ -469,14 +471,14 @@ class P2PMessagingClient(val config: NodeConfiguration,
val internalTargetQueue = (address as? ArtemisAddress)?.queueName
?: throw IllegalArgumentException("Not an Artemis address")
state.locked {
createQueueIfAbsent(internalTargetQueue, producerSession!!)
createQueueIfAbsent(internalTargetQueue, producerSession!!, exclusive = address !is ServiceAddress)
}
internalTargetQueue
}
}

/** Attempts to create a durable queue on the broker which is bound to an address of the same name. */
private fun createQueueIfAbsent(queueName: String, session: ClientSession) {
private fun createQueueIfAbsent(queueName: String, session: ClientSession, exclusive: Boolean) {
fun sendBridgeCreateMessage() {
val keyHash = queueName.substring(PEERS_PREFIX.length)
val peers = networkMap.getNodesByOwningKeyIndex(keyHash)
Expand All @@ -495,7 +497,9 @@ class P2PMessagingClient(val config: NodeConfiguration,
val queueQuery = session.queueQuery(SimpleString(queueName))
if (!queueQuery.isExists) {
log.info("Create fresh queue $queueName bound on same address")
session.createQueue(queueName, RoutingType.ANYCAST, queueName, true)
session.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false,
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), exclusive, null)
sendBridgeCreateMessage()
}
}
Expand Down

0 comments on commit ce787df

Please sign in to comment.