Skip to content

Commit

Permalink
support of additional properties
Browse files Browse the repository at this point in the history
  • Loading branch information
AidasK committed Apr 11, 2016
1 parent b7335bd commit 3e82bbd
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 19 deletions.
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,32 @@ Amqp::consume('queue-name', function ($message, $resolver) {
]);
```

## Fanout example

### Publishing a message

```php
\Amqp::publish('', 'message' , [
'exchange_type' => 'fanout',
'exchange' => 'amq.fanout',
]);
```

### Consuming messages

```php
\Amqp::consume('', function ($message, $resolver) {
var_dump($message->body);
$resolver->acknowledge($message);
}, [
'exchange' => 'amq.fanout',
'exchange_type' => 'fanout',
'queue_force_declare' => true,
'queue_exclusive' => true,
'persistent' => true// required if you want to listen forever
]);
```

## Credits

* Some concepts were used from https://github.com/mookofe/tail
Expand Down
9 changes: 5 additions & 4 deletions src/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use Illuminate\Config\Repository;
use Closure;
use PhpAmqpLib\Exception\AMQPTimeoutException;

/**
* @author Björn Schmitt <[email protected]>
Expand Down Expand Up @@ -45,10 +46,10 @@ public function consume($queue, Closure $closure)
$this->getChannel()->basic_consume(
$queue,
$this->getProperty('consumer_tag'),
false,
false,
false,
false,
$this->getProperty('consumer_no_local'),
$this->getProperty('consumer_no_ack'),
$this->getProperty('consumer_exclusive'),
$this->getProperty('consumer_nowait'),
function ($message) use ($closure, $object) {
$closure($message, $object);
}
Expand Down
38 changes: 36 additions & 2 deletions src/Context.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,41 @@ abstract class Context
/**
* @var array
*/
protected $properties = [];
protected $properties = [
'host' => 'localhost',
'port' => 5672,
'username' => 'guest',
'password' => 'guest',
'vhost' => '/',
'persistent' => false,

'exchange' => 'amq.topic',
'exchange_type' => 'topic',
'exchange_passive' => false,
'exchange_durable' => true,
'exchange_auto_delete' => false,
'exchange_internal' => false,
'exchange_nowait' => false,
'exchange_properties' => [],

'consumer_tag' => '',
'consumer_no_local' => false,
'consumer_no_ack' => false,
'consumer_exclusive' => false,
'consumer_nowait' => false,

'queue_force_declare' => false,
'queue_passive' => false,
'queue_durable' => true,
'queue_exclusive' => false,
'queue_auto_delete' => false,
'queue_nowait' => false,

'queue_properties' => [],
'connect_options' => [],
'ssl_options' => [],
'timeout' => 0,
];

/**
* Context constructor.
Expand All @@ -33,7 +67,7 @@ protected function extractProperties(Repository $config)
{
if ($config->has(self::REPOSITORY_KEY)) {
$data = $config->get(self::REPOSITORY_KEY);
$this->properties = $data['properties'][$data['use']];
$this->mergeProperties($data['properties'][$data['use']]);
}
}

Expand Down
27 changes: 14 additions & 13 deletions src/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,39 +68,40 @@ public function setup()
$this->channel->exchange_declare(
$exchange,
$this->getProperty('exchange_type'),
false,
true,
false,
false,
false,
$this->getProperty('exchange_passive'),
$this->getProperty('exchange_durable'),
$this->getProperty('exchange_auto_delete'),
$this->getProperty('exchange_internal'),
$this->getProperty('exchange_nowait'),
$this->getProperty('exchange_properties')
);

$queue = $this->getProperty('queue');

if (!empty($queue)) {
if (!empty($queue) || $this->getProperty('queue_force_declare')) {

/*
name: $queue
passive: false
durable: true // the queue will survive server restarts
exclusive: false // the queue can be accessed in other channels
exclusive: false // queue is deleted when connection closes
auto_delete: false //the queue won't be deleted once the channel is closed.
nowait: false // Doesn't wait on replies for certain things.
parameters: array // Extra data, like high availability params
*/

/** @var ['queue name', 'message count',] queueInfo */
$this->queueInfo = $this->channel->queue_declare(
$queue,
false,
true,
false,
false,
false,
$this->getProperty('queue_passive'),
$this->getProperty('queue_durable'),
$this->getProperty('queue_exclusive'),
$this->getProperty('queue_auto_delete'),
$this->getProperty('queue_nowait'),
$this->getProperty('queue_properties')
);

$this->channel->queue_bind($queue, $exchange, $this->getProperty('routing'));
$this->channel->queue_bind($queue ?: $this->queueInfo[0], $exchange, $this->getProperty('routing'));

}

Expand Down

0 comments on commit 3e82bbd

Please sign in to comment.