Skip to content

Commit

Permalink
Finalized configuration property framework.
Browse files Browse the repository at this point in the history
- rd_kafka_*conf_t structs are now private.
- Use rd_kafka_*conf_set() to populate.
- Each property is now properly documented in CONFIGURE.md (autogenerated)
  • Loading branch information
edenhill committed Oct 3, 2013
1 parent 887f5c3 commit f262590
Show file tree
Hide file tree
Showing 11 changed files with 711 additions and 560 deletions.
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ LDFLAGS+=-g -fPIC

all: libs

libs: $(LIBNAME).so.$(LIBVER) $(LIBNAME).a
libs: $(LIBNAME).so.$(LIBVER) $(LIBNAME).a CONFIGURATION.md

%.o: %.c
$(CC) -MD -MP $(CFLAGS) -c $<
Expand All @@ -53,6 +53,13 @@ $(LIBNAME).so.$(LIBVER): $(OBJS)
$(LIBNAME).a: $(OBJS)
$(AR) rcs $@ $(OBJS)


CONFIGURATION.md: rdkafka.h examples
examples/rdkafka_performance -X list > CONFIGURATION.md.tmp
cmp CONFIGURATION.md CONFIGURATION.md.tmp || \
mv CONFIGURATION.md.tmp CONFIGURATION.md
rm -f CONFIGURATION.md.tmp

examples: .PHONY
make -C $@

Expand Down
14 changes: 7 additions & 7 deletions rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf,

/* Construct a client id if none is given. */
if (!rk->rk_conf.clientid)
rk->rk_conf.clientid = strdup("default");
rk->rk_conf.clientid = strdup("rdkafka");

snprintf(rk->rk_name, sizeof(rk->rk_name), "%s#%s-%i",
rk->rk_conf.clientid, rd_kafka_type2str(rk->rk_type), rkid++);
Expand Down Expand Up @@ -897,12 +897,12 @@ static void rd_kafka_poll_cb (rd_kafka_op_t *rko, void *opaque) {

dcnt++;

rk->rk_conf.producer.dr_cb(rk,
rkm->rkm_payload,
rkm->rkm_len,
rko->rko_err,
rk->rk_conf.opaque,
rkm->rkm_opaque);
rk->rk_conf.dr_cb(rk,
rkm->rkm_payload,
rkm->rkm_len,
rko->rko_err,
rk->rk_conf.opaque,
rkm->rkm_opaque);

rd_kafka_msg_destroy(rk, rkm);
}
Expand Down
148 changes: 105 additions & 43 deletions rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@

/**
* Apache Kafka consumer & producer
*
* rdkafka.h contains the public API for librdkafka.
* The API isdocumented in this file as comments prefixing the function, type,
* enum, define, etc.
*/

#pragma once
Expand All @@ -47,25 +51,30 @@
*/
#define RD_KAFKA_VERSION 0x00080000

#define RD_KAFKA_TOPIC_MAXLEN 256

/**
* rd_kafka_t handle type
*/
typedef enum {
RD_KAFKA_PRODUCER,
RD_KAFKA_CONSUMER,
} rd_kafka_type_t;


/* Supported debug contexts (CSV "debug" configuration property) */
/**
* Supported debug contexts (CSV "debug" configuration property)
*/
#define RD_KAFKA_DEBUG_CONTEXTS \
"all,generic,broker,topic,metadata,producer,queue,msg"



/* Private types to provide ABI compatibility */
typedef struct rd_kafka_s rd_kafka_t;
typedef struct rd_kafka_topic_s rd_kafka_topic_t;
typedef struct rd_kafka_conf_s rd_kafka_conf_t;
typedef struct rd_kafka_topic_conf_s rd_kafka_topic_conf_t;


/**
* Kafka protocol error codes (version 0.8)
*/
Expand Down Expand Up @@ -113,50 +122,52 @@ const char *rd_kafka_err2str (rd_kafka_t *rk, rd_kafka_resp_err_t err);


