ashokjjr/rxjava-aws
rxjava-aws



RxJava 1.x utilities for AWS (SQS, S3, ...)

Status: released to Maven Central

  • Represent an SQS queue as an Observable<SqsMessage>
  • Full backpressure support
  • Supports low latency delivery (using long-polling which blocks a thread)
  • Supports higher latency delivery via scheduled polling (reduced thread blocking)

See also: rxjava2-aws

Getting started

Add the rxjava-aws dependency to your pom.xml:

<dependency>
    <groupId>com.github.davidmoten</groupId>
    <artifactId>rxjava-aws</artifactId>
    <version>VERSION_HERE</version>
</dependency>

Reading messages from an AWS SQS queue

The method below uses long polling, which blocks a thread. When downstream demand exists it connects to the AWS REST API (using the Amazon Java SDK) and blocks for up to 20s waiting for a message. This is cheap in terms of IO but comes at the cost of a blocked thread. Because backpressure is supported, the REST API is not called while there are no outstanding requests for messages.

Func0<AmazonSQSClient> sqs = () -> ...;

Sqs.queueName("my-queue")
    // specify factory for Amazon SQS Client
   .sqsFactory(sqs)
   // get messages as observable
   .messages()
   // process the message
   .doOnNext(m -> System.out.println(m.message()))
   // delete the message (if processing succeeded)
   .doOnNext(m -> m.deleteMessage())
   // log any errors
   .doOnError(e -> log.warn(e.getMessage(), e))
   // run in the background
   .subscribeOn(Schedulers.io())
   // any errors then delay and resubscribe (on an io thread)
   .retryWhen(RetryWhen.delay(30, TimeUnit.SECONDS).build(), 
              Schedulers.io())
   // go!
   .subscribe(subscriber);
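
The point about demand made above (no polling while nothing has been requested) can be illustrated without RxJava or the AWS SDK. The sketch below is not how rxjava-aws is implemented. It uses the JDK's `java.util.concurrent.Flow` interfaces, and `DemandDrivenSource`, `DemandDrivenPollingDemo` and the `poller` function are hypothetical names standing in for the library's source and for a long-polling receive call. It is single-threaded and skips most of the Reactive Streams rules; it only shows that the poll runs once per requested item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for a queue source: `poller` is only called
// when the subscriber has outstanding demand.
final class DemandDrivenSource implements Flow.Publisher<String> {
    // stands in for a blocking long-poll receive call (assumption)
    private final Supplier<String> poller;

    DemandDrivenSource(Supplier<String> poller) {
        this.poller = poller;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private boolean cancelled;
            private boolean emitting;
            private long pending;

            @Override
            public void request(long n) {
                // simplification: non-positive requests are ignored
                if (n <= 0 || cancelled) return;
                // add demand, capping at Long.MAX_VALUE on overflow
                pending = (pending + n < 0) ? Long.MAX_VALUE : pending + n;
                // a re-entrant request from onNext is served by the outer loop
                if (emitting) return;
                emitting = true;
                try {
                    while (pending > 0 && !cancelled) {
                        pending--;
                        subscriber.onNext(poller.get()); // one poll per requested item
                    }
                } finally {
                    emitting = false;
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }
}

final class DemandDrivenPollingDemo {
    // Returns how many times the poller was called after requesting `demand` items.
    static int pollsFor(int demand) {
        AtomicInteger polls = new AtomicInteger();
        DemandDrivenSource source =
            new DemandDrivenSource(() -> "message-" + polls.incrementAndGet());
        List<String> received = new ArrayList<>();
        source.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                if (demand > 0) s.request(demand);
            }
            @Override public void onNext(String item) { received.add(item); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return polls.get();
    }
}
```

With no demand, `DemandDrivenPollingDemo.pollsFor(0)` makes no poll calls at all; requesting three items makes exactly three.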

Use .interval for scheduled polling:

Sqs.queueName("my-queue")
    // specify factory for Amazon SQS Client
   .sqsFactory(sqs)
   // every 60 seconds check for messages
   .interval(60, TimeUnit.SECONDS, Schedulers.io())
   // get messages as observable
   .messages()
   ...

or for lower level control of scheduled polling use .waitTimes:

Sqs.queueName("my-queue")
    // specify factory for Amazon SQS Client
   .sqsFactory(sqs)
   // every 60 seconds check for messages and wait for up to 5 seconds
   .waitTimes(
       Observable.interval(60, TimeUnit.SECONDS, Schedulers.io()).map(x -> 5),
       TimeUnit.SECONDS)
   // get messages as observable
   .messages()
   ...

Reading messages from an AWS SQS queue via S3 storage

SQS queues are restricted to String messages (legal XML characters only) with a maximum size of 256K (binary messages would need to be Base64 encoded). To pass larger messages, one pattern is to store the message content as a resource in an S3 bucket and put the resource name on the queue. To receive the message, you read the identifier from the queue and retrieve the resource bytes from the S3 bucket. Once you have dealt with the whole message, you delete the S3 resource and then remove the message from the queue.

To read and delete messages from an AWS queue in this way (with full backpressure support):

Func0<AmazonSQSClient> sqs = () -> ...;
Func0<AmazonS3Client> s3 = () -> ...; 

Sqs.queueName("my-queue")
    // specify factory for Amazon SQS Client
   .sqsFactory(sqs)
   // specify S3 bucket name
   .bucketName("my-bucket")
   // specify factory for Amazon S3 Client
   .s3Factory(s3)
   // get messages as observable
   .messages()
   // process the message
   .doOnNext(System.out::println)
   // delete the message (if processing succeeded)
   .doOnNext(m -> m.deleteMessage())
   // log any errors
   .doOnError(e -> log.warn(e.getMessage(), e))
   // run in the background
   .subscribeOn(Schedulers.io())
   // any errors then delay and resubscribe
   .retryWhen(RetryWhen.delay(30, TimeUnit.SECONDS).build(),
              Schedulers.io())
   // go!
   .subscribe(subscriber);
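
The store-then-reference pattern described in this section can be sketched with in-memory stand-ins, which makes the ordering of the steps easier to see. Everything here is hypothetical: `PointerQueueSketch` is not part of rxjava-aws, a `HashMap` plays the S3 bucket and an `ArrayDeque` plays the SQS queue. The point is only that the payload is stored before its identifier is queued, and that nothing is deleted unless processing succeeds.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;

// In-memory stand-ins (assumptions): `bucket` plays the S3 bucket,
// `queue` plays the SQS queue holding only resource identifiers.
final class PointerQueueSketch {
    private final Map<String, byte[]> bucket = new HashMap<>();
    private final Deque<String> queue = new ArrayDeque<>();

    // Sender: store the payload first, then enqueue only its identifier.
    String send(byte[] payload) {
        String id = UUID.randomUUID().toString();
        bucket.put(id, payload.clone());
        queue.addLast(id);
        return id;
    }

    // Receiver: read the identifier, fetch the bytes, process them, then
    // delete the stored resource and finally remove the queue entry.
    // Returns false if the queue is empty.
    boolean receive(Consumer<byte[]> handler) {
        String id = queue.peekFirst(); // entry stays queued until processing succeeds
        if (id == null) return false;
        handler.accept(bucket.get(id)); // if this throws, nothing is deleted
        bucket.remove(id);
        queue.removeFirst();
        return true;
    }

    int queued() { return queue.size(); }

    int stored() { return bucket.size(); }
}
```

If the handler throws, both the stored payload and the queue entry remain, so a later attempt can pick the same message up again.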

Sending messages to an AWS SQS queue via S3 storage

To place a message on an AWS SQS queue for picking up via the routine above:

String s3Id = 
  Sqs.sendToQueueUsingS3(sqs, queueUrl, s3, bucketName, messageBytes, s3IdFactory);
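
The snippet above does not show what `s3IdFactory` looks like. Assuming it is a no-argument function that returns a unique String key for each message, one possible shape is a UUID-based factory. The sketch below is written against `java.util.function.Supplier` rather than the library's own functional type, and `S3IdFactories` and `uuidWithPrefix` are hypothetical names, not part of rxjava-aws.

```java
import java.util.UUID;
import java.util.function.Supplier;

final class S3IdFactories {
    // Hypothetical helper: every call to the returned supplier yields a
    // fresh key made of the given prefix followed by a random UUID.
    static Supplier<String> uuidWithPrefix(String prefix) {
        return () -> prefix + UUID.randomUUID();
    }
}
```

A prefix such as `"messages/"` keeps the generated keys grouped together in the bucket, while the random UUID part makes collisions between messages very unlikely.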

Deleting messages from the queue

deleteMessage() works even after the source has terminated or been unsubscribed. While the source has not yet terminated you get slightly better performance, because the source's existing SQS and S3 client objects can be used to perform the delete.

// get just one message
SqsMessage message = 
   Sqs.queueName("my-queue")
      .sqsFactory(sqs)
      .bucketName("my-bucket")
      .s3Factory(s3)
      .messages()
      .subscribeOn(Schedulers.io())
      .first().toBlocking().single();
      
// this will still work fine        
message.deleteMessage();
