Skip to content

Commit

Permalink
Updated README. Includes changes from last push which had not been co…
Browse files Browse the repository at this point in the history
…mmitted properly
  • Loading branch information
Duncan McIntyre committed Oct 31, 2013
1 parent cc98853 commit 4a7d9f7
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 197 deletions.
13 changes: 10 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
[![Build Status](https://secure.travis-ci.org/pdezwart/php-amqp.png)](http://travis-ci.org/pdezwart/php-amqp)

# PHP AMQP bindings

Object-oriented PHP bindings for the AMQP C library (https://github.com/alanxz/rabbitmq-c)
A fork of the official bindings here: https://github.com/alanxz/rabbitmq-c.

This fork adds two classes

AMQPConsumer and AMQPConsumerDispatcher

which allow, when using multiple connections, a script to consume
from multiple queues at the same time.

The code uses select() for efficiency.
32 changes: 10 additions & 22 deletions amqp.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ zend_class_entry *amqp_exception_class_entry,
*amqp_connection_exception_class_entry,
*amqp_channel_exception_class_entry,
*amqp_queue_exception_class_entry,
*amqp_exchange_exception_class_entry;
*amqp_exchange_exception_class_entry,
*amqp_consumer_exception_class_entry;

int le_amqp_connection_resource;

Expand Down Expand Up @@ -245,21 +246,12 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_queue_class_get, ZEND_SEND_BY_VAL, ZEND_RETU
ZEND_ARG_INFO(0, flags)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_queue_class_basicConsume, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
ZEND_ARG_INFO(0, flags)
ZEND_ARG_INFO(0, consumerKey)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_queue_class_consume, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 1)
ZEND_ARG_INFO(0, callback)
ZEND_ARG_INFO(0, flags)
ZEND_ARG_INFO(0, consumer_tag)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_queue_class_select, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
ZEND_ARG_INFO(0, timeout)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_queue_class_ack, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 1)
ZEND_ARG_INFO(0, delivery_tag)
ZEND_ARG_INFO(0, flags)
Expand Down Expand Up @@ -429,12 +421,6 @@ ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_consumer_dispatcher_class_hasConsumers, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_consumer_dispatcher_class_getConsumers, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_consumer_dispatcher_class_rotateConsumers, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_consumer_dispatcher_class_removeConsumer, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 1)
ZEND_ARG_INFO(0, consumer)
ZEND_END_ARG_INFO()
Expand All @@ -443,8 +429,6 @@ ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_consumer_class__construct, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 2)
ZEND_ARG_INFO(0, amqp_queue)
ZEND_ARG_INFO(0, callback)
ZEND_ARG_INFO(0, flags)
ZEND_ARG_INFO(0, consumer_tag)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_consumer_class_getQueue, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
Expand All @@ -453,6 +437,10 @@ ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_consumer_class_consumeOne, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_consumer_class_basicConsume, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
ZEND_ARG_INFO(0, flags)
ZEND_ARG_INFO(0, consumerKey)
ZEND_END_ARG_INFO()

