Skip to content

Commit

Permalink
AMQP documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
2m committed Oct 24, 2016
1 parent 8d0fec9 commit d9715de
Show file tree
Hide file tree
Showing 13 changed files with 560 additions and 156 deletions.
4 changes: 3 additions & 1 deletion amqp/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ lazy val amqp = (project in file(".")).
name := "akka-stream-alpakka-amqp"

libraryDependencies ++= Seq(
"com.rabbitmq" % "amqp-client" % "3.6.1" // APLv2
"com.rabbitmq" % "amqp-client" % "3.6.1", // APLv2
"com.novocode" % "junit-interface" % "0.11" % "test", // BSD-style
"junit" % "junit" % "4.12" % "test" // Eclipse Public License 1.0
)

Defaults.itSettings
17 changes: 15 additions & 2 deletions amqp/src/main/scala/akka/stream/contrib/amqp/AmqpSink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,41 @@ final case class OutgoingMessage(bytes: ByteString, immediate: Boolean, mandator

object AmqpSink {

/**
* Scala API: Create [[AmqpSink]] that accepts ByteString elements.
*/
def simple(settings: AmqpSinkSettings): Sink[ByteString, NotUsed] =
apply(settings).contramap[ByteString](bytes => OutgoingMessage(bytes, false, false, None))

/**
* Scala API:
* Scala API: Create [[AmqpSink]] that accepts [[OutgoingMessage]] elements.
*/
def apply(settings: AmqpSinkSettings): Sink[OutgoingMessage, NotUsed] =
Sink.fromGraph(new AmqpSink(settings))

/**
* Java API:
* Java API: Create [[AmqpSink]] that accepts [[OutgoingMessage]] elements.
*/
def create(settings: AmqpSinkSettings): akka.stream.javadsl.Sink[OutgoingMessage, NotUsed] =
akka.stream.javadsl.Sink.fromGraph(new AmqpSink(settings))

/**
* Java API: Create [[AmqpSink]] that accepts ByteString elements.
*/
def createSimple(settings: AmqpSinkSettings): akka.stream.javadsl.Sink[ByteString, NotUsed] =
simple(settings).asJava

/**
* Internal API
*/
private val defaultAttributes = Attributes.name("AmsqpSink")
.and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))
}

/**
* Connects to an AMQP server upon materialization and sends incoming messages to the server.
* Each materialized sink will create one connection to the broker.
*/
final class AmqpSink(settings: AmqpSinkSettings) extends GraphStage[SinkShape[OutgoingMessage]] with AmqpConnector { stage =>
import AmqpSink._

Expand Down
8 changes: 4 additions & 4 deletions amqp/src/main/scala/akka/stream/contrib/amqp/AmqpSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ final case class IncomingMessage(bytes: ByteString, envelope: Envelope, properti

object AmqpSource {
/**
* Scala API:
* Scala API: Creates [[AmqpSource]] with given settings and buffer size.
*/
def apply(settings: AmqpSourceSettings, bufferSize: Int): Source[IncomingMessage, NotUsed] =
Source.fromGraph(new AmqpSource(settings, bufferSize))

/**
* Java API:
* Java API: Creates [[AmqpSource]] with given settings and buffer size.
*/
def create(settings: AmqpSourceSettings, bufferSize: Int): akka.stream.javadsl.Source[IncomingMessage, NotUsed] =
akka.stream.javadsl.Source.fromGraph(new AmqpSource(settings, bufferSize))
Expand All @@ -33,8 +33,8 @@ object AmqpSource {
}

/**
* Connects to an amqp server upon materialization and consumes messages from it emitting them
* into the stream. Each materialized stage will create one connection to the broker.
* Connects to an AMQP server upon materialization and consumes messages from it emitting them
* into the stream. Each materialized source will create one connection to the broker.
* As soon as an `IncomingMessage` is sent downstream, an ack for it is sent to the broker.
*
* @param bufferSize The max number of elements to prefetch and buffer at any given time.
Expand Down
164 changes: 152 additions & 12 deletions amqp/src/main/scala/akka/stream/contrib/amqp/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package akka.stream.contrib.amqp

import scala.collection.immutable.Seq
import scala.collection.JavaConverters._

/**
* Internal API
Expand All @@ -18,46 +19,129 @@ sealed trait AmqpSourceSettings extends AmqpConnectorSettings
final case class NamedQueueSourceSettings(
connectionSettings: AmqpConnectionSettings,
queue: String,
declarations: Seq[Declaration],
declarations: Seq[Declaration] = Seq.empty,
noLocal: Boolean = false,
exclusive: Boolean = false,
consumerTag: String = "default",
arguments: Map[String, AnyRef] = Map.empty
) extends AmqpSourceSettings
) extends AmqpSourceSettings {
@annotation.varargs
def withDeclarations(declarations: Declaration*) = copy(declarations = declarations.toList)

def withNoLocal(noLocal: Boolean) = copy(noLocal = noLocal)

def withExclusive(exclusive: Boolean) = copy(exclusive = exclusive)

def withConsumerTag(consumerTag: String) = copy(consumerTag = consumerTag)

def withArguments(argument: (String, AnyRef), arguments: (String, AnyRef)*) =
copy(arguments = (argument +: arguments).toMap)

@annotation.varargs
def withArguments(argument: akka.japi.Pair[String, AnyRef], arguments: akka.japi.Pair[String, AnyRef]*) =
copy(arguments = (argument +: arguments).map(_.toScala).toMap)
}

object NamedQueueSourceSettings {
/**
* Java API
*/
def create(connectionSettings: AmqpConnectionSettings, queue: String) =
NamedQueueSourceSettings(connectionSettings, queue)
}

final case class TemporaryQueueSourceSettings(
connectionSettings: AmqpConnectionSettings,
exchange: String,
declarations: Seq[Declaration],
declarations: Seq[Declaration] = Seq.empty,
routingKey: Option[String] = None
) extends AmqpSourceSettings
) extends AmqpSourceSettings {
def withRoutingKey(routingKey: String) = copy(routingKey = Some(routingKey))

@annotation.varargs
def withDeclarations(declarations: Declaration*) = copy(declarations = declarations.toList)
}

object TemporaryQueueSourceSettings {
/**
* Java API
*/
def create(connectionSettings: AmqpConnectionSettings, exchange: String) =
TemporaryQueueSourceSettings(connectionSettings, exchange)
}

final case class AmqpSinkSettings(
connectionSettings: AmqpConnectionSettings,
exchange: Option[String],
routingKey: Option[String],
declarations: Seq[Declaration]
exchange: Option[String] = None,
routingKey: Option[String] = None,
declarations: Seq[Declaration] = Seq.empty
) extends AmqpConnectorSettings {
def withExchange(exchange: String) = copy(exchange = Some(exchange))

def withRoutingKey(routingKey: String) = copy(routingKey = Some(routingKey))

@annotation.varargs
def withDeclarations(declarations: Declaration*) = copy(declarations = declarations.toList)
}

object AmqpSinkSettings {
/**
* Java API
*/
def create(connectionSettings: AmqpConnectionSettings) =
AmqpSinkSettings(connectionSettings)
}

/**
* Only for internal implementations
*/
sealed trait AmqpConnectionSettings

case object DefaultAmqpConnection extends AmqpConnectionSettings
case object DefaultAmqpConnection extends AmqpConnectionSettings {
/**
* Java API
*/
def getInstance() = this
}

final case class AmqpConnectionUri(uri: String) extends AmqpConnectionSettings

object AmqpConnectionUri {
/**
* Java API:
*/
def create(uri: String) = AmqpConnectionUri(uri)
}

final case class AmqpConnectionDetails(
host: String,
port: Int,
credentials: Option[AmqpCredentials] = None,
virtualHost: Option[String] = None
) extends AmqpConnectionSettings
) extends AmqpConnectionSettings {

}