/*******************************************************************
* Partitioners provided by rdkafka *
* *
* Main configuration property interface *
* *
*******************************************************************/

/**
* Random partitioner.
* This is the default partitioner.
*
* Returns a random partition between 0 and 'partition_cnt'-1.
*
* Configuration result type
*/
int32_t rd_kafka_msg_partitioner_random (const void *key,
size_t keylen,
int32_t partition_cnt,
void *opaque, void *msg_opaque);

typedef enum {
RD_KAFKA_CONF_UNKNOWN = -2, /* Unknown configuration name. */
RD_KAFKA_CONF_INVALID = -1, /* Invalid configuration value. */
RD_KAFKA_CONF_OK = 0, /* Configuration okay */
} rd_kafka_conf_res_t;


/**
* The default configuration.
* Create configuration object.
* When providing your own configuration to the rd_kafka_*_new_*() calls
* the rd_kafka_conf_t objects needs to be created with this function
* which will set up the defaults.
* I.e.:
*
* rd_kafka_conf_t *myconf;
* rd_kafka_conf_res_t res;
*
* myconf = rd_kafka_conf_new();
* rd_kafka_conf_set(myconf, "socket.timeout.ms", "600");
* res = rd_kafka_conf_set(myconf, "socket.timeout.ms", "600",
* errstr, sizeof(errstr));
* if (res != RD_KAFKA_CONF_OK)
* die("%s\n", errstr);
*
* rk = rd_kafka_new_consumer(, ... myconf);
* rk = rd_kafka_new(..., myconf);
*
* Please see CONFIGURATION.md for the default settings or use
* `rd_kafka_conf_properties_show()` to provide the information at runtime.
*
* Please see rdkafka_defaultconf.c for the default settings.
* The properties are identical to the Apache Kafka configuration properties
* whenever possible.
*/
rd_kafka_conf_t *rd_kafka_conf_new (void);

typedef enum {
RD_KAFKA_CONF_UNKNOWN = -2, /* Unknown configuration name. */
RD_KAFKA_CONF_INVALID = -1, /* Invalid configuration value. */
RD_KAFKA_CONF_OK = 0, /* Configuration okay */
} rd_kafka_conf_res_t;


/**
* Sets a single rd_kafka_conf_t value by property name.
* 'conf' must have been previously set up with rd_kafka_conf_new().
* Sets a configuration property.
* 'conf' must have been previously created with rd_kafka_conf_new().
*
* Returns rd_kafka_conf_res_t to indicate success or failure.
* In case of failure 'errstr' is updated to contain a human readable
Expand Down Expand Up @@ -191,29 +202,25 @@ void rd_kafka_conf_set_error_cb (rd_kafka_conf_t *conf,


/**
* Sets the application's opaque pointer that will be passed to all callbacks
* as the 'opaque' argument.
*
* FIXME: exception consume_callback
* Sets the application's opaque pointer that will be passed to `dr_cb`
* and `error_cb_` callbacks as the 'opaque' argument.
*/
void rd_kafka_conf_set_opaque (rd_kafka_conf_t *conf, void *opaque);



/**
* Consumer:
* Set consume callback in provided conf object.
* Prints a table to 'fp' of all supported configuration properties,
* their default values as well as a description.
*/
void rd_kafka_conf_set_consume_cb (rd_kafka_conf_t *conf,
void (*consume_cb) (rd_kafka_t *rk,
rd_kafka_topic_t *rkt,
int32_t partition,
void *payload,
size_t len,
void *key,
size_t key_len,
int64_t offset,
void *opaque));
void rd_kafka_conf_properties_show (FILE *fp);



/*******************************************************************
* *
* Topic configuration property interface *
* *
*******************************************************************/

/**
* Create topic configuration object
Expand All @@ -225,7 +232,7 @@ rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void);
/**
* Sets a single rd_kafka_topic_conf_t value by property name.
* 'topic_conf' should have been previously set up
* with rd_kafka_topic_conf_new().
* with `rd_kafka_topic_conf_new()`.
*
* Returns rd_kafka_conf_res_t to indicate success or failure.
*/
Expand All @@ -234,10 +241,65 @@ rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf,
const char *value,
char *errstr, size_t errstr_size);

