Skip to content

Commit

Permalink
Add attribute parameters to sqs source settings akka#302
Browse files Browse the repository at this point in the history
  • Loading branch information
lnr0626 authored and johanandren committed May 17, 2017
1 parent 2d0ec27 commit 62faa51
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 26 deletions.
2 changes: 1 addition & 1 deletion docs/src/main/paradox/sqs.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ SQS queues never finishes because there is no direct way to determine the end of
#### Source configuration

Scala
: @@snip (../../../../sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSourceStage.scala) { #SqsSourceSettings }
: @@snip (../../../../sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSourceSettings.scala) { #SqsSourceSettings }

Options:

Expand Down
76 changes: 76 additions & 0 deletions sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSourceSettings.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package akka.stream.alpakka.sqs

import java.util

import scala.collection.JavaConverters._

object SqsSourceSettings {
val Defaults = SqsSourceSettings(20, 100, 10)

def create(waitTimeSeconds: Int, maxBufferSize: Int, maxBatchSize: Int): SqsSourceSettings =
SqsSourceSettings(waitTimeSeconds, maxBufferSize, maxBatchSize)

def create(waitTimeSeconds: Int,
maxBufferSize: Int,
maxBatchSize: Int,
attributeNames: util.List[AttributeName],
messageAttributeNames: util.List[MessageAttributeName]): SqsSourceSettings =
SqsSourceSettings(waitTimeSeconds,
maxBufferSize,
maxBatchSize,
attributeNames.asScala,
messageAttributeNames.asScala)

}

//#SqsSourceSettings
final case class SqsSourceSettings(
waitTimeSeconds: Int,
maxBufferSize: Int,
maxBatchSize: Int,
attributeNames: Seq[AttributeName] = Seq(),
messageAttributeNames: Seq[MessageAttributeName] = Seq()
) {
require(maxBatchSize <= maxBufferSize, "maxBatchSize must be lower or equal than maxBufferSize")
// SQS requirements
require(0 <= waitTimeSeconds && waitTimeSeconds <= 20,
s"Invalid value ($waitTimeSeconds) for waitTimeSeconds. Requirement: 0 <= waitTimeSeconds <= 20 ")
require(1 <= maxBatchSize && maxBatchSize <= 10,
s"Invalid value ($maxBatchSize) for maxBatchSize. Requirement: 1 <= maxBatchSize <= 10 ")
}
//#SqsSourceSettings

final case class MessageAttributeName(name: String) {
require(
name.matches("[0-9a-zA-Z_\\-.]+"),
"MessageAttributeNames may only contain alphanumeric characters and the underscore (_), hyphen (-), and period (.)"
)

require(
!name.matches("(^\\.[^*].*)|(.*\\.\\..*)|(.*\\.$)"),
"MessageAttributeNames cannot start or end with a period (.) or have multiple periods in succession (..)"
)

require(name.length <= 256, "MessageAttributeNames may not be longer than 256 characters")
}

sealed abstract class AttributeName(val name: String)
case object All extends AttributeName("All")
case object Policy extends AttributeName("Policy")
case object VisibilityTimeout extends AttributeName("VisibilityTimeout")
case object MaximumMessageSize extends AttributeName("MaximumMessageSize")
case object MessageRetentionPeriod extends AttributeName("MessageRetentionPeriod")
case object ApproximateNumberOfMessages extends AttributeName("ApproximateNumberOfMessages")
case object ApproximateNumberOfMessagesNotVisible extends AttributeName("ApproximateNumberOfMessagesNotVisible")
case object CreatedTimestamp extends AttributeName("CreatedTimestamp")
case object LastModifiedTimestamp extends AttributeName("LastModifiedTimestamp")
case object QueueArn extends AttributeName("QueueArn")
case object ApproximateNumberOfMessagesDelayed extends AttributeName("ApproximateNumberOfMessagesDelayed")
case object DelaySeconds extends AttributeName("DelaySeconds")
case object ReceiveMessageWaitTimeSeconds extends AttributeName("ReceiveMessageWaitTimeSeconds")
case object RedrivePolicy extends AttributeName("RedrivePolicy")
case object FifoQueue extends AttributeName("FifoQueue")
case object ContentBasedDeduplication extends AttributeName("ContentBasedDeduplication")
case object KmsMasterKeyId extends AttributeName("KmsMasterKeyId")
case object KmsDataKeyReusePeriodSeconds extends AttributeName("KmsDataKeyReusePeriodSeconds")

25 changes: 2 additions & 23 deletions sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSourceStage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,6 @@ import com.amazonaws.services.sqs.model.{Message, ReceiveMessageRequest, Receive

import scala.collection.JavaConverters._

object SqsSourceSettings {
val Defaults = SqsSourceSettings(20, 100, 10)

def create(waitTimeSeconds: Int, maxBufferSize: Int, maxBatchSize: Int): SqsSourceSettings =
SqsSourceSettings(waitTimeSeconds, maxBufferSize, maxBatchSize)

}

//#SqsSourceSettings
final case class SqsSourceSettings(
waitTimeSeconds: Int,
maxBufferSize: Int,
maxBatchSize: Int
) {
require(maxBatchSize <= maxBufferSize, "maxBatchSize must be lower or equal than maxBufferSize")
// SQS requirements
require(0 <= waitTimeSeconds && waitTimeSeconds <= 20,
s"Invalid value ($waitTimeSeconds) for waitTimeSeconds. Requirement: 0 <= waitTimeSeconds <= 20 ")
require(1 <= maxBatchSize && maxBatchSize <= 10,
s"Invalid value ($maxBatchSize) for maxBatchSize. Requirement: 1 <= maxBatchSize <= 10 ")
}
//#SqsSourceSettings

final class SqsSourceStage(queueUrl: String, settings: SqsSourceSettings)(implicit sqsClient: AmazonSQSAsync)
extends GraphStage[SourceShape[Message]] {

Expand Down Expand Up @@ -65,6 +42,8 @@ final class SqsSourceStage(queueUrl: String, settings: SqsSourceSettings)(implic
currentRequests = currentRequests + 1

val request = new ReceiveMessageRequest(queueUrl)
.withAttributeNames(settings.attributeNames.map(_.name).asJava)
.withMessageAttributeNames(settings.messageAttributeNames.map(_.name).asJava)
.withMaxNumberOfMessages(settings.maxBatchSize)
.withWaitTimeSeconds(settings.waitTimeSeconds)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.sqs.scaladsl

import akka.stream.alpakka.sqs.{MessageAttributeName, SqsSourceSettings}
import org.scalatest.{FlatSpec, Matchers}

class MessageAttributeNameSpec extends FlatSpec with Matchers {

it should "not allow names which have periods at the beginning" in {
a[IllegalArgumentException] should be thrownBy {
MessageAttributeName(".failed")
}
}

it should "not allow names which have periods at the end" in {
a[IllegalArgumentException] should be thrownBy {
MessageAttributeName("failed.")
}

}

it should "reject names which are longer than 256 characters" in {
a[IllegalArgumentException] should be thrownBy {
MessageAttributeName(
"A.really.realy.long.attribute.name.that.is.longer.than.what.is.allowed.256.characters.are.allowed." +
"however.they.cannot.contain.anything.other.than.alphanumerics.hypens.underscores.and.periods.though" +
"you.cant.have.more.than.one.consecutive.period.they.are.also.case.sensitive"
)
}
}
it should "reject names with multiple sequential periods" in {
a[IllegalArgumentException] should be thrownBy {
MessageAttributeName("multiple..periods")
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
*/
package akka.stream.alpakka.sqs.scaladsl

import akka.stream.alpakka.sqs.SqsSourceSettings
import akka.stream.alpakka.sqs.{All, SqsSourceSettings}
import org.scalatest.{FlatSpec, Matchers}

class SqsSourceSettingsSpec extends FlatSpec with Matchers {

it should "accept valid parameters" in {
SqsSourceSettings(waitTimeSeconds = 1, maxBatchSize = 2, maxBufferSize = 3)
SqsSourceSettings(waitTimeSeconds = 1, maxBatchSize = 2, maxBufferSize = 3, attributeNames = Seq(All))
}

it should "require maxBatchSize <= maxBufferSize" in {
Expand Down

0 comments on commit 62faa51

Please sign in to comment.