A bundle to use Swarrot inside your Symfony application.
The recommended way to install this bundle is through Composer. Just run:
composer require swarrot/swarrot-bundle
Register the bundle in the kernel of your application:
// app/AppKernel.php
public function registerBundles()
{
    $bundles = array(
        // ...
        new Swarrot\SwarrotBundle\SwarrotBundle(),
    );

    return $bundles;
}
swarrot:
    provider: pecl # pecl or amqp_lib (require php-amqplib/php-amqplib)
    default_connection: rabbitmq
    default_command: swarrot.command.base # Swarrot\SwarrotBundle\Command\SwarrotCommand
    logger: logger # logger or channel logger like monolog.logger.[my_channel]
    connections:
        rabbitmq:
            url: "amqp://%rabbitmq_login%:%rabbitmq_password%@%rabbitmq_host%:%rabbitmq_port%/%rabbitmq_vhost%"
    consumers:
        my_consumer:
            processor: my_consumer.processor.service # Symfony service id implementing Swarrot\Processor\ProcessorInterface
            middleware_stack: # order matters
                - configurator: swarrot.processor.signal_handler
                  # extras:
                  #     signal_handler_signals:
                  #         - SIGTERM
                  #         - SIGINT
                  #         - SIGQUIT
                # - configurator: swarrot.processor.insomniac
                - configurator: swarrot.processor.max_messages
                  # extras:
                  #     max_messages: 100
                - configurator: swarrot.processor.max_execution_time
                  # extras:
                  #     max_execution_time: 300
                - configurator: swarrot.processor.memory_limit
                  # extras:
                  #     memory_limit: null
                - configurator: swarrot.processor.doctrine_connection
                  # extras:
                  #     doctrine_ping: true
                  #     doctrine_close_master: true
                - configurator: swarrot.processor.doctrine_object_manager
                - configurator: swarrot.processor.exception_catcher
                - configurator: swarrot.processor.ack
                  # extras:
                  #     requeue_on_error: false
                - configurator: swarrot.processor.retry
                  # extras:
                  #     retry_exchange: retry
                  #     retry_attempts: 3
                  #     retry_routing_key_pattern: 'retry_%%attempt%%'
                # - configurator: swarrot.processor.new_relic
                #   extras:
                #       new_relic_app_name: ~
                #       new_relic_license: ~
                #       new_relic_transaction_name: ~
                # - configurator: swarrot.processor.rpc_server
                #   extras:
                #       # Exchange on which rpc response will be published with `reply_to` as routing_key.
                #       # If not configured will publish on default exchange where routing_key is used to define receiving queue.
                #       rpc_exchange: ~
                # - configurator: swarrot.processor.rpc_client
                #   extras:
                #       rpc_client_correlation_id: ~
            extras:
                poll_interval: 500000
    messages_types:
        my_publisher:
            connection: rabbitmq # use the default connection by default
            exchange: my_exchange
            routing_key: my_routing_key
First step is to retrieve the Swarrot publisher service from your controller.
$messagePublisher = $this->get('swarrot.publisher');
After that, you need to prepare your message with the Message class.
use Swarrot\Broker\Message;
$message = new Message('"My first message with the awesome swarrot lib :)"');
Then you can publish the message using a predefined configuration (connection, exchange, routing_key, etc.) from your messages_types.
$messagePublisher->publish('my_publisher', $message);
When publishing a message, you can override the messages_types configuration by passing a third argument:
$messagePublisher->publish('my_publisher', $message, array(
    'exchange'    => 'my_new_exchange',
    'connection'  => 'my_second_connection',
    'routing_key' => 'my_new_routing_key'
));
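The override behaviour described above can be pictured as a simple array merge. The following is only a sketch, not the bundle's actual code: resolvePublishConfig() is a hypothetical helper, and it assumes that any key missing from the third argument falls back to the value configured under messages_types.

```php
<?php
// Hypothetical helper, not part of SwarrotBundle: resolves the effective
// publishing settings for one message type.
function resolvePublishConfig(array $messageTypes, string $type, array $overrides = []): array
{
    if (!isset($messageTypes[$type])) {
        throw new InvalidArgumentException(sprintf('Unknown message type "%s".', $type));
    }

    // Only these keys may be overridden at publish time (assumption for this sketch).
    $allowed = ['connection' => true, 'exchange' => true, 'routing_key' => true];

    // array_replace keeps the configured value unless an override supplies the same key.
    return array_replace($messageTypes[$type], array_intersect_key($overrides, $allowed));
}

// Mirrors the messages_types section of the configuration reference.
$messageTypes = [
    'my_publisher' => [
        'connection'  => 'rabbitmq',
        'exchange'    => 'my_exchange',
        'routing_key' => 'my_routing_key',
    ],
];

// Override only the routing key; connection and exchange keep their configured values.
$effective = resolvePublishConfig($messageTypes, 'my_publisher', [
    'routing_key' => 'my_new_routing_key',
]);

print_r($effective);
```

In this sketch only routing_key changes, while connection and exchange stay as configured.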
Swarrot will automatically create one command per consumer defined in your configuration. These commands take the name of the queue to consume as their first argument. You can also pass a named connection as the second argument if you don't want to use the default one.
app/console swarrot:consume:my_consumer queue_name [connection_name]
Your consumer (my_consumer.processor.service) must implement Swarrot\Processor\ProcessorInterface:
use Swarrot\Broker\Message;
use Swarrot\Processor\ProcessorInterface;

class MyProcessor implements ProcessorInterface
{
    public function process(Message $message, array $options)
    {
        var_dump($message->getBody()); // "My first message with the awesome swarrot lib :)"
    }
}
Your processor will also be decorated automatically by all processors listed in the middleware_stack section. The order matters.
All these processors are configurable. You can add an extras key on each configurator definition in your config.yml. Take a look at the configuration reference to see available extras for existing Configurators.
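To see why the order of middleware_stack matters, here is a minimal, library-free sketch of decoration. It does not use Swarrot classes: decorate() and the callables are hypothetical stand-ins, and the sketch assumes that the first entry of the stack ends up as the outermost wrapper.

```php
<?php
// Hypothetical stand-in: wraps a callable "processor" and records when it is
// entered and left. None of these names come from Swarrot itself.
function decorate(callable $inner, string $name, array &$log): callable
{
    return function (string $body) use ($inner, $name, &$log) {
        $log[] = "enter $name";
        $result = $inner($body);
        $log[] = "leave $name";

        return $result;
    };
}

$log = [];

// The innermost callable plays the role of your own processor.
$processor = function (string $body) use (&$log) {
    $log[] = "process $body";

    return true;
};

// Entries in the order they would be listed in middleware_stack
// (assumption: the first entry is the outermost decorator).
$stack = ['signal_handler', 'exception_catcher', 'ack'];

// Wrap from the last entry to the first so that the first one ends up outermost.
foreach (array_reverse($stack) as $name) {
    $processor = decorate($processor, $name, $log);
}

$processor('hello');

echo implode("\n", $log), "\n";
```

Under that assumption, a decorator listed earlier sees the message before, and the result after, every decorator listed below it. This is why an exception catcher placed above ack can observe failures raised further down the stack.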
You can also use options of the command line:
- --poll-interval [default: 500000]: Change the polling interval used when no message is found in the broker
- --requeue-on-error (-r): Re-queue the message in the same queue if an error occurred.
- --no-catch (-C): Disable the ExceptionCatcher processor (available only if the processor is in the stack)
- --max-execution-time (-t) [default: 300]: Configure the MaxExecutionTime processor (available only if the processor is in the stack)
- --max-messages (-m) [default: 300]: Configure the MaxMessages processor (available only if the processor is in the stack)
- --no-retry (-R): Disable the Retry processor (available only if the processor is in the stack)
The built-in default values are overridden by the values in your config.yml, and command-line options in turn override the values from config.yml.
Run your command with -h to get the full list of options.
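The precedence between built-in defaults, config.yml extras, and command-line options can be sketched as follows. This is an illustration only: resolveOptions() is a hypothetical helper rather than the bundle's implementation, and the values are taken from the examples in this document.

```php
<?php
// Hypothetical helper illustrating precedence; not the bundle's actual code.
function resolveOptions(array $defaults, array $configExtras, array $cliOptions): array
{
    // Drop CLI options that were not passed (null) so they do not mask config values.
    $cliOptions = array_filter($cliOptions, function ($value) {
        return $value !== null;
    });

    // Later arrays win: defaults < config.yml extras < command-line options.
    return array_replace($defaults, $configExtras, $cliOptions);
}

$defaults     = ['poll_interval' => 500000, 'max_execution_time' => 300, 'max_messages' => 300];
$configExtras = ['max_messages' => 100];                                  // from config.yml
$cliOptions   = ['max_execution_time' => 60, 'max_messages' => null];     // only -t 60 was passed

$options = resolveOptions($defaults, $configExtras, $cliOptions);

print_r($options);
```

Here poll_interval keeps its default, max_messages comes from config.yml, and max_execution_time comes from the command line.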
If you want to implement your own provider (like Redis), you first have to implement the Swarrot\SwarrotBundle\Broker\FactoryInterface.
Then, you can register it along with your other services and tag it with swarrot.provider_factory.
services:
    app.swarrot.custom_provider_factory:
        class: AppBundle\Provider\CustomFactory
        tags:
            - {name: swarrot.provider_factory}
    app.swarrot.redis_provider_factory:
        class: AppBundle\Provider\RedisFactory
        tags:
            - {name: swarrot.provider_factory, alias: redis}
Now you can tell Swarrot to use it in the config.yml file.
swarrot:
    provider: app.swarrot.custom_provider_factory
or with the alias
swarrot:
    provider: redis
If you want to use a custom processor, you need two things: the Processor itself and a ProcessorConfigurator.
For the Processor, you can refer to the swarrot/swarrot documentation.
For the ProcessorConfigurator, you need to implement the ProcessorConfiguratorInterface and register it as an abstract service, like this:
services:
    my_own_processor_configurator_service_id:
        abstract: true
        class: MyProject\MyOwnProcessorConfigurator
Once done, just add it to the middleware stack of your consumer:
middleware_stack:
    - configurator: swarrot.processor.signal_handler
    - configurator: my_own_processor_configurator_service_id
As usual, take care of the order of your middleware_stack.
If you use Swarrot, you may not want to actually publish messages in some environments, for example the test environment. You can use the BlackholePublisher to achieve this.
Simply override the swarrot.publisher.class parameter in the DIC with the Swarrot\SwarrotBundle\Broker\BlackholePublisher class, by updating config_test.yml for instance:
parameters:
    swarrot.publisher.class: Swarrot\SwarrotBundle\Broker\BlackholePublisher
This bundle's goal is to deal with message consumption, not with your broker configuration. We don't want to mix infrastructure logic with consuming logic.
If you're looking for a tool to configure your broker, take a look at odolbeau/rabbit-mq-admin-toolkit.
This bundle is released under the MIT License. See the bundled LICENSE file for details.