RxJava 1.x utilities for AWS (SQS, S3, ...)
Status: released to Maven Central
- Represent an SQS queue as an Observable<SqsMessage>
- Full backpressure support
- Supports low latency delivery (using long-polling which blocks a thread)
- Supports higher latency delivery via scheduled polling (reduced thread blocking)
See also: rxjava2-aws
Add the rxjava-aws dependency to your pom.xml:
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>rxjava-aws</artifactId>
<version>VERSION_HERE</version>
</dependency>
The method below uses long polling, which blocks a thread. When downstream demand exists, it connects to the AWS REST API (using the Amazon Java SDK) and blocks for up to 20s waiting for a message. This is cheap in terms of IO but comes at the expense of a blocked thread. Because backpressure is supported, the REST API is not called while there are no outstanding requests for messages.
Func0<AmazonSQSClient> sqs = () -> ...;
Sqs.queueName("my-queue")
// specify factory for Amazon SQS Client
.sqsFactory(sqs)
// get messages as observable
.messages()
// process the message
.doOnNext(m -> System.out.println(m.message()))
// delete the message (if processing succeeded)
.doOnNext(m -> m.deleteMessage())
// log any errors
.doOnError(e -> log.warn(e.getMessage(), e))
// run in the background
.subscribeOn(Schedulers.io())
// any errors then delay and resubscribe (on an io thread)
.retryWhen(RetryWhen.delay(30, TimeUnit.SECONDS).build(),
Schedulers.io())
// go!
.subscribe(subscriber);
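The backpressure behaviour described above (no polling without demand) can be illustrated without RxJava or the AWS SDK. The following is a minimal sketch, not the library's implementation: the class `DemandGatedPoller`, its `request` and `drain` methods, and the `receive` supplier are hypothetical names introduced here, with the supplier standing in for a long-poll call to the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical illustration only: not part of rxjava-aws.
public class DemandGatedPoller {

    private final Supplier<Optional<String>> receive; // stand-in for a long-poll call
    private final Consumer<String> downstream;        // stand-in for the subscriber
    private long requested;                           // outstanding downstream demand

    public DemandGatedPoller(Supplier<Optional<String>> receive, Consumer<String> downstream) {
        this.receive = receive;
        this.downstream = downstream;
    }

    // Downstream signals that it can accept n more messages.
    public void request(long n) {
        requested += n;
    }

    // Polls only while demand exists and returns the number of messages emitted.
    // maxPolls bounds the loop so the sketch terminates on an empty queue.
    public int drain(int maxPolls) {
        int emitted = 0;
        for (int i = 0; i < maxPolls && requested > 0; i++) {
            Optional<String> message = receive.get();
            if (message.isPresent()) {
                requested--;
                emitted++;
                downstream.accept(message.get());
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        List<String> received = new ArrayList<>();
        DemandGatedPoller poller = new DemandGatedPoller(
                () -> Optional.of("message-" + calls.incrementAndGet()),
                received::add);

        poller.drain(10); // no demand yet, so the source is never called
        System.out.println("calls without demand: " + calls.get());

        poller.request(2);
        poller.drain(10); // two calls are enough to satisfy the demand
        System.out.println("calls after request(2): " + calls.get() + " " + received);
    }
}
```

In the sketch the source is touched only inside `drain` and only while `requested` is positive, which is the property the paragraph above describes for the REST API.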
Use .interval for scheduled polling:
Sqs.queueName("my-queue")
// specify factory for Amazon SQS Client
.sqsFactory(sqs)
// every 60 seconds check for messages
.interval(60, TimeUnit.SECONDS, Schedulers.io())
// get messages as observable
.messages()
...
Or, for lower level control of scheduled polling, use .waitTimes:
Sqs.queueName("my-queue")
// specify factory for Amazon SQS Client
.sqsFactory(sqs)
// every 60 seconds check for messages and wait for up to 5 seconds
.waitTimes(
Observable.interval(60, TimeUnit.SECONDS, Schedulers.io()).map(x -> 5),
TimeUnit.SECONDS)
// get messages as observable
.messages()
...
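The idea behind scheduled polling with wait times (poll once per interval, waiting up to a given time on each poll) can be sketched with the JDK alone. This is an assumption-laden illustration rather than how the library works internally: `ScheduledPollingSketch`, `run`, and the `poll` callback are hypothetical names, and the callback stands in for a receive call that takes a wait time in seconds.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;

// Hypothetical illustration only: not how rxjava-aws is implemented.
public class ScheduledPollingSketch {

    // Calls poll.accept(waitSeconds) once per interval until `polls` calls
    // have been made, then shuts the scheduler down. Returns false if the
    // polls did not complete within 5 seconds.
    public static boolean run(long interval, TimeUnit unit, int waitSeconds,
            int polls, IntConsumer poll) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(polls);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                if (done.getCount() > 0) {
                    poll.accept(waitSeconds); // stand-in for a receive call with this wait time
                    done.countDown();
                }
            }, 0, interval, unit);
            return done.await(5, TimeUnit.SECONDS);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> waits = new CopyOnWriteArrayList<>();
        // A 10 ms interval keeps the demo short; the example above uses 60 seconds.
        boolean completed = run(10, TimeUnit.MILLISECONDS, 5, 3, w -> waits.add(w));
        System.out.println(completed + " " + waits);
    }
}
```

Between polls no thread is blocked waiting on the queue, which is the trade-off against long polling: less thread blocking in exchange for higher delivery latency.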
SQS queues are restricted to String messages (legal XML characters only) with a maximum size of 256K (binary messages would need to be Base64 encoded). To pass larger messages, one pattern is to store the message content as a resource in an S3 bucket and put the resource name on the queue. To receive the message, read the identifier from the queue and retrieve the resource bytes from the S3 bucket. Once you have dealt with the whole message, delete the S3 resource and then remove the message from the queue.
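The ordering of steps in this pattern can be shown with in-memory stand-ins. The sketch below makes no AWS calls and is not the library's code: `LargeMessagePattern` and its methods are hypothetical names, a `Map` stands in for the S3 bucket, and a `Deque` stands in for the SQS queue.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory illustration of the pattern; no AWS calls are made.
public class LargeMessagePattern {

    private final Map<String, byte[]> bucket = new HashMap<>(); // stands in for an S3 bucket
    private final Deque<String> queue = new ArrayDeque<>();     // stands in for an SQS queue

    // Sender: store the payload first, then enqueue only its identifier.
    public String send(byte[] payload) {
        String id = UUID.randomUUID().toString();
        bucket.put(id, payload.clone());
        queue.addLast(id);
        return id;
    }

    // Receiver: read the identifier, fetch the payload, and only after
    // processing delete the stored resource and then the queue entry.
    // Returns null when the queue is empty.
    public byte[] receiveAndDelete() {
        String id = queue.peekFirst();
        if (id == null) {
            return null;
        }
        byte[] payload = bucket.get(id);
        // ... process the payload here ...
        bucket.remove(id);
        queue.removeFirst();
        return payload;
    }

    public int storedResources() {
        return bucket.size();
    }

    public int queuedMessages() {
        return queue.size();
    }

    public static void main(String[] args) {
        LargeMessagePattern pattern = new LargeMessagePattern();
        pattern.send("a payload too large for the queue".getBytes(StandardCharsets.UTF_8));
        byte[] body = pattern.receiveAndDelete();
        System.out.println(new String(body, StandardCharsets.UTF_8));
        System.out.println(pattern.storedResources() + " " + pattern.queuedMessages());
    }
}
```

Removing the queue entry last means that a failure during processing leaves the identifier on the queue, so the message can be picked up again.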
To read and delete messages from an AWS queue in this way (with full backpressure support):
Func0<AmazonSQSClient> sqs = () -> ...;
Func0<AmazonS3Client> s3 = () -> ...;
Sqs.queueName("my-queue")
// specify factory for Amazon SQS Client
.sqsFactory(sqs)
// specify S3 bucket name
.bucketName("my-bucket")
// specify factory for Amazon S3 Client
.s3Factory(s3)
// get messages as observable
.messages()
// process the message
.doOnNext(System.out::println)
// delete the message (if processing succeeded)
.doOnNext(m -> m.deleteMessage())
// log any errors
.doOnError(e -> log.warn(e.getMessage(), e))
// run in the background
.subscribeOn(Schedulers.io())
// any errors then delay and resubscribe
.retryWhen(RetryWhen.delay(30, TimeUnit.SECONDS).build(),
Schedulers.io())
// go!
.subscribe(subscriber);
To place a message on an AWS SQS queue for picking up via the routine above:
String s3Id =
Sqs.sendToQueueUsingS3(sqs, queueUrl, s3, bucketName, messageBytes, s3IdFactory);
deleteMessage() works even if the source has been terminated or unsubscribed. While the source has not been terminated you get slightly better performance, because the source's SQS and S3 client objects can be reused to perform the delete.
// get just one message
SqsMessage message =
Sqs.queueName("my-queue")
.sqsFactory(sqs)
.bucketName("my-bucket")
.s3Factory(s3)
.messages()
.subscribeOn(Schedulers.io())
.first().toBlocking().single();
// this will still work fine
message.deleteMessage();