Skip to content

Commit

Permalink
Merge branch 'zf7948' of https://github.com/necrogami/zf2 into hotfix…
Browse files Browse the repository at this point in the history
…/zf-7948
  • Loading branch information
weierophinney committed Dec 17, 2010
2 parents 104bfce + feb1998 commit e0c2c0c
Showing 1 changed file with 35 additions and 5 deletions.
40 changes: 35 additions & 5 deletions library/Zend/Queue/Adapter/Activemq.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ class Activemq extends AbstractAdapter
*/
private $_client = null;

/**
* @var array
*/
private $_subscribed = array();

/**
* Constructor
*
Expand Down Expand Up @@ -171,6 +176,33 @@ public function getQueues()
throw new Queue\Exception('getQueues() is not supported in this adapter');
}

/**
* Checks if the client is subscribed to the queue
*
* @param \Zend\Queue\Queue $queue
* @return boolean
*/
protected function isSubscribed(Queue\Queue $queue)
{
return isset($this->_subscribed[$queue->getName()]);
}

/**
* Subscribes the client to the queue.
*
* @param \Zend\Queue\Queue $queue
* @return void
*/
protected function subscribe(Queue\Queue $queue)
{
$frame = $this->_client->createFrame();
$frame->setCommand('SUBSCRIBE');
$frame->setHeader('destination', $queue->getName());
$frame->setHeader('ack','client');
$this->_client->send($frame);
$this->_subscribed[$queue->getName()] = TRUE;
}

/**
* Return the first element in the queue
*
Expand All @@ -195,11 +227,9 @@ public function receive($maxMessages=null, $timeout=null, Queue\Queue $queue=nul
$data = array();

// signal that we are reading
$frame = $this->_client->createFrame();
$frame->setCommand('SUBSCRIBE');
$frame->setHeader('destination', $queue->getName());
$frame->setHeader('ack','client');
$this->_client->send($frame);
if(!$this->isSubscribed($queue)) {
$this->subscribe($queue);
}

if ($maxMessages > 0) {
if ($this->_client->canRead()) {
Expand Down

0 comments on commit e0c2c0c

Please sign in to comment.