Skip to content

Commit

Permalink
Improvements and fixes for performance of PUSH/PULL in local_thr and …
Browse files Browse the repository at this point in the history
…remote_thr.

- option to set number of zmq threads on the command line for local_thr
- option to set number of zmq_threads and workers in remote_thr
- option to set SND/RCV buffer sizes on command line
- option to set whether to PUSH/PULL on command line
- option to set to use zmq_recv or zmq_msg for transfer on command line
- better timing function
- corrected and improved throughput reporting
- HWM and DELAY socket options
  • Loading branch information
ambitslix committed Feb 26, 2013
1 parent deb9773 commit a22714d
Show file tree
Hide file tree
Showing 3 changed files with 466 additions and 87 deletions.
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ AJ Lewis <[email protected]>
Alexej Lotz <[email protected]>
Andrew Thompson <[email protected]>
Asko Kauppi <[email protected]>
Attila Mark <[email protected]>
Barak Amar <[email protected]>
Ben Gray <[email protected]>
Bernd Prager <[email protected]>
Expand Down
253 changes: 211 additions & 42 deletions perf/local_thr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,115 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include "zmq.h"
#include "zmq_utils.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <limits.h>
#include <sys/time.h>

#define ZMSG 1
#define DATA 0

typedef struct US_TIMER US_TIMER;

struct US_TIMER{

struct timeval time_was;
struct timeval time_now;
};
/* Records the current timer state
*/
void tm_init( US_TIMER *t){

if( gettimeofday( &t->time_now, NULL) < 0){ perror( "d_timer_init()");}

t->time_was = t->time_now;

}

/* Returns the time passed in microsecond precision in seconds since last init
of timer.
*/
float tm_secs( US_TIMER *t){

register float seconds;

if( gettimeofday( &t->time_now, NULL) < 0){ perror( "d_timer_init()");}

seconds = ( ((float)( t->time_now.tv_sec - t->time_was.tv_sec)) +
(((float)( t->time_now.tv_usec - t->time_was.tv_usec)) / 1000000.0));

t->time_was = t->time_now;

return( seconds);
}

const char *bind_to;
int message_count = 1000;
int message_size = 1024;
int threads = 1;
int workers = 1;
int sndbuflen = 128*256;
int rcvbuflen = 128*256;
int flow = ZMQ_PULL;
int rec = DATA;

void my_free (void *data, void *hint)
{
// free (data);
}