/**
* Sets the application's opaque pointer that will be passed to all topic
* callbacks as the 'rkt_opaque' argument.
*/
void rd_kafka_topic_conf_set_opaque (rd_kafka_topic_conf_t *conf, void *opaque);


/**
* Producer:
* Set partitioner callback in provided topic conf object.
*
* The partitioner may be called in any thread at any time,
* it may be called multiple times for the same message/key.
*
* Partitioner function constraints:
* - MUST NOT call any rd_kafka_*() functions except:
* rd_kafka_topic_partition_available()
* - MUST NOT block or execute for prolonged periods of time.
* - MUST return a value between 0 and partition_cnt-1, or the
* special RD_KAFKA_PARTITION_UA value if partitioning
* could not be performed.
*/
void
rd_kafka_topic_conf_set_partitioner_cb (rd_kafka_topic_conf_t *topic_conf,
int32_t (*partitioner) (
const rd_kafka_topic_t *rkt,
const void *keydata,
size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque));

/**
* Check if partition is available (has a leader broker).
*
* Returns 1 if the partition is available, else 0.
*
* NOTE: This function must only be called from inside a partitioner function.
*/
int rd_kafka_topic_partition_available (const rd_kafka_topic_t *rkt,
int32_t partition);


/*******************************************************************
* *
* Partitioners provided by rdkafka *
* *
*******************************************************************/

/**
* Random partitioner.
* This is the default partitioner.
*
* Returns a random partition between 0 and 'partition_cnt'-1.
*/
int32_t rd_kafka_msg_partitioner_random (const rd_kafka_topic_t *rkt,
const void *key, size_t keylen,
int32_t partition_cnt,
void *opaque, void *msg_opaque);