object AmqpConnectionDetails {
/**
* Java API:
*/
def create(host: String, port: Int) =
AmqpConnectionDetails(host, port)
}

final case class AmqpCredentials(username: String, password: String) {
override def toString = s"Credentials($username, ********)"
}

object AmqpCredentials {
/**
* Java API
*/
def create(username: String, password: String) =
AmqpCredentials(username, password)
}

sealed trait Declaration

final case class QueueDeclaration(
Expand All @@ -66,14 +150,50 @@ final case class QueueDeclaration(
exclusive: Boolean = false,
autoDelete: Boolean = false,
arguments: Map[String, AnyRef] = Map.empty
) extends Declaration
) extends Declaration {
def withDurable(durable: Boolean) = copy(durable = durable)

def withExclusive(exclusive: Boolean) = copy(exclusive = exclusive)

def withAutoDelete(autoDelete: Boolean) = copy(autoDelete = autoDelete)

def withArguments(argument: (String, AnyRef), arguments: (String, AnyRef)*) =
copy(arguments = (argument +: arguments).toMap)

@annotation.varargs
def withArguments(argument: akka.japi.Pair[String, AnyRef], arguments: akka.japi.Pair[String, AnyRef]*) =
copy(arguments = (argument +: arguments).map(_.toScala).toMap)
}

object QueueDeclaration {
/**
* Java API
*/
def create(name: String) = QueueDeclaration(name)
}

final case class BindingDeclaration(
queue: String,
exchange: String,
routingKey: Option[String] = None,
arguments: Map[String, AnyRef] = Map.empty
) extends Declaration
) extends Declaration {
def withRoutingKey(routingKey: String) = copy(routingKey = Some(routingKey))

def withArguments(argument: (String, AnyRef), arguments: (String, AnyRef)*) =
copy(arguments = (argument +: arguments).toMap)

@annotation.varargs
def withArguments(argument: akka.japi.Pair[String, AnyRef], arguments: akka.japi.Pair[String, AnyRef]*) =
copy(arguments = (argument +: arguments).map(_.toScala).toMap)
}

object BindingDeclaration {
/**
* Java API
*/
def create(queue: String, exchange: String) = BindingDeclaration(queue, exchange)
}

final case class ExchangeDeclaration(
name: String,
Expand All @@ -82,4 +202,24 @@ final case class ExchangeDeclaration(
autoDelete: Boolean = false,
internal: Boolean = false,
arguments: Map[String, AnyRef] = Map.empty
) extends Declaration
) extends Declaration {
def withDurable(durable: Boolean) = copy(durable = durable)

def withAutoDelete(autoDelete: Boolean) = copy(autoDelete = autoDelete)

def withInternal(internal: Boolean) = copy(internal = internal)

def withArguments(argument: (String, AnyRef), arguments: (String, AnyRef)*) =
copy(arguments = (argument +: arguments).toMap)

@annotation.varargs
def withArguments(argument: akka.japi.Pair[String, AnyRef], arguments: akka.japi.Pair[String, AnyRef]*) =
copy(arguments = (argument +: arguments).map(_.toScala).toMap)
}

object ExchangeDeclaration {
/**
* Java API
*/
def create(name: String, exchangeType: String) = ExchangeDeclaration(name, exchangeType)
}
Loading

0 comments on commit d9715de

Please sign in to comment.