Skip to content

Commit

Permalink
step2
Browse files Browse the repository at this point in the history
  • Loading branch information
Heiko Seeberger committed Nov 16, 2016
1 parent 81edbf7 commit d5a976f
Show file tree
Hide file tree
Showing 19 changed files with 261 additions and 228 deletions.
72 changes: 39 additions & 33 deletions amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSinkStage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,27 @@ import akka.util.ByteString
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client._

final case class OutgoingMessage(bytes: ByteString, immediate: Boolean, mandatory: Boolean, props: Option[BasicProperties])
final case class OutgoingMessage(bytes: ByteString,
immediate: Boolean,
mandatory: Boolean,
props: Option[BasicProperties])

object AmqpSinkStage {

/**
* Internal API
*/
private val defaultAttributes = Attributes.name("AmsqpSink")
.and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))
private val defaultAttributes =
Attributes.name("AmsqpSink").and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))
}

/**
* Connects to an AMQP server upon materialization and sends incoming messages to the server.
* Each materialized sink will create one connection to the broker.
*/
final class AmqpSinkStage(settings: AmqpSinkSettings) extends GraphStage[SinkShape[OutgoingMessage]] with AmqpConnector { stage =>
final class AmqpSinkStage(settings: AmqpSinkSettings)
extends GraphStage[SinkShape[OutgoingMessage]]
with AmqpConnector { stage =>
import AmqpSinkStage._

val in = Inlet[OutgoingMessage]("AmqpSink.in")
Expand All @@ -33,41 +38,42 @@ final class AmqpSinkStage(settings: AmqpSinkSettings) extends GraphStage[SinkSha

override protected def initialAttributes: Attributes = defaultAttributes

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with AmqpConnectorLogic {
override val settings = stage.settings
private val exchange = settings.exchange.getOrElse("")
private val routingKey = settings.routingKey.getOrElse("")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with AmqpConnectorLogic {
override val settings = stage.settings
private val exchange = settings.exchange.getOrElse("")
private val routingKey = settings.routingKey.getOrElse("")

override def connectionFactoryFrom(settings: AmqpConnectionSettings) = stage.connectionFactoryFrom(settings)
override def connectionFactoryFrom(settings: AmqpConnectionSettings) = stage.connectionFactoryFrom(settings)

override def whenConnected(): Unit = {
val shutdownCallback = getAsyncCallback[ShutdownSignalException] { ex =>
failStage(ex)
}
channel.addShutdownListener(new ShutdownListener {
override def shutdownCompleted(cause: ShutdownSignalException): Unit = {
shutdownCallback.invoke(cause)
override def whenConnected(): Unit = {
val shutdownCallback = getAsyncCallback[ShutdownSignalException] { ex =>
failStage(ex)
}
})
pull(in)
}

setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
channel.basicPublish(
exchange,
routingKey,
elem.mandatory,
elem.immediate,
elem.props.orNull,
elem.bytes.toArray
)
channel.addShutdownListener(new ShutdownListener {
override def shutdownCompleted(cause: ShutdownSignalException): Unit = {
shutdownCallback.invoke(cause)
}
})
pull(in)
}
})

}
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
channel.basicPublish(
exchange,
routingKey,
elem.mandatory,
elem.immediate,
elem.props.orNull,
elem.bytes.toArray
)
pull(in)
}
})

}