/* {{{ amqp_functions[]
*
Expand Down Expand Up @@ -531,9 +519,7 @@ zend_function_entry amqp_queue_class_functions[] = {
PHP_ME(amqp_queue_class, bind, arginfo_amqp_queue_class_bind, ZEND_ACC_PUBLIC)

PHP_ME(amqp_queue_class, get, arginfo_amqp_queue_class_get, ZEND_ACC_PUBLIC)
PHP_ME(amqp_queue_class, basicConsume, arginfo_amqp_queue_class_basicConsume, ZEND_ACC_PUBLIC)
PHP_ME(amqp_queue_class, consume, arginfo_amqp_queue_class_consume, ZEND_ACC_PUBLIC)
PHP_ME(amqp_queue_class, select, arginfo_amqp_queue_class_select, ZEND_ACC_PUBLIC)
PHP_ME(amqp_queue_class, ack, arginfo_amqp_queue_class_ack, ZEND_ACC_PUBLIC)
PHP_ME(amqp_queue_class, nack, arginfo_amqp_queue_class_nack, ZEND_ACC_PUBLIC)
PHP_ME(amqp_queue_class, reject, arginfo_amqp_queue_class_reject, ZEND_ACC_PUBLIC)
Expand Down Expand Up @@ -602,8 +588,6 @@ zend_function_entry amqp_consumer_dispatcher_class_functions[] = {
PHP_ME(amqp_consumer_dispatcher_class, __construct, arginfo_amqp_consumer_dispatcher_class__construct, ZEND_ACC_PUBLIC)
PHP_ME(amqp_consumer_dispatcher_class, select, arginfo_amqp_consumer_dispatcher_class_select, ZEND_ACC_PUBLIC)
PHP_ME(amqp_consumer_dispatcher_class, hasConsumers, arginfo_amqp_consumer_dispatcher_class_hasConsumers, ZEND_ACC_PUBLIC)
PHP_ME(amqp_consumer_dispatcher_class, getConsumers, arginfo_amqp_consumer_dispatcher_class_getConsumers, ZEND_ACC_PUBLIC)
PHP_ME(amqp_consumer_dispatcher_class, rotateConsumers, arginfo_amqp_consumer_dispatcher_class_rotateConsumers, ZEND_ACC_PUBLIC)
PHP_ME(amqp_consumer_dispatcher_class, removeConsumer, arginfo_amqp_consumer_dispatcher_class_removeConsumer, ZEND_ACC_PUBLIC)

{NULL, NULL, NULL} /* Must be the last line in amqp_functions[] */
Expand All @@ -612,6 +596,7 @@ zend_function_entry amqp_consumer_dispatcher_class_functions[] = {
zend_function_entry amqp_consumer_class_functions[] = {
PHP_ME(amqp_consumer_class, __construct, arginfo_amqp_consumer_class__construct, ZEND_ACC_PUBLIC)
PHP_ME(amqp_consumer_class, getQueue, arginfo_amqp_consumer_class_getQueue, ZEND_ACC_PUBLIC)
PHP_ME(amqp_consumer_class, basicConsume, arginfo_amqp_consumer_class_basicConsume, ZEND_ACC_PUBLIC)
PHP_ME(amqp_consumer_class, consumeOne, arginfo_amqp_consumer_class_consumeOne, ZEND_ACC_PUBLIC)

{NULL, NULL, NULL} /* Must be the last line in amqp_functions[] */
Expand Down Expand Up @@ -857,6 +842,9 @@ PHP_MINIT_FUNCTION(amqp)
INIT_CLASS_ENTRY(ce, "AMQPExchangeException", NULL);
amqp_exchange_exception_class_entry = zend_register_internal_class_ex(&ce, amqp_exception_class_entry, NULL TSRMLS_CC);

INIT_CLASS_ENTRY(ce, "AMQPConsumerException", NULL);
amqp_consumer_exception_class_entry = zend_register_internal_class_ex(&ce, amqp_exception_class_entry, NULL TSRMLS_CC);

REGISTER_INI_ENTRIES();

REGISTER_LONG_CONSTANT("AMQP_NOPARAM", AMQP_NOPARAM, CONST_CS | CONST_PERSISTENT);
Expand Down
72 changes: 70 additions & 2 deletions amqp_consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,70 @@ PHP_METHOD(amqp_consumer_class, getQueue)

consumer = (amqp_consumer_object *)zend_object_store_get_object(id TSRMLS_CC);
Z_ADDREF_P(consumer->queue);
RETURN_ZVAL(consumer->queue, 1, 0);
RETURN_ZVAL(consumer->queue, 0, 0);
}
/* }}} */


/* {{{ proto int AMQPConsumer::basicConsume([flags = <bitmask>, consumer_tag]);
start consuming on the queue
*/
PHP_METHOD(amqp_consumer_class, basicConsume)
{
zval *id;
amqp_consumer_object *consumer;
amqp_queue_object *queue;
amqp_channel_object *channel;
amqp_connection_object *connection;

zend_fcall_info fci;
zend_fcall_info_cache fci_cache;
int function_call_succeeded = 1;
int read;
amqp_table_t *arguments;

char *consumer_tag;
int consumer_tag_len = 0;
amqp_bytes_t consumer_tag_bytes;
long flags = INI_INT("amqp.auto_ack") ? AMQP_AUTOACK : AMQP_NOPARAM;

if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "O|ls", &id, amqp_consumer_class_entry, &flags, &consumer_tag, &consumer_tag_len) == FAILURE) {
return;
}

consumer = (amqp_consumer_object *)zend_object_store_get_object(id TSRMLS_CC);

queue = (amqp_queue_object*) zend_object_store_get_object(consumer->queue TSRMLS_CC);

