Skip to content

Commit

Permalink
Added AMQPConsumerException
Browse files Browse the repository at this point in the history
Removed some unnecessary methods
There is still a memory leak
  • Loading branch information
Duncan McIntyre committed Oct 31, 2013
1 parent 6e40375 commit ec35318
Show file tree
Hide file tree
Showing 4 changed files with 283 additions and 0 deletions.
75 changes: 75 additions & 0 deletions examples/consumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
<?php
/**
* Example code to use the AMQP PECL multiple-consumer extension
**/
$aParams = array(
'host' => 'ubuntu-G',
'port' => 5672,
'vhost' => '/',
'login' => 'guest',
'password' =>'guest'
);

$sExchangeBase = "ex-perf-test-";
$sQueueBase = "q-perf-test-";
$aConsumers = array();

$aArgv = $_SERVER['argv'];
array_shift($aArgv);

$iNumConsumers = (int)array_shift($aArgv);

if(!$iNumConsumers) {
echo "You must supply the number of consumers to run\n";
exit;
}

echo "Starting $iNumConsumers consumers\n";


for($i=1; $i<=$iNumConsumers; $i++) {

$oConnection = new \AMQPConnection($aParams);
$oConnection->connect();

$oChannel = new \AMQPChannel($oConnection);

$oExchange = new \AMQPExchange($oChannel);
$oExchange->setName($sExchangeBase.$i);
$oExchange->setType(AMQP_EX_TYPE_FANOUT);
$oExchange->declareExchange();

$oQueue = new \AMQPQueue($oChannel);
$oQueue->setName($sQueueBase.$i);
$oQueue->setFlags(0);
$oQueue->declareQueue();
$oQueue->bind($oExchange->getName(), null);

$fn = function(\AMQPEnvelope $oEnvelope, \AMQPQueue $oQueue) {

$oQueue->ack($oEnvelope->getDeliveryTag());

return $oEnvelope->getBody() != "STOP!!";

};

$oConsumer = new \AMQPConsumer($oQueue, $fn);
$oConsumer->basicConsume();

$aConsumers[] = $oConsumer;
}

$oDispatcher = new \AMQPConsumerDispatcher($aConsumers);

while($oDispatcher->hasConsumers()) {
$oConsumer = $oDispatcher->select(1);

if($oConsumer !== null) {
if(!$oConsumer->consumeOne()) {
$oDispatcher->removeConsumer($oConsumer);
}
}
}

echo "Finished\n";

76 changes: 76 additions & 0 deletions examples/consumer.php~
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
<?php
/**
* Example code to use the AMQP PECL multiple-consumer extension
**/
$aParams = array(
'host' => 'ubuntu-G',
'port' => 5672,
'vhost' => '/',
'login' => 'guest',
'password' =>'guest'
);

$sExchangeBase = "ex-perf-test-";
$sQueueBase = "q-perf-test-";
$aConsumers = array();

$aArgv = $_SERVER['argv'];
array_shift($aArgv);

$iNumConsumers = (int)array_shift($aArgv);

if(!$iNumConsumers) {
echo "You must supply the number of consumers to run\n";
exit;
}

echo "Starting $iNumConsumers consumers\n";


for($i=1; $i<=$iNumConsumers; $i++) {

$oConnection = new \AMQPConnection($aParams);
$oConnection->connect();

$oChannel = new \AMQPChannel($oConnection);

$oExchange = new \AMQPExchange($oChannel);
$oExchange->setName($sExchangeBase.$i);
$oExchange->setType(AMQP_EX_TYPE_FANOUT);
$oExchange->declareExchange();

$oQueue = new \AMQPQueue($oChannel);
$oQueue->setName($sQueueBase.$i);
$oQueue->setFlags(0);
$oQueue->declareQueue();
$oQueue->bind($oExchange->getName(), null);
$oQueue->basicConsume();


$fn = function(\AMQPEnvelope $oEnvelope, \AMQPQueue $oQueue) {

$oQueue->ack($oEnvelope->getDeliveryTag());

return $oEnvelope->getBody() != "STOP!!";

};

$oConsumer = new \AMQPConsumer($oQueue, $fn);

$aConsumers[] = $oConsumer;
}

$oDispatcher = new \AMQPConsumerDispatcher($aConsumers);

while($oDispatcher->hasConsumers()) {
$oConsumer = $oDispatcher->select(1);

if($oConsumer !== null) {
if(!$oConsumer->consumeOne()) {
$oDispatcher->removeConsumer($oConsumer);
}
}
}

echo "Finished\n";

124 changes: 124 additions & 0 deletions examples/producer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
<?php
/**
* Forks multiple producers, each sending messages to a dedicated exchange bound to one queue
**/
$aParams = array(
'host' => 'ubuntu-G',
'port' => 5672,
'vhost' => '/',
'login' => 'guest',
'password' =>'guest'
);

$iNumConnections = 4;

$aArgv = $_SERVER['argv'];
array_shift($aArgv);

function getArg($sSwitch) {

global $aArgv;

if(!count($aArgv)) {
error("Argument expected for switch $sSwitch");
}

return array_shift($aArgv);
}

function error($sMsg) {
echo $sMsg."\n";
exit;
}

function startAndRun($iChildId, $aParams, $iNumMessages) {

$sExchangeBase = "ex-perf-test-";
$sQueueBase = "q-perf-test-";

$oConnection = new \AMQPConnection($aParams);
$oConnection->connect();

$oChannel = new \AMQPChannel($oConnection);

$oExchange = new \AMQPExchange($oChannel);
$oExchange->setName($sExchangeBase.$iChildId);
$oExchange->setType(AMQP_EX_TYPE_FANOUT);
$oExchange->declareExchange();

$oQueue = new \AMQPQueue($oChannel);
$oQueue->setName($sQueueBase.$iChildId);
$oQueue->setFlags(0);
$oQueue->declareQueue();
$oQueue->bind($oExchange->getName(), null);

for($i=0; $i<$iNumMessages; $i++) {
$oExchange->publish("Hello world", null);
}

$oExchange->publish("STOP!!");

exit;
}



$oImpl = null;
$iNumWorkers = 4;
$iNumMessages = 1000;

while(count($aArgv)) {

$sArg = array_shift($aArgv);

switch($sArg) {

case '--n':
case '-n': // set the number of workers
$iNumWorkers = (int)getArg($sArg);
break;

case '--m':
case '-m': // set the number of messages
$iNumMessages = (int)getArg($sArg);
break;

default:
error("Unrecognised switch $sArg");
}
}

if($iNumWorkers == 0) {
error("Cannot run zero workers");
}

if($iNumMessages == 0) {
error("Cannot send zero messages");
}

echo "Running $iNumWorkers workers each sending $iNumMessages messages\n";

$aKids = array();

for($i = 0; $i < $iNumWorkers; $i++) {

$iPid = pcntl_fork();

if($iPid == -1) {
error("Failed to fork worker");
} else if($iPid > 0) { // I am the parent
$aKids[$iPid] = true;
} else {

startAndRun($i+1, $aParams, $iNumMessages);
}
}

while(count($aKids)) {

$iPid = pcntl_waitpid(-1, $iStatus, WNOHANG);
unset($aKids[$iPid]);
usleep(100);
}

echo "Parent exiting\n";
8 changes: 8 additions & 0 deletions stubs/AMQPConsumerException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php
/**
* stub class representing AMQPConsumerException from pecl-amqp
*/
class AMQPConsumerException extends AMQPException
{
}

0 comments on commit ec35318

Please sign in to comment.