anik/amqp is a php-amqplib wrapper that makes working with RabbitMQ easier. A painless way of using RabbitMQ.
Previously, the package could be used with Laravel, Laravel Zero, and Lumen out of the box. From v2, the Laravel support has been removed. If you are looking for an implementation with Laravel, you can use anik/laravel-amqp.
Check out the repository for examples.
- PHP ^7.2 | ^8.0
- PHP-AMQPLib ^3.0
To install the package, run
composer require anik/amqp
For V1: https://medium.com/@sirajul.anik/rabbitmq-for-php-developers-c17cd019a90
To create an AMQP connection, you can use either of the following:
- Anik\Amqp\AmqpConnectionFactory::make
- Anik\Amqp\AmqpConnectionFactory::makeFromArray
<?php
use Anik\Amqp\AmqpConnectionFactory;
use PhpAmqpLib\Connection\AMQPLazySSLConnection;
$host = '127.0.0.1';
$port = 5672;
$user = 'user';
$password = 'password';
$vhost = '/';
$options = []; // options to be proxied to the amqp connection class
$ofClass = AMQPLazySSLConnection::class;
$connection = AmqpConnectionFactory::make($host, $port, $user, $password, $vhost, $options, $ofClass);
$hosts = [
    [
        'host' => $host,
        'port' => $port,
        'user' => $user,
        'password' => $password,
        'vhost' => $vhost,
    ],
    [
        'host' => $host,
        'port' => $port,
        'user' => $user,
        'password' => $password,
        'vhost' => $vhost,
    ],
];

// With AmqpConnectionFactory::makeFromArray, you can try to connect to multiple hosts.
$connection = AmqpConnectionFactory::makeFromArray($hosts, $options, $ofClass);
Also, there are four specific exchange classes.
- Anik\Amqp\Exchanges\Direct for direct exchange.
- Anik\Amqp\Exchanges\Fanout for fanout exchange.
- Anik\Amqp\Exchanges\Headers for headers exchange.
- Anik\Amqp\Exchanges\Topic for topic exchange.
You can still use the Anik\Amqp\Exchanges\Exchange base class to create your own exchange.
To instantiate an exchange, you can do the following:
<?php
use Anik\Amqp\Exchanges\Exchange;
use Anik\Amqp\Exchanges\Fanout;
use Anik\Amqp\Exchanges\Topic;
$exchange = new Exchange('anik.amqp.direct.exchange', Exchange::TYPE_DIRECT);
$exchange = Exchange::make(['name' => 'anik.amqp.direct.exchange', 'type' => Exchange::TYPE_DIRECT]);
$exchange = new Topic('anik.amqp.topic.exchange');
$exchange = Fanout::make(['name' => 'anik.amqp.fanout.exchange']);
When creating an exchange instance with
- Exchange::make - name and type keys must be present in the given array.
- Topic::make, Fanout::make, Headers::make, Direct::make - name key must be present in the given array.
Anik\Amqp\Exchanges\Exchange contains a few predefined exchange types that you can use as a reference.
- TYPE_DIRECT for direct type.
- TYPE_TOPIC for topic type.
- TYPE_FANOUT for fanout type.
- TYPE_HEADERS for headers type.
The Exchange::make method also accepts the following keys when making an exchange instance.
- declare - Type: bool. Default: false. If you want to declare the exchange.
- passive - Type: bool. Default: false. If the exchange is passive.
- durable - Type: bool. Default: true. If the exchange is durable.
- auto_delete - Type: bool. Default: false. If the exchange should auto delete.
- internal - Type: bool. Default: false. If the exchange is internal.
- no_wait - Type: bool. Default: false. If the client should not wait for the server's reply.
- arguments - Type: array. Default: [].
- ticket - Type: null | integer. Default: null.
You can also reconfigure the exchange instance using $exchange->reconfigure($options). The $options array accepts the above keys as well.
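As a rough illustration of the keys listed above, the following sketch builds an exchange with Exchange::make and then adjusts it with reconfigure. The function name buildOrdersExchange and the exchange name are hypothetical, and the snippet assumes Composer's autoloader has already been loaded.

```php
<?php

use Anik\Amqp\Exchanges\Exchange;

// Hypothetical helper; assumes vendor/autoload.php is already required.
function buildOrdersExchange(): Exchange
{
    $exchange = Exchange::make([
        'name' => 'example.orders.exchange', // required key
        'type' => Exchange::TYPE_TOPIC,      // required key
        'declare' => true,                   // mark the exchange to be declared
        'durable' => true,
        'auto_delete' => false,
    ]);

    // reconfigure() takes the same keys as Exchange::make.
    $exchange->reconfigure(['durable' => false, 'auto_delete' => true]);

    return $exchange;
}
```

Only boolean flags are used here; the expected shape of the arguments array is not shown because it is not described in this document.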
Also, you can use the following methods to configure your exchange instance.
- setName - Accepts: string. The only way to change the exchange name after instantiation.
- setDeclare - Accepts: bool.
- setType - Accepts: string.
- setPassive - Accepts: bool.
- setDurable - Accepts: bool.
- setAutoDelete - Accepts: bool.
- setInternal - Accepts: bool.
- setNowait - Accepts: bool.
- setArguments - Accepts: array.
- setTicket - Accepts: null | integer.
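The setters can be applied to an existing instance one call at a time. The sketch below is only an assumption about typical use: the helper name renameForAudit and the new exchange name are made up, and the calls are not chained because this document does not state what the setters return.

```php
<?php

use Anik\Amqp\Exchanges\Exchange;

// Hypothetical helper; assumes vendor/autoload.php is already required.
function renameForAudit(Exchange $exchange): Exchange
{
    // setName is the only way to change the name after instantiation.
    $exchange->setName('example.audit.exchange');

    // Each flag has its own setter; calls are kept separate on purpose.
    $exchange->setDeclare(true);
    $exchange->setDurable(true);
    $exchange->setAutoDelete(false);

    return $exchange;
}
```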
To instantiate a queue, you can do the following:
<?php
use Anik\Amqp\Queues\Queue;
$queue = new Queue('anik.amqp.direct.exchange.queue');
$queue = Queue::make(['name' => 'anik.amqp.direct.exchange.queue']);
When creating a queue instance with
- Queue::make - name key must be present in the given array.
The Queue::make method also accepts the following keys when making a queue instance.
- declare - Type: bool. Default: false. If you want to declare the queue.
- passive - Type: bool. Default: false. If the queue is passive.
- durable - Type: bool. Default: true. If the queue is durable.
- exclusive - Type: bool. Default: false. If the queue is exclusive.
- auto_delete - Type: bool. Default: false. If the queue should auto delete.
- no_wait - Type: bool. Default: false. If the client should not wait for the server's reply.
- arguments - Type: array. Default: [].
- ticket - Type: null | integer. Default: null.
You can also reconfigure the queue instance using $queue->reconfigure($options). The $options array accepts the above keys as well.
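A queue can be configured the same way as an exchange. The following is a minimal sketch, assuming a hypothetical helper buildOrdersQueue and a made-up queue name, with Composer's autoloader already loaded.

```php
<?php

use Anik\Amqp\Queues\Queue;

// Hypothetical helper; assumes vendor/autoload.php is already required.
function buildOrdersQueue(): Queue
{
    $queue = Queue::make([
        'name' => 'example.orders.queue', // required key
        'declare' => true,                // mark the queue to be declared
        'durable' => true,
        'exclusive' => false,
    ]);

    // reconfigure() takes the same keys as Queue::make.
    $queue->reconfigure(['auto_delete' => false, 'no_wait' => false]);

    return $queue;
}
```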
Also, you can use the following methods to configure your queue instance.
- setName - Accepts: string. The only way to change the queue name after instantiation.
- setDeclare - Accepts: bool.
- setType - Accepts: bool.
- setPassive - Accepts: bool.
- setDurable - Accepts: bool.
- setExclusive - Accepts: bool.
- setAutoDelete - Accepts: bool.
- setNowait - Accepts: bool.
- setArguments - Accepts: array.
- setTicket - Accepts: null | integer.
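For a queue that should only live as long as its consumer, a few of the setters above could be combined as in this sketch. The helper name makeTemporary is hypothetical, and whether these flags suit a given workload is an assumption, not a recommendation from the package.

```php
<?php

use Anik\Amqp\Queues\Queue;

// Hypothetical helper; assumes vendor/autoload.php is already required.
function makeTemporary(Queue $queue): Queue
{
    $queue->setDeclare(true);    // declare the queue when it is used
    $queue->setDurable(false);   // do not keep it across broker restarts
    $queue->setExclusive(true);  // restrict it to the declaring connection
    $queue->setAutoDelete(true); // remove it once it is no longer in use

    return $queue;
}
```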
To instantiate a Qos, you can do the following:
<?php
use Anik\Amqp\Qos\Qos;
$prefetchSize = 0;
$prefetchCount = 0;
$global = false;
$qos = new Qos($prefetchSize, $prefetchCount, $global);
$qos = Qos::make(['prefetch_size' => $prefetchSize, 'prefetch_count' => $prefetchCount, 'global' => $global]);
The Qos::make method also accepts the following keys when making a qos instance.
- prefetch_size - Type: int. Default: 0.
- prefetch_count - Type: int. Default: 0.
- global - Type: bool. Default: true.
You can also reconfigure the qos instance using $qos->reconfigure($options). The $options array accepts the above keys as well.
Also, you can use the following methods to configure your qos instance.
- setPrefetchCount - Accepts: int.
- setPrefetchSize - Accepts: int.
- setGlobal - Accepts: bool.
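The three ways of configuring a Qos instance can be seen side by side in the sketch below. The helper name buildWorkerQos and the prefetch values are illustrative assumptions only.

```php
<?php

use Anik\Amqp\Qos\Qos;

// Hypothetical helper; assumes vendor/autoload.php is already required.
function buildWorkerQos(): Qos
{
    // Start from an array of options.
    $qos = Qos::make([
        'prefetch_size' => 0,
        'prefetch_count' => 10,
        'global' => false,
    ]);

    // Change several values at once with the same keys.
    $qos->reconfigure(['prefetch_count' => 25]);

    // Or change a single value through its setter.
    $qos->setGlobal(false);

    return $qos;
}
```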
To produce/publish messages, you'll need an Anik\Amqp\Producer instance. To instantiate the class:
<?php
use Anik\Amqp\Producer;
$producer = new Producer($connection, $channel);
The constructor accepts
- $connection - Type: PhpAmqpLib\Connection\AbstractConnection. Required.
- $channel - Type: null | PhpAmqpLib\Channel\AMQPChannel. Optional.
If $channel is not provided or is null, the class uses the channel from the $connection.
Once the producer class is instantiated, you can set a channel with setChannel. The method accepts a PhpAmqpLib\Channel\AMQPChannel instance.
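As a small sketch of the two ways to give a producer its channel, the helper below first relies on the connection's channel and then swaps in a dedicated one. The function name buildProducer is hypothetical; channel() is php-amqplib's method on AbstractConnection for opening a channel.

```php
<?php

use Anik\Amqp\Producer;
use PhpAmqpLib\Connection\AbstractConnection;

// Hypothetical helper; assumes vendor/autoload.php is already required.
function buildProducer(AbstractConnection $connection): Producer
{
    // No channel passed: the producer uses the channel from the connection.
    $producer = new Producer($connection);

    // A dedicated channel can be supplied after instantiation.
    $producer->setChannel($connection->channel());

    return $producer;
}
```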
There are three ways to publish messages.
Producer::publishBatch - to publish multiple messages in bulk.
<?php
use Anik\Amqp\Producer;
(new Producer($connection))->publishBatch($messages, $routingKey, $exchange, $options);
- $messages - Type: Anik\Amqp\Producible[]. If any of the messages does not implement the Producible interface, an error is thrown.
- $routingKey - Type: string. Routing key. Default: '' (empty string).
- $exchange - Type: null | Anik\Amqp\Exchanges\Exchange.
- $options - Type: array. Runtime configuration.
  - Key exchange - Accepts: array.
    - If you pass null as $exchange, then you must provide a valid configuration through this key to create an exchange under the hood. If you pass $exchange with an Exchange instance and $options['exchange'], the exchange instance will be reconfigured accordingly with the values available in $options['exchange']. Keys are the same as Exchange::make's $options.
  - Key publish - Accepts: array.
    - Key mandatory. Default: false.
    - Key immediate. Default: false.
    - Key ticket. Default: null.
    - Key batch_count. Default: 500. To make a batch of X messages before publishing a batch.
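To make the option layout concrete, here is a sketch of a batch publish that passes null as the exchange and describes it through the exchange key instead. The function name publishOrderEvents, the exchange name, and the routing key are hypothetical, and the payloads are assumed to be JSON-encodable arrays.

```php
<?php

use Anik\Amqp\Exchanges\Exchange;
use Anik\Amqp\Producer;
use Anik\Amqp\ProducibleMessage;
use PhpAmqpLib\Connection\AbstractConnection;

// Hypothetical helper; assumes vendor/autoload.php is already required.
function publishOrderEvents(AbstractConnection $connection, array $payloads): void
{
    // Wrap each payload in the package's generic Producible implementation.
    $messages = array_map(
        static function (array $payload): ProducibleMessage {
            return new ProducibleMessage((string) json_encode($payload));
        },
        $payloads
    );

    (new Producer($connection))->publishBatch($messages, 'orders.created', null, [
        // $exchange is null, so a valid exchange configuration goes here.
        'exchange' => [
            'name' => 'example.orders.exchange',
            'type' => Exchange::TYPE_TOPIC,
            'declare' => true,
        ],
        // Use batches of 100 messages instead of the default 500.
        'publish' => ['batch_count' => 100],
    ]);
}
```

A call such as publishOrderEvents($connection, [['id' => 1], ['id' => 2]]) would need a reachable broker, so it is left out of the snippet.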
Producer::publish - to publish a single message. Uses Producer::publishBatch under the hood.
<?php
use Anik\Amqp\Producer;
(new Producer($connection))->publish($message, $routingKey, $exchange, $options);
- $message - Type: Anik\Amqp\Producible.
- $routingKey - Type: string. Routing key. Default: '' (empty string).
- $exchange - Type: null | Anik\Amqp\Exchanges\Exchange.
- $options - Type: array. Runtime configuration.
  - Key exchange - Accepts: array.
    - If you pass null as $exchange, then you must provide a valid configuration through this key to create an exchange under the hood. If you pass $exchange with an Exchange instance and $options['exchange'], the exchange instance will be reconfigured accordingly with the values available in $options['exchange']. Keys are the same as Exchange::make's $options.
  - Key publish - Accepts: array.
    - Key mandatory. Default: false.
    - Key immediate. Default: false.
    - Key ticket. Default: null.
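The other case described above, an Exchange instance combined with $options['exchange'], might look like the following sketch. The function name publishInvoice and the routing key are hypothetical.

```php
<?php

use Anik\Amqp\Exchanges\Exchange;
use Anik\Amqp\Producer;
use Anik\Amqp\ProducibleMessage;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Hypothetical helper; assumes vendor/autoload.php is already required.
function publishInvoice(AbstractConnection $connection, Exchange $exchange, string $body): void
{
    $message = new ProducibleMessage($body, [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    ]);

    (new Producer($connection))->publish($message, 'invoices.issued', $exchange, [
        // The given instance is reconfigured with these values.
        'exchange' => ['declare' => true],
    ]);
}
```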
Producer::publishBasic - to publish a single message using the AMQPChannel::basic_publish method.
<?php
use Anik\Amqp\Producer;
(new Producer($connection))->publishBasic($message, $routingKey, $exchange, $options);
- $message - Type: Anik\Amqp\Producible.
- $routingKey - Type: string. Routing key. Default: '' (empty string).
- $exchange - Type: null | Anik\Amqp\Exchanges\Exchange.
- $options - Type: array. Runtime configuration.
  - Key exchange - Accepts: array.
    - If you pass null as $exchange, then you must provide a valid configuration through this key to create an exchange under the hood. If you pass $exchange with an Exchange instance and $options['exchange'], the exchange instance will be reconfigured accordingly with the values available in $options['exchange']. Keys are the same as Exchange::make's $options.
  - Key publish - Accepts: array.
    - Key mandatory. Default: false.
    - Key immediate. Default: false.
    - Key ticket. Default: null.
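A publishBasic call takes the same arguments as publish. The sketch below sends one message to a fanout exchange with the publish flags written out at their documented defaults; the function name publishHeartbeat and the exchange name are hypothetical.

```php
<?php

use Anik\Amqp\Exchanges\Fanout;
use Anik\Amqp\Producer;
use Anik\Amqp\ProducibleMessage;
use PhpAmqpLib\Connection\AbstractConnection;

// Hypothetical helper; assumes vendor/autoload.php is already required.
function publishHeartbeat(AbstractConnection $connection): void
{
    $exchange = Fanout::make(['name' => 'example.heartbeat.exchange']);
    $exchange->setDeclare(true);

    // The routing key is left empty, which is also its default.
    (new Producer($connection))->publishBasic(new ProducibleMessage('ping'), '', $exchange, [
        'publish' => [
            'mandatory' => false,
            'immediate' => false,
            'ticket' => null,
        ],
    ]);
}
```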
The package comes with Anik\Amqp\ProducibleMessage, a generic implementation of the Anik\Amqp\Producible interface.
You can instantiate the class as follows:
<?php
use Anik\Amqp\ProducibleMessage;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$msg = new ProducibleMessage('take my message to rabbitmq');
$msg = new ProducibleMessage('take my message to rabbitmq', [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]);
$msg = (new ProducibleMessage())->setMessage('take my message to rabbitmq')->setProperties([
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'application_headers' => new AMQPTable(['key' => 'value']),
]);
To consume messages, you'll need an Anik\Amqp\Consumer instance. To instantiate the class:
<?php
use Anik\Amqp\Consumer;
$consumer = new Consumer($connection, $channel, $options);
The constructor accepts
- $connection - Type: PhpAmqpLib\Connection\AbstractConnection. Required.
- $channel - Type: null | PhpAmqpLib\Channel\AMQPChannel. Optional.
- $options - Type: array. Optional. Configurations for the consumer.
  - tag - Type: string. Default: sprintf("anik.amqp_consumer_%s_%s", gethostname(), getmypid()). To set the consumer tag.
  - no_local - Type: bool. Default: false.
  - no_ack - Type: bool. Default: false.
  - exclusive - Type: bool. Default: false.
  - no_wait - Type: bool. Default: false.
  - arguments - Type: array. Default: [].
  - ticket - Type: null | int. Default: null.
If $channel is not provided or is null, the class uses the channel from the $connection.
Once the consumer class is instantiated, you can access the following methods.
- setChannel - Accepts: PhpAmqpLib\Channel\AMQPChannel instance.
- reconfigure - Accepts: array. To reconfigure the instance. Valid keys are the same as the constructor's options keys.
- setConsumerTag - Accepts: string. Default: sprintf("anik.amqp_consumer_%s_%s", gethostname(), getmypid()).
- setNoLocal - Accepts: bool. Default: false.
- setNoAck - Accepts: bool. Default: false.
- setExclusive - Accepts: bool. Default: false.
- setNowait - Accepts: bool. Default: false.
- setArguments - Accepts: array. Default: [].
- setTicket - Accepts: null | int. Default: null.
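The constructor options and the methods above can be combined as in this sketch. The helper name buildWorkerConsumer and the consumer tag are hypothetical; channel() is php-amqplib's method on AbstractConnection.

```php
<?php

use Anik\Amqp\Consumer;
use PhpAmqpLib\Connection\AbstractConnection;

// Hypothetical helper; assumes vendor/autoload.php is already required.
function buildWorkerConsumer(AbstractConnection $connection): Consumer
{
    // Passing null as the channel makes the consumer use the connection's channel.
    $consumer = new Consumer($connection, null, [
        'tag' => 'example.worker.' . getmypid(),
        'no_ack' => false,
        'exclusive' => false,
    ]);

    // The same keys can be changed later in bulk...
    $consumer->reconfigure(['exclusive' => true]);

    // ...or one at a time through the setters.
    $consumer->setNoLocal(false);
    $consumer->setChannel($connection->channel());

    return $consumer;
}
```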
To consume messages,
<?php
use Anik\Amqp\Consumer;
(new Consumer($connection, $channel, $options))->consume($handler, $bindingKey, $exchange, $queue, $qos, $options);
- $handler - Type: Anik\Amqp\Consumable.
- $bindingKey - Type: string. Binding key. Default: '' (empty string).
- $exchange - Type: null | Anik\Amqp\Exchanges\Exchange.
- $queue - Type: null | Anik\Amqp\Queues\Queue.
- $qos - Type: null | Anik\Amqp\Qos\Qos.
- $options - Type: array. Runtime configuration.
  - consumer - Accepts: array. Keys are the same as Consumer::__construct's options.
  - exchange - Accepts: array. Keys are the same as Exchange::make's options.
    - If you pass null as $exchange, then you must provide a valid configuration through this key to create an exchange under the hood. If you pass $exchange with an Exchange instance and $options['exchange'], the exchange instance will be reconfigured accordingly with the values available in $options['exchange'].
  - queue - Accepts: array. Keys are the same as Queue::make's options.
    - If you pass null as $queue, then you must provide a valid configuration through this key to create a queue under the hood. If you pass $queue with a Queue instance and $options['queue'], the queue instance will be reconfigured accordingly with the values available in $options['queue'].
  - qos - Accepts: array. Keys are the same as Qos::make's options.
    - If you pass $qos with a Qos instance and $options['qos'], the qos instance will be reconfigured accordingly. If $qos is null and $options['qos'] holds a value, QoS will be applied to the channel. If $qos is null and $options['qos'] is not available, NO QoS WILL BE APPLIED TO THE CHANNEL.
  - bind - Accepts: array. For binding the queue to the exchange.
    - no_wait. Default: false.
    - arguments. Default: [].
    - ticket. Default: null.
  - consume - Accepts: array. The following values are passed to AMQPChannel::wait().
    - allowed_methods. Default: null.
    - non_blocking. Default: false.
    - timeout. Default: 0.
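Putting the pieces together, a consumer could be wired up roughly as follows. This is a sketch under assumptions: the function name consumeOrders, the exchange and queue names, the binding key, and the prefetch count are all made up for illustration.

```php
<?php

use Anik\Amqp\ConsumableMessage;
use Anik\Amqp\Consumer;
use Anik\Amqp\Exchanges\Exchange;
use Anik\Amqp\Qos\Qos;
use Anik\Amqp\Queues\Queue;
use PhpAmqpLib\Connection\AbstractConnection;

// Hypothetical helper; assumes vendor/autoload.php is already required.
function consumeOrders(AbstractConnection $connection): void
{
    // Print each message body, then acknowledge it.
    $handler = new ConsumableMessage(function (ConsumableMessage $message) {
        echo $message->getMessageBody(), PHP_EOL;
        $message->ack();
    });

    $exchange = Exchange::make([
        'name' => 'example.orders.exchange',
        'type' => Exchange::TYPE_TOPIC,
        'declare' => true,
    ]);
    $queue = Queue::make(['name' => 'example.orders.queue', 'declare' => true]);
    $qos = new Qos(0, 10, false); // prefetch size, prefetch count, global

    (new Consumer($connection))->consume($handler, 'orders.*', $exchange, $queue, $qos, [
        'consumer' => ['no_ack' => false],
        'bind' => ['no_wait' => false],
        // Written out at their documented defaults; passed to AMQPChannel::wait().
        'consume' => ['non_blocking' => false, 'timeout' => 0],
    ]);
}
```

Passing a Qos instance matters here because, as noted above, no QoS is applied to the channel when both $qos and $options['qos'] are missing.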
The package comes with Anik\Amqp\ConsumableMessage, a generic implementation of the Anik\Amqp\Consumable interface.
You can instantiate the class as follows:
<?php
use Anik\Amqp\ConsumableMessage;
// use PhpAmqpLib\Message\AMQPMessage;
$msg = new ConsumableMessage(function (ConsumableMessage $message/*, AMQPMessage $original*/) {
echo $message->getMessageBody();
$message->ack();
// Alternatively, $original->ack();
});
NOTE: Calling any method on a ConsumableMessage instance without setting the AMQPMessage will throw an exception.
If you find any issue, bug, or missing feature, please submit an issue, and a PR if possible.