channel = AMQP_GET_CHANNEL(queue);
AMQP_VERIFY_CHANNEL(channel, "Could not get queue.");

connection = AMQP_GET_CONNECTION(channel);
AMQP_VERIFY_CONNECTION(connection, "Could not get queue.");

/* Setup the consume */
arguments = convert_zval_to_arguments(queue->arguments);

consumer_tag_bytes.bytes = (void *) consumer_tag;
consumer_tag_bytes.len = consumer_tag_len;

amqp_basic_consume(
connection->connection_resource->connection_state,
channel->channel_id,
amqp_cstring_bytes(queue->name),
consumer_tag_bytes, /* Consumer tag */
(AMQP_NOLOCAL & flags) ? 1 : 0, /* No local */
(AMQP_AUTOACK & flags) ? 1 : 0, /* no_ack, aka AUTOACK */
queue->exclusive,
*arguments
);

AMQP_EFREE_ARGUMENTS(arguments);

RETURN_TRUE;
}
/* }}} */

/* {{{ proto array AMQPConsumer::consumeOne(callback);
consume one message and return the value from the callback function
return boolean
Expand Down Expand Up @@ -181,7 +240,7 @@ PHP_METHOD(amqp_consumer_class, consumeOne)

connection = AMQP_GET_CONNECTION(channel);
AMQP_VERIFY_CONNECTION(connection, "Could not get connection.");

/* Initialize the message */
zval *message;
MAKE_STD_ZVAL(message);
Expand Down Expand Up @@ -238,3 +297,12 @@ PHP_METHOD(amqp_consumer_class, consumeOne)
}
/* }}} */


/*
*Local variables:
*tab-width: 4
*c-basic-offset: 4
*End:
*vim600: noet sw=4 ts=4 fdm=marker
*vim<600: noet sw=4 ts=4
*/
1 change: 1 addition & 0 deletions amqp_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ zend_object_value amqp_consumer_ctor(zend_class_entry *ce TSRMLS_DC);

PHP_METHOD(amqp_consumer_class, __construct);
PHP_METHOD(amqp_consumer_class, getQueue);
PHP_METHOD(amqp_consumer_class, basicConsume);
PHP_METHOD(amqp_consumer_class, consumeOne);


Expand Down
61 changes: 15 additions & 46 deletions amqp_consumer_dispatcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,6 @@ zend_object_value amqp_consumer_dispatcher_ctor(zend_class_entry *ce TSRMLS_DC)
return new_value;
}



/*
* Verify that the given zval is a consumer and attempt to return the connection object
* If NULL is returned, then an exception will be on the stack, so callers should return immediately
Expand All @@ -141,12 +139,12 @@ amqp_connection_object *getConsumerConnection(zval *z_consumer)
zval *z_queue = NULL;

if(!z_consumer) {
zend_throw_exception(amqp_queue_exception_class_entry, "Consumer expected", 0 TSRMLS_CC);
zend_throw_exception(amqp_consumer_exception_class_entry, "Consumer expected", 0 TSRMLS_CC);
return NULL;
}

if(Z_TYPE_P(z_consumer) != IS_OBJECT) {
zend_throw_exception(amqp_queue_exception_class_entry, "AMQPConsumer expected", 0 TSRMLS_CC);
zend_throw_exception(amqp_consumer_exception_class_entry, "AMQPConsumer expected", 0 TSRMLS_CC);
return NULL;
}

Expand All @@ -158,7 +156,6 @@ amqp_connection_object *getConsumerConnection(zval *z_consumer)
}

if (Z_TYPE_P(z_queue) != IS_OBJECT) {
php_printf("Type of queue is %d", Z_TYPE_P(z_queue));
zend_throw_exception(amqp_queue_exception_class_entry, "Consumer not bound to a queue", 0 TSRMLS_CC);
return NULL;
}
Expand Down Expand Up @@ -188,6 +185,7 @@ void rotate_consumers(amqp_consumer_dispatcher_object *consumer_dispatcher)
zend_call_method(NULL, NULL, NULL, "array_shift", sizeof("array_shift")-1, &retval1, 1, arrayRef, NULL);
zend_call_method(NULL, NULL, NULL, "array_push", sizeof("array_push")-1, &retval2, 2, arrayRef, retval1);

FREE_ZVAL(arrayRef);
zval_ptr_dtor(&retval1);
zval_ptr_dtor(&retval2);
zval_delref_p(arrayRef);
Expand Down Expand Up @@ -221,24 +219,6 @@ PHP_METHOD(amqp_consumer_dispatcher_class, __construct)
/* }}} */