int main (int argc, char *argv [])
{
if (argc != 4) {
printf ("usage: local_thr <bind-to> <message-size> <message-count>\n");
US_TIMER timer;
void *ctx;
void *s;
int rc;
int i;
void *buf = NULL;

if (argc != 9) {
printf ("usage: local_thr <bind-to> <message-size> <message-count> <SND buffer> <RCV buffer> <flow (PUSH/PULL)> <rec (ZMSG/DATA)> <zmq-threads>\n");
return 1;
}
const char *bind_to = argv [1];
size_t message_size = atoi (argv [2]);
int message_count = atoi (argv [3]);

void *ctx = zmq_init (1);
bind_to = argv [1];
message_size = atoi (argv [2]);
message_count = atoi (argv [3]);
sndbuflen = atoi (argv [4]);
rcvbuflen = atoi (argv [5]);
if( !strcmp( argv [6], "PUSH")){
flow = ZMQ_PUSH;
}
if( !strcmp( argv [6], "PULL")){
flow = ZMQ_PULL;
}
if( !strcmp( argv [7], "ZMSG")){
rec = ZMSG;
}
if( !strcmp( argv [7], "DATA")){
rec = DATA;
}
threads = atoi (argv [8]);

if( !(buf = malloc( message_size))){ perror("malloc"); return -1;}

ctx = zmq_ctx_new ();
if (!ctx) {
printf ("error in zmq_init: %s\n", zmq_strerror (errno));
printf ("error in zmq_ctx_new: %s\n", zmq_strerror (errno));
return -1;
}

rc = zmq_ctx_set ( ctx, ZMQ_IO_THREADS, threads);
if (rc) {
printf ("error in zmq_ctx_set: %s\n", zmq_strerror (errno));
return -1;
}

void *s = zmq_socket (ctx, ZMQ_PULL);
s = zmq_socket (ctx, flow);
if (!s) {
printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
return -1;
Expand All @@ -49,61 +136,141 @@ int main (int argc, char *argv [])
// Add your socket options here.
// For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.

int rc = zmq_bind (s, bind_to);
size_t rcvbuflenlen = (size_t)sizeof rcvbuflen;
size_t sndbuflenlen = (size_t)sizeof sndbuflen;

rc = zmq_setsockopt (s, ZMQ_RCVBUF, &rcvbuflen, rcvbuflenlen);
if (rc != 0) {
printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_setsockopt (s, ZMQ_SNDBUF, &sndbuflen, sndbuflenlen);
if (rc != 0) {
printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
return -1;
}

zmq_msg_t msg;
rc = zmq_msg_init (&msg);
sndbuflen = 1;
rc = zmq_setsockopt (s, ZMQ_DELAY_ATTACH_ON_CONNECT, &sndbuflen, sndbuflenlen);
if (rc != 0) {
printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
return -1;
}

rc = zmq_recvmsg (s, &msg, 0);
if (rc < 0) {
printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
sndbuflen = 2;
rc = zmq_setsockopt (s, ZMQ_SNDHWM, &sndbuflen, sndbuflenlen);
if (rc != 0) {
printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
return -1;
}
if (zmq_msg_size (&msg) != message_size) {
printf ("message of incorrect size received\n");

sndbuflen = 2;
rc = zmq_setsockopt (s, ZMQ_RCVHWM, &sndbuflen, sndbuflenlen);
if (rc != 0) {
printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
return -1;
}

void *watch = zmq_stopwatch_start ();

int i;
for (i = 0; i != message_count - 1; i++) {
rc = zmq_recvmsg (s, &msg, 0);
if (rc < 0) {
printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
rc = zmq_getsockopt (s, ZMQ_RCVBUF, &rcvbuflen, &rcvbuflenlen);
if (rc != 0) {
printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_getsockopt (s, ZMQ_SNDBUF, &sndbuflen, &sndbuflenlen);
if (rc != 0) {
printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
return -1;
}

printf("RCVBUF=%d KB SNDBUF=%d KB adjusted\n", rcvbuflen/1024, sndbuflen/1024);

printf("Threads: %d\n", zmq_ctx_get( ctx, ZMQ_IO_THREADS));

rc = zmq_bind (s, bind_to);
if (rc != 0) {
printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
return -1;
}

if( !(buf = malloc( message_size))){ perror("malloc"); return -1;}

printf("%sING %s...\n", flow == ZMQ_PUSH ? "PUSH":"PULL", rec ? "ZMQ_MSG":"DATA");

tm_init( &timer);

if( flow == ZMQ_PULL){

if( rec == ZMSG){

zmq_msg_t msg;

rc = zmq_msg_init (&msg);
if (rc != 0) {
printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
return -1;
}
if (zmq_msg_size (&msg) != message_size) {
printf ("message of incorrect size received\n");
return -1;
for (i = 0; i != message_count; i++) {
rc = zmq_msg_recv (&msg, s, 0);
if (rc < 0) {
printf ("error in zmq_recv: %s\n", zmq_strerror (errno));
return -1;
}
}
rc = zmq_msg_close (&msg);
if (rc != 0) {
printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
return -1;
}}

else{

for (i = 0; i != message_count; i++) {
rc = zmq_recv( s, buf, message_size, 0);
if (rc < 0) {
printf ("error in zmq_recv: %s\n", zmq_strerror (errno));
return -1;
}
}}
}
else{

unsigned long elapsed = zmq_stopwatch_stop (watch);
if (elapsed == 0)
elapsed = 1;
if( rec == ZMSG){

rc = zmq_msg_close (&msg);
if (rc != 0) {
printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
return -1;
zmq_msg_t msg;

for (i = 0; i != message_count; i++) {

rc = zmq_msg_init_data (&msg, buf, message_size, NULL, NULL);
if (rc != 0) {
printf ("error in zmq_msg_init_data: %s\n", zmq_strerror (errno));
return -1;
}

rc = zmq_msg_send( &msg, s, 0);
if (rc < 0) {
printf ("error in zmq_send: %s\n", zmq_strerror (errno));
return -1;
}
}}


else{
for (i = 0; i != message_count; i++){
rc = zmq_send( s, buf, message_size, 0);
if (rc < 0) {
printf ("error in zmq_send: %s\n", zmq_strerror (errno));
return -1;
}
}}
}

double throughput = ((double) message_count / (double) elapsed) * 1000000;
double megabits = (double) (throughput * message_size * 8) / 1000000;
float secs = tm_secs( &timer);
float total = (((float) message_count) * ((float) message_size)) / (1024.0*1024.0*1024.0);

printf ("Message size: %d KBytes, time: %f secs\n", (int) message_size/1024, secs);
printf ("%sed %.3f GB @ %.3f GB/s\n", (flow == ZMQ_PULL) ? "Pull":"Push", total, total/secs);

printf ("message size: %d [B]\n", (int) message_size);
printf ("message count: %d\n", (int) message_count);
printf ("mean throughput: %d [msg/s]\n", (int) throughput);
printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);

rc = zmq_close (s);
if (rc != 0) {
Expand All @@ -117,5 +284,7 @@ int main (int argc, char *argv [])
return -1;
}

if( buf) free( buf);

return 0;
}
Loading

0 comments on commit a22714d

Please sign in to comment.