SkipMike/subscribex

 
 


Latest Release

The underlying AMQP client doesn't support Erlang 19 yet, so this repo depends on a forked version of the library. Hex doesn't allow GitHub dependencies, so 0.8.0 will be released when 0.2.0 of pma/amqp comes out. In the meantime:

def deps do
 [{:subscribex, git: "https://github.com/cjpoll/subscribex", ref: "<GIT-SHA>"}]
end

Subscribex

A lightweight wrapper around pma's Elixir AMQP library. Compared to the AMQP library, this package adds:

  1. Auto-start a single global connection to your RabbitMQ server on app startup
  2. Auto-reconnect to your RabbitMQ server (currently at 30 second intervals)
  3. A configurable subscriber abstraction
  4. Simplified channel creation, with the ability to automatically link or monitor the channel process.

NOTE: master is usually in "beta" status. Make sure you pull an actual release.

Contributor List

Special thanks to:

  • cavneb
  • justgage
  • vorce
  • telnicky
  • Cohedrin
  • ssnickolay

Your contributions are greatly appreciated!

Installation

  1. Add subscribex to your list of dependencies in mix.exs:
def deps do
 [{:subscribex, git: "https://github.com/cjpoll/subscribex", ref: "<GIT-SHA>"}]
end
  2. Ensure subscribex is started before your application, but probably not in test:
def application do
  apps = [:logger, ...]
  do_application(apps, Mix.env)
end

def do_application(apps, :test) do
  [mod: {MyApp, [:test]}, applications: apps]
end

def do_application(apps, _env) do
  [mod: {MyApp, []}, applications: apps ++ [:subscribex]]
end

Usage

tl;dr

First, configure your AMQP server information:

config :subscribex, rabbit_host: [
  username: "guest",
  password: "guest",
  host: "localhost",
  port: 5672
]

Then you can start writing subscribers:

defmodule MyApp.Subscribers.ActivityCreated do
  use Subscribex.Subscriber

  require Logger

  def start_link do
    Subscribex.Subscriber.start_link(__MODULE__)
  end

  def init do
    config =
      %Config{
        queue: "my_queue",
        exchange: "my_exchange",
        exchange_type: :topic,
        binding_opts: [routing_key: "my_key"]
      }

    {:ok, config}
  end

  def handle_payload(payload, _channel, _delivery_tag, _redelivered) do
    Logger.info(payload)
  end

  # handle_error/4 is an optional callback, invoked when an exception is
  # raised in handle_payload/4
  def handle_error(error, payload, _delivery_tag, _channel) do
    Logger.error("Raised #{inspect error} handling #{inspect payload}")
  end
end

Configuration

First, configure your AMQP server information in the appropriate config.exs file:

config :subscribex,
  rabbit_host: [
    username: "guest",
    password: "guest",
    host: "localhost",
    port: 5672]

Alternatively, you can use a well-formed RabbitMQ URI:

config :subscribex, rabbit_host: "amqp://guest:guest@localhost"
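The two forms carry the same information. As a rough illustration only (this is not a description of how Subscribex or the AMQP library parses the URI), the sketch below maps a URI onto the keyword-list form using Elixir's standard URI module. The module name RabbitUriSketch is hypothetical, and the fallback port 5672 is the standard AMQP port.

```elixir
defmodule RabbitUriSketch do
  # Standard AMQP port, used when the URI does not specify one.
  @default_port 5672

  # Converts "amqp://user:pass@host[:port]" into the keyword-list form.
  # Assumes the URI always contains "user:pass" credentials.
  def to_keyword(uri_string) do
    uri = URI.parse(uri_string)
    [username, password] = String.split(uri.userinfo, ":", parts: 2)

    [
      username: username,
      password: password,
      host: uri.host,
      port: uri.port || @default_port
    ]
  end
end

RabbitUriSketch.to_keyword("amqp://guest:guest@localhost")
# => [username: "guest", password: "guest", host: "localhost", port: 5672]
```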

Simplest example

Once configured, you can start making subscribers. Subscribex.Subscriber is a behavior which requires two callbacks:

  @type redelivered :: boolean()

  @callback init() :: {:ok, %Subscribex.Subscriber.Config{}}
  @callback handle_payload(payload, channel, Subscribex.delivery_tag, redelivered)
  :: {:ok, :ack} | {:ok, :manual}

The Config struct is defined as:

  defmodule Config do
    defstruct [
      queue: nil,
      dead_letter_queue: nil,
      dead_letter_exchange: nil,
      exchange: nil,
      exchange_type: nil,
      dead_letter_exchange_type: nil,
      auto_ack: true,
      prefetch_count: 10,
      queue_opts: [],
      dead_letter_queue_opts: [],
      dead_letter_exchange_opts: [],
      exchange_opts: [],
      binding_opts: [],
      dl_binding_opts: []
    ]
  end
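
As a rough illustration of the struct's less obvious fields, here is a hypothetical subscriber whose init/0 also sets the dead-letter fields and lowers the prefetch count. The field names come from the struct above; the module name and the "_error" queue and exchange names are made up, and exactly how Subscribex declares, binds, and uses the dead-letter queue is an assumption not confirmed here.

```elixir
defmodule MyApp.Subscribers.WithDeadLetter do
  use Subscribex.Subscriber

  require Logger

  def start_link do
    Subscribex.Subscriber.start_link(__MODULE__)
  end

  def init do
    config =
      %Config{
        queue: "my_queue",
        exchange: "my_exchange",
        exchange_type: :topic,
        binding_opts: [routing_key: "my_key"],
        # Hypothetical names; presumably where rejected messages end up.
        dead_letter_queue: "my_queue_error",
        dead_letter_exchange: "my_exchange_error",
        dead_letter_exchange_type: :topic,
        dl_binding_opts: [routing_key: "my_key"],
        # Presumably limits the number of unacknowledged messages in flight.
        prefetch_count: 1
      }

    {:ok, config}
  end

  def handle_payload(payload, _channel, _delivery_tag, _redelivered) do
    Logger.info(inspect(payload))
  end
end
```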

Assume a queue called "my_queue" is bound to a "my_exchange" exchange using the routing key "my_key".

An example of a subscriber that pops items off this queue and simply logs the payloads using Logger is as follows:

defmodule MyApp.Subscribers.ActivityCreated do
  use Subscribex.Subscriber

  require Logger

  def start_link do
    Subscribex.Subscriber.start_link(__MODULE__)
  end

  def init do
    config =
      %Config{
        queue: "my_queue",
        exchange: "my_exchange",
        exchange_type: :topic,
        binding_opts: [routing_key: "my_key"]
      }

    {:ok, config}
  end

  def handle_payload(payload, _channel, _delivery_tag, _redelivered) do
    Logger.info(payload)
  end
end

This module's start_link/0 function delegates to Subscribex.Subscriber.start_link/1.
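
The subscriber still has to be started somewhere, typically under your application's supervisor. The following is a minimal sketch, assuming the MyApp application module from the Installation section (which receives [:test] in the test environment and [] otherwise) and an Elixir version that accepts map child specs (1.5 or later). The supervisor name MyApp.Supervisor is an assumption.

```elixir
defmodule MyApp do
  use Application

  # `args` is the list given in the `mod:` tuple in mix.exs:
  # [:test] in the test environment, [] everywhere else.
  def start(_type, args) do
    Supervisor.start_link(children(args),
      strategy: :one_for_one,
      name: MyApp.Supervisor
    )
  end

  # No subscribers in test, since subscribex is not started there.
  defp children([:test]), do: []

  defp children(_args) do
    [
      %{
        id: MyApp.Subscribers.ActivityCreated,
        start: {MyApp.Subscribers.ActivityCreated, :start_link, []}
      }
    ]
  end
end
```

Because the supervisor restarts the subscriber if it crashes, this keeps consumption going without extra code in the subscriber itself.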

Publishing and Deserializing

Instead of just logging the payload, let's deserialize it from a JSON string, transform the result, and publish it to a different queue.

Let's say the payload is of the format:

{
  "email": "[email protected]",
  "username": "abc123"
}

Let's say our goal is to send the user a welcome email and publish to the "my_app.new_user.emailed" routing key a result of the format:

{
  "email": "[email protected]",
  "username": "abc123",
  "welcome_email_delivered": true
}

A subscriber that does this could look like:

defmodule MyApp.Subscribers.ActivityCreated do
  use Subscribex.Subscriber

  preprocess &__MODULE__.deserialize/1

  require Logger

  def start_link do
    Subscribex.Subscriber.start_link(__MODULE__)
  end

  @exchange "my_exchange"

  def init do
    config =
      %Config{
        queue: "my_queue",
        exchange: "my_exchange",
        exchange_type: :topic,
        binding_opts: [routing_key: "my_key"]
      }

    {:ok, config}
  end

  def deserialize(payload) do
    Poison.decode!(payload)
  end

  def handle_payload(%{"email" => email, "username" => username} = payload, channel, _delivery_tag, _redelivered) do
    :ok = MyApp.Email.send_welcome_email(email, username)

    {:ok, publishing_payload} =
      payload
      |> Map.put_new("welcome_email_delivered", true)
      |> Poison.encode

    routing_key = "my_app.new_user.emailed"

    Subscribex.publish(channel, @exchange, routing_key, publishing_payload)
  end
end

Using the preprocess/1 macro, we can set up plug-like pipelines of functions to execute before arriving at handle_payload/4.
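
Conceptually, such a pipeline threads the payload through each function in order, with each step receiving the previous step's result. The sketch below shows that idea in plain Elixir. It is not Subscribex's implementation, and PipelineSketch is a hypothetical name used only for illustration.

```elixir
defmodule PipelineSketch do
  # Applies each one-argument function in `steps` to the payload, in order.
  def run(payload, steps) do
    Enum.reduce(steps, payload, fn step, acc -> step.(acc) end)
  end
end

steps = [&String.trim/1, &String.upcase/1]
PipelineSketch.run("  hello  ", steps)
# => "HELLO"
```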

Manual Acking

Let's take the email example and modify it a bit. It still has the same requirements, but the email sending is handled by another process, and we don't want to block the subscriber while it's working.

defmodule MyApp.Subscribers.UserRegistered do
  use Subscribex.Subscriber

  preprocess &__MODULE__.deserialize/1

  require Logger

  def start_link do
    Subscribex.Subscriber.start_link(__MODULE__)
  end

  @exchange "my_exchange"

  def init do
    config =
      %Config{
        queue: "my_queue",
        exchange: "my_exchange",
        exchange_type: :topic,
        binding_opts: [routing_key: "my_key"],
        auto_ack: false # Specify that we want to manually ack these jobs
      }

    {:ok, config}
  end

  def deserialize(payload) do
    Poison.decode!(payload)
  end

  def handle_payload(%{"email" => email, "username" => username}, channel, delivery_tag, _redelivered) do
    # Hands off the job to another process, which will be responsible for
    # acking. It must ack the job on the same channel used to receive it.
    :ok = MyApp.Email.send_welcome_email(email, username, channel, delivery_tag)
  end
end

Because we've told the subscriber we intend to manually ack, we need the channel and delivery_tag for the payload, which are provided by RabbitMQ. The process in charge of sending the email is now responsible for acking the message to RabbitMQ when it's done processing it.
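
A minimal sketch of what that hand-off might look like, assuming the work runs in an unsupervised Task and that deliver/2 stands in for the real email delivery code (both are assumptions, not part of Subscribex). Subscribex.ack/2 is called with the same channel and delivery tag the subscriber received.

```elixir
defmodule MyApp.Email do
  # Returns :ok immediately so the subscriber is not blocked.
  def send_welcome_email(email, username, channel, delivery_tag) do
    {:ok, _pid} =
      Task.start(fn ->
        :ok = deliver(email, username)
        # Ack on the channel the message was received on.
        Subscribex.ack(channel, delivery_tag)
      end)

    :ok
  end

  # Hypothetical placeholder for the actual delivery logic.
  defp deliver(_email, _username), do: :ok
end
```

If deliver/2 fails here, the task crashes before acking and the message stays unacknowledged; a real implementation would need to decide how to handle that case.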

In this example, we need a channel to publish the message; we can use the Subscribex.channel/1 function, which is spec-ed as:

  @spec channel(:link | :no_link | :monitor | function | (channel -> term))
  :: %AMQP.Channel{} | {%AMQP.Channel{}, monitor} | :ok

It expects you to pass in :link, :no_link, or :monitor (or a function, covered below). There is no default value; you must explicitly pass in the appropriate monitoring scheme for your use case.

We can then use this channel to publish on. When we use Subscribex.channel/1, we must remember to Subscribex.close/1 the channel when we're done with it - otherwise, we'll experience a memory leak as channel processes are created and never stopped.

If the channel is needed long-term, it's best to either link or monitor it, and handle the case where a connection failure occurs.
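
For the long-term case, one possible shape is a GenServer that holds a monitored channel and opens a new one when the old one goes down. This is a sketch under assumptions: MyApp.Publisher is a hypothetical module, and the monitor returned by Subscribex.channel(:monitor) is assumed to be a reference as returned by Process.monitor/1, so that the channel's exit arrives as a :DOWN message.

```elixir
defmodule MyApp.Publisher do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:ok, open_channel()}
  end

  # The monitored channel process died, e.g. after a connection failure.
  # Assumes Subscribex.channel/1 succeeds once the connection is back.
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{monitor: ref}) do
    {:noreply, open_channel()}
  end

  def handle_info(_message, state) do
    {:noreply, state}
  end

  defp open_channel do
    {channel, monitor} = Subscribex.channel(:monitor)
    %{channel: channel, monitor: monitor}
  end
end
```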

In this case, we only need the channel process for a short duration. For that reason, Subscribex.channel/1 also accepts a function that takes the channel as its argument. Either of the following approaches works:

defmodule MyApp.Email do
  # Code for delivering the email here

  def handle_email_sent(channel, delivery_tag) do
    publish_channel = Subscribex.channel(:link)
    Subscribex.publish(publish_channel, ...other args)
    Subscribex.close(publish_channel)

    Subscribex.ack(channel, delivery_tag)
  end
end

defmodule MyApp.Email do
  # Code for delivering the email here

  def handle_email_sent(channel, delivery_tag) do
    Subscribex.channel(fn(publish_channel) ->
      Subscribex.publish(publish_channel, ...other args)
    end)

    Subscribex.ack(channel, delivery_tag)
  end
end

In the second version, channel creation and closing are managed for us, and we only have to code what we need to do with the channel.

Note that RabbitMQ does not allow acking a message on a channel other than the one it was received on, so in the above examples we must not ack on publish_channel. Similarly, publishing on the channel we receive messages on is not recommended.