Expand Down
30 changes: 15 additions & 15 deletions rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ static int rd_kafka_broker_resolve (rd_kafka_broker_t *rkb) {
const char *errstr;

if (rkb->rkb_rsal &&
rkb->rkb_t_rsal_last + rkb->rkb_rk->rk_conf.broker_addr_lifetime <
rkb->rkb_t_rsal_last + rkb->rkb_rk->rk_conf.broker_addr_ttl <
time(NULL)) {
/* Address list has expired. */
rd_sockaddr_list_destroy(rkb->rkb_rsal);
Expand Down Expand Up @@ -1038,7 +1038,7 @@ static int rd_kafka_req_response (rd_kafka_broker_t *rkb,
if (unlikely(!(req =
rd_kafka_waitresp_find(rkb,
rkbuf->rkbuf_reshdr.CorrId)))) {
/* FIXME: unknown response */
/* FIXME: unknown response. probably due to request timeout */
rd_rkb_dbg(rkb, BROKER, "RESPONSE",
"Unknown response for CorrId %"PRId32,
rkbuf->rkbuf_reshdr.CorrId);
Expand Down Expand Up @@ -1399,7 +1399,7 @@ static void rd_kafka_broker_buf_retry (rd_kafka_broker_t *rkb,
rkb->rkb_c.tx_retries++;

rkbuf->rkbuf_ts_retry = rd_clock() +
(rkb->rkb_rk->rk_conf.producer.retry_backoff_ms * 1000);
(rkb->rkb_rk->rk_conf.retry_backoff_ms * 1000);
/* Reset send offset */
rkbuf->rkbuf_of = 0;

Expand Down Expand Up @@ -1427,12 +1427,12 @@ static void rd_kafka_broker_retry_bufs_move (rd_kafka_broker_t *rkb) {


/**
* Propagate delivery report for message queue.
* Propagate delivery report for entire message queue.
*/
static void rd_kafka_dr_msgq (rd_kafka_t *rk,
rd_kafka_msgq_t *rkmq, rd_kafka_resp_err_t err) {
void rd_kafka_dr_msgq (rd_kafka_t *rk,
rd_kafka_msgq_t *rkmq, rd_kafka_resp_err_t err) {

if (rk->rk_conf.producer.dr_cb) {
if (rk->rk_conf.dr_cb) {
/* Pass all messages to application thread in one op. */
rd_kafka_op_t *rko;

Expand Down Expand Up @@ -1480,7 +1480,7 @@ static void rd_kafka_produce_msgset_reply (rd_kafka_broker_t *rkb,
case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT:
/* Try again */
if (++request->rkbuf_retries <
rkb->rkb_rk->rk_conf.producer.max_retries) {
rkb->rkb_rk->rk_conf.max_retries) {

if (reply)
rd_kafka_buf_destroy(reply);
Expand Down Expand Up @@ -1584,7 +1584,7 @@ static int rd_kafka_broker_produce_toppar (rd_kafka_broker_t *rkb,
if (rktp->rktp_xmit_msgq.rkmq_msg_cnt > 0)
assert(TAILQ_FIRST(&rktp->rktp_xmit_msgq.rkmq_msgs));
msgcnt = RD_MIN(rktp->rktp_xmit_msgq.rkmq_msg_cnt,
rkb->rkb_rk->rk_conf.producer.batch_num_messages);
rkb->rkb_rk->rk_conf.batch_num_messages);
assert(msgcnt > 0);
iovcnt = 3 + (4 * msgcnt);

Expand Down Expand Up @@ -1723,7 +1723,7 @@ static int rd_kafka_broker_produce_toppar (rd_kafka_broker_t *rkb,
}

/* Compress the messages */
if (rkb->rkb_rk->rk_conf.producer.compression_codec) {
if (rkb->rkb_rk->rk_conf.compression_codec) {
int siovlen = 1;
size_t coutlen;
int r;
Expand All @@ -1742,7 +1742,7 @@ static int rd_kafka_broker_produce_toppar (rd_kafka_broker_t *rkb,
z_stream strm;
int i;

switch (rkb->rkb_rk->rk_conf.producer.compression_codec) {
switch (rkb->rkb_rk->rk_conf.compression_codec) {
case RD_KAFKA_COMPRESSION_NONE:
abort(); /* unreachable */
break;
Expand Down Expand Up @@ -1884,7 +1884,7 @@ static int rd_kafka_broker_produce_toppar (rd_kafka_broker_t *rkb,
msghdr2->MessageSize = htonl(4+1+1+4+4 + coutlen);
msghdr2->MagicByte = 0;
msghdr2->Attributes = rkb->rkb_rk->rk_conf.
producer.compression_codec & 0x7;
compression_codec & 0x7;
msghdr2->Key_len = htonl(-1);
msghdr2->Value_len = htonl(coutlen);
msghdr2->Crc = rd_crc32_init();
Expand Down Expand Up @@ -1993,7 +1993,7 @@ static void rd_kafka_broker_io_serve (rd_kafka_broker_t *rkb) {
rkb->rkb_pfd.events &= ~POLLOUT;

if (poll(&rkb->rkb_pfd, 1,
rkb->rkb_rk->rk_conf.producer.buffering_max_ms) <= 0)
rkb->rkb_rk->rk_conf.buffering_max_ms) <= 0)
return;

if (rkb->rkb_pfd.revents & POLLIN)
Expand Down Expand Up @@ -2079,10 +2079,10 @@ static void rd_kafka_broker_producer_serve (rd_kafka_broker_t *rkb) {
* our waiting to queue.buffering.max.ms
* and batch.num.messages. */
if (rktp->rktp_ts_last_xmit +
(rkb->rkb_rk->rk_conf.producer.
(rkb->rkb_rk->rk_conf.
buffering_max_ms * 1000) > now &&
rktp->rktp_xmit_msgq.rkmq_msg_cnt <
rkb->rkb_rk->rk_conf.producer.
rkb->rkb_rk->rk_conf.
batch_num_messages) {
/* Wait for more messages */
continue;
Expand Down
Loading

0 comments on commit f262590

Please sign in to comment.