/* {{{ proto AMQPConsumerDispatcher::rotateConsumers()
*/
PHP_METHOD(amqp_consumer_dispatcher_class, rotateConsumers)
{
zval *id;
amqp_consumer_dispatcher_object *consumer_dispatcher;

/* Parse out the method parameters */
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "O", &id, amqp_consumer_dispatcher_class_entry) == FAILURE) {
return;
}

consumer_dispatcher = (amqp_consumer_dispatcher_object *)zend_object_store_get_object(id TSRMLS_CC);
rotate_consumers(consumer_dispatcher);
}
/* }}} */


/* {{{ proto int AMQPConsumerDispatcher::select([long timeout = 0]]);
select
@return NULL if no consumer was selected
Expand Down Expand Up @@ -300,7 +280,7 @@ PHP_METHOD(amqp_consumer_dispatcher_class, select)
}

if(amqp_data_in_buffer(connection->connection_resource->connection_state)) {
RETURN_ZVAL(z_consumer, 1, 0);
RETURN_ZVAL(z_consumer, 0, 0);
}

int fd = connection->connection_resource->fd;
Expand Down Expand Up @@ -335,7 +315,7 @@ PHP_METHOD(amqp_consumer_dispatcher_class, select)

int fd = connection->connection_resource->fd;
if(FD_ISSET(fd, &read_fd)) {
RETURN_ZVAL(z_consumer, 1, 0);
RETURN_ZVAL(z_consumer, 0, 0);
}
}

Expand Down Expand Up @@ -364,26 +344,6 @@ PHP_METHOD(amqp_consumer_dispatcher_class, hasConsumers)
}
/* }}} */

/* {{{ proto AMQPConsumerDispatcher::getConsumers
Return the array of consumers */
PHP_METHOD(amqp_consumer_dispatcher_class, getConsumers)
{
zval *id;
amqp_consumer_dispatcher_object *consumer_dispatcher;

if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "O", &id, amqp_consumer_dispatcher_class_entry) == FAILURE) {
return;
}

consumer_dispatcher = (amqp_consumer_dispatcher_object *)zend_object_store_get_object(id TSRMLS_CC);

*return_value = *consumer_dispatcher->consumers;
zval_copy_ctor(return_value);

/* Increment the ref count */
Z_ADDREF_P(consumer_dispatcher->consumers);
}
/* }}} */

/* {{{ proto AMQPConsumerDispatcher::removeConsumer(AMQPConsumer consumer);
create Exchange */
Expand All @@ -403,7 +363,7 @@ PHP_METHOD(amqp_consumer_dispatcher_class, removeConsumer)
}

if (!instanceof_function(Z_OBJCE_P(consumerObj), amqp_consumer_class_entry TSRMLS_CC)) {
zend_throw_exception(amqp_exchange_exception_class_entry, "The first parameter must be an instance of AMQPConsumer.", 0 TSRMLS_CC);
zend_throw_exception(amqp_consumer_exception_class_entry, "The first parameter must be an instance of AMQPConsumer.", 0 TSRMLS_CC);
return;
}

Expand All @@ -428,3 +388,12 @@ PHP_METHOD(amqp_consumer_dispatcher_class, removeConsumer)
}
/* }}} */


/*
*Local variables:
*tab-width: 4
*c-basic-offset: 4
*End:
*vim600: noet sw=4 ts=4 fdm=marker
*vim<600: noet sw=4 ts=4
*/
2 changes: 0 additions & 2 deletions amqp_consumer_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ zend_object_value amqp_consumer_dispatcher_ctor(zend_class_entry *ce TSRMLS_DC);
PHP_METHOD(amqp_consumer_dispatcher_class, __construct);
PHP_METHOD(amqp_consumer_dispatcher_class, select);
PHP_METHOD(amqp_consumer_dispatcher_class, hasConsumers);
PHP_METHOD(amqp_consumer_dispatcher_class, getConsumers);
PHP_METHOD(amqp_consumer_dispatcher_class, rotateConsumers);
PHP_METHOD(amqp_consumer_dispatcher_class, removeConsumer);


Expand Down
Loading

0 comments on commit 4a7d9f7

Please sign in to comment.