override def toString: String = "AmqpSink"
}
156 changes: 81 additions & 75 deletions amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSourceStage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,108 +26,114 @@ object AmqpSourceStage {
*
* @param bufferSize The max number of elements to prefetch and buffer at any given time.
*/
final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSize: Int) extends GraphStage[SourceShape[IncomingMessage]] with AmqpConnector { stage =>
final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSize: Int)
extends GraphStage[SourceShape[IncomingMessage]]
with AmqpConnector { stage =>

val out = Outlet[IncomingMessage]("AmqpSource.out")

override val shape: SourceShape[IncomingMessage] = SourceShape.of(out)

override protected def initialAttributes: Attributes = AmqpSourceStage.defaultAttributes

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with AmqpConnectorLogic {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with AmqpConnectorLogic {

override val settings = stage.settings
override def connectionFactoryFrom(settings: AmqpConnectionSettings) = stage.connectionFactoryFrom(settings)
override val settings = stage.settings
override def connectionFactoryFrom(settings: AmqpConnectionSettings) = stage.connectionFactoryFrom(settings)

private val queue = mutable.Queue[IncomingMessage]()
private val queue = mutable.Queue[IncomingMessage]()

override def whenConnected(): Unit = {
import scala.collection.JavaConverters._
// we have only one consumer per connection so global is ok
channel.basicQos(bufferSize, true)
val consumerCallback = getAsyncCallback(handleDelivery)
val shutdownCallback = getAsyncCallback[Option[ShutdownSignalException]] {
case Some(ex) => failStage(ex)
case None => completeStage()
}

val amqpSourceConsumer = new DefaultConsumer(channel) {
override def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]): Unit = {
consumerCallback.invoke(IncomingMessage(ByteString(body), envelope, properties))
override def whenConnected(): Unit = {
import scala.collection.JavaConverters._
// we have only one consumer per connection so global is ok
channel.basicQos(bufferSize, true)
val consumerCallback = getAsyncCallback(handleDelivery)
val shutdownCallback = getAsyncCallback[Option[ShutdownSignalException]] {
case Some(ex) => failStage(ex)
case None => completeStage()
}

override def handleCancel(consumerTag: String): Unit = {
// non consumer initiated cancel, for example happens when the queue has been deleted.
shutdownCallback.invoke(None)
val amqpSourceConsumer = new DefaultConsumer(channel) {
override def handleDelivery(consumerTag: String,
envelope: Envelope,
properties: BasicProperties,
body: Array[Byte]): Unit = {
consumerCallback.invoke(IncomingMessage(ByteString(body), envelope, properties))
}

override def handleCancel(consumerTag: String): Unit = {
// non consumer initiated cancel, for example happens when the queue has been deleted.
shutdownCallback.invoke(None)
}

override def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException): Unit = {
// "Called when either the channel or the underlying connection has been shut down."
shutdownCallback.invoke(Option(sig))
}
}

override def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException): Unit = {
// "Called when either the channel or the underlying connection has been shut down."
shutdownCallback.invoke(Option(sig))
def setupNamedQueue(settings: NamedQueueSourceSettings): Unit = {
channel.basicConsume(
settings.queue,
false, // never auto-ack
settings.consumerTag, // consumer tag
settings.noLocal,
settings.exclusive,
settings.arguments.asJava,
amqpSourceConsumer
)
}
}

def setupNamedQueue(settings: NamedQueueSourceSettings): Unit = {
channel.basicConsume(
settings.queue,
false, // never auto-ack
settings.consumerTag, // consumer tag
settings.noLocal,
settings.exclusive,
settings.arguments.asJava,
amqpSourceConsumer
)
}
def setupTemporaryQueue(settings: TemporaryQueueSourceSettings): Unit = {
// this is a weird case that required dynamic declaration, the queue name is not known
// up front, it is only useful for sources, so that's why it's not placed in the AmqpConnectorLogic
val queueName = channel.queueDeclare().getQueue
channel.queueBind(queueName, settings.exchange, settings.routingKey.getOrElse(""))
channel.basicConsume(
queueName,
amqpSourceConsumer
)
}

def setupTemporaryQueue(settings: TemporaryQueueSourceSettings): Unit = {
// this is a weird case that required dynamic declaration, the queue name is not known
// up front, it is only useful for sources, so that's why it's not placed in the AmqpConnectorLogic
val queueName = channel.queueDeclare().getQueue
channel.queueBind(queueName, settings.exchange, settings.routingKey.getOrElse(""))
channel.basicConsume(
queueName,
amqpSourceConsumer
)
}
settings match {
case settings: NamedQueueSourceSettings => setupNamedQueue(settings)
case settings: TemporaryQueueSourceSettings => setupTemporaryQueue(settings)
}

settings match {
case settings: NamedQueueSourceSettings => setupNamedQueue(settings)
case settings: TemporaryQueueSourceSettings => setupTemporaryQueue(settings)
}

}

def handleDelivery(message: IncomingMessage): Unit = {
if (isAvailable(out)) {
pushAndAckMessage(message)
} else {
if (queue.size + 1 > bufferSize) {
failStage(new RuntimeException(s"Reached maximum buffer size $bufferSize"))
def handleDelivery(message: IncomingMessage): Unit = {
if (isAvailable(out)) {
pushAndAckMessage(message)
} else {
queue.enqueue(message)
if (queue.size + 1 > bufferSize) {
failStage(new RuntimeException(s"Reached maximum buffer size $bufferSize"))
} else {
queue.enqueue(message)
}
}
}
}

setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (queue.nonEmpty) {
pushAndAckMessage(queue.dequeue())
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (queue.nonEmpty) {
pushAndAckMessage(queue.dequeue())
}
}
}

})
})

def pushAndAckMessage(message: IncomingMessage): Unit = {
push(out, message)
// ack it as soon as we have passed it downstream
// TODO ack less often and do batch acks with multiple = true would probably be more performant
channel.basicAck(
message.envelope.getDeliveryTag,
false // just this single message
)
}
def pushAndAckMessage(message: IncomingMessage): Unit = {
push(out, message)
// ack it as soon as we have passed it downstream
// TODO ack less often and do batch acks with multiple = true would probably be more performant
channel.basicAck(
message.envelope.getDeliveryTag,
false // just this single message
)
}

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import akka.stream.javadsl.Sink
import akka.util.ByteString

object AmqpSink {

/**
* Java API: Creates an [[AmqpSink]] that accepts [[OutgoingMessage]] elements.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
package akka.stream.alpakka.amqp.javadsl

import akka.NotUsed
import akka.stream.alpakka.amqp.{ AmqpSourceStage, AmqpSourceSettings, IncomingMessage }
import akka.stream.alpakka.amqp.{ AmqpSourceSettings, AmqpSourceStage, IncomingMessage }
import akka.stream.javadsl.Source

object AmqpSource {

/**
* Java API: Creates an [[AmqpSource]] with given settings and buffer size.
*/
Expand Down
Loading

0 comments on commit d5a976f

Please sign in to comment.