Skip to content

Commit

Permalink
esnet#382 resolution with changes info
Browse files Browse the repository at this point in the history
  • Loading branch information
davidBar-On committed Nov 11, 2020
1 parent ead2f1a commit 0651353
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 26 deletions.
1 change: 1 addition & 0 deletions src/iperf.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ struct iperf_settings
iperf_size_t blocks; /* number of blocks (packets) to send */
char unit_format; /* -f */
int num_ostreams; /* SCTP initmsg settings */
int wait_all_received; /* maximum time in seconds to wait for all data to be receieved */
#if defined(HAVE_SSL)
char *authtoken; /* Authentication token */
char *client_username;
Expand Down
23 changes: 23 additions & 0 deletions src/iperf_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,7 @@ iperf_parse_arguments(struct iperf_test *test, int argc, char **argv)
{"file", required_argument, NULL, 'F'},
{"repeating-payload", no_argument, NULL, OPT_REPEATING_PAYLOAD},
{"timestamps", optional_argument, NULL, OPT_TIMESTAMPS},
{"wait-all-received", required_argument, NULL, OPT_WAIT_ALL_RECEIVED},
#if defined(HAVE_CPU_AFFINITY)
{"affinity", required_argument, NULL, 'A'},
#endif /* HAVE_CPU_AFFINITY */
Expand Down Expand Up @@ -1236,6 +1237,17 @@ iperf_parse_arguments(struct iperf_test *test, int argc, char **argv)
iperf_set_test_timestamp_format(test, TIMESTAMP_FORMAT);
}
break;
/* >>>>>> #382 ADD */
case OPT_WAIT_ALL_RECEIVED:
test->settings->wait_all_received = atoi(optarg);
if (test->settings->wait_all_received < 0) {
i_errno = IEWAITRECEIVED;
return -1;
}
// Force readable payload to allow detecting last packet indication
client_flag = 1;
break;
/* <<<<<<< #382 ADD */
case 'O':
test->omit = atoi(optarg);
if (test->omit < 0 || test->omit > 60) {
Expand Down Expand Up @@ -1439,6 +1451,12 @@ iperf_parse_arguments(struct iperf_test *test, int argc, char **argv)
if ((test->settings->bytes != 0 || test->settings->blocks != 0) && ! duration_flag)
test->duration = 0;

/* >>>>>> #382 ADD */
// If not-udp, force readable payload to allow detecting last packet indication
if (test->protocol->id != Pudp && test->settings->wait_all_received > 0)
test->repeating_payload = 1;
/* <<<<<<< #382 ADD */

/* Disallow specifying multiple test end conditions. The code actually
** works just fine without this prohibition. As soon as any one of the
** three possible end conditions is met, the test ends. So this check
Expand Down Expand Up @@ -1892,6 +1910,7 @@ send_parameters(struct iperf_test *test)
cJSON_AddNumberToObject(j, "udp_counters_64bit", iperf_get_test_udp_counters_64bit(test));
if (test->repeating_payload)
cJSON_AddNumberToObject(j, "repeating_payload", test->repeating_payload);
cJSON_AddNumberToObject(j, "wait_all_received", test->settings->wait_all_received); /* >>>> #382 <<<< */
#if defined(HAVE_SSL)
/* Send authentication parameters */
if (test->settings->client_username && test->settings->client_password && test->settings->client_rsa_pubkey){
Expand Down Expand Up @@ -2000,6 +2019,8 @@ get_parameters(struct iperf_test *test)
iperf_set_test_udp_counters_64bit(test, 1);
if ((j_p = cJSON_GetObjectItem(j, "repeating_payload")) != NULL)
test->repeating_payload = 1;
if ((j_p = cJSON_GetObjectItem(j, "wait_all_received")) != NULL) /* >>>> #382 <<<< */
test->settings->wait_all_received = j_p->valueint;
#if defined(HAVE_SSL)
if ((j_p = cJSON_GetObjectItem(j, "authtoken")) != NULL)
test->settings->authtoken = strdup(j_p->valuestring);
Expand Down Expand Up @@ -2510,6 +2531,7 @@ iperf_defaults(struct iperf_test *testp)
testp->settings->bytes = 0;
testp->settings->blocks = 0;
testp->settings->connect_timeout = -1;
testp->settings->wait_all_received = 0; // Default must be zero to allow randomized payload // >>> #382 <<<
memset(testp->cookie, 0, COOKIE_SIZE);

testp->multisend = 10; /* arbitrary */
Expand Down Expand Up @@ -3921,6 +3943,7 @@ iperf_new_stream(struct iperf_test *test, int s, int sender)
fill_with_repeating_pattern(sp->buffer, test->settings->blksize);
else
ret = readentropy(sp->buffer, test->settings->blksize);
*sp->buffer = NOT_LAST_PACKET; /* Changed when last packet is sent */ /* >>>> #382 <<<<< */

if ((ret < 0) || (iperf_init_stream(sp, test) < 0)) {
close(sp->buffer_fd);
Expand Down
10 changes: 10 additions & 0 deletions src/iperf_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ typedef uint64_t iperf_size_t;
#define OPT_BIDIRECTIONAL 20
#define OPT_SERVER_BITRATE_LIMIT 21
#define OPT_TIMESTAMPS 22
#define OPT_WAIT_ALL_RECEIVED 23

/* states */
#define TEST_START 1
Expand All @@ -97,9 +98,17 @@ typedef uint64_t iperf_size_t;
#define DISPLAY_RESULTS 14
#define IPERF_START 15
#define IPERF_DONE 16
#define TEST_WAIT_DATA_RECEIVED 17
#define ACCESS_DENIED (-1)
#define SERVER_ERROR (-2)

/* >>>>> #382 ADD */
/* Special Packet Numbers and end of test constants */
#define TEST_END_PACKET_NUMBER 0
#define NOT_LAST_PACKET '0'
#define LAST_PACKET '\0'
/* <<<<< #382 ADD */

/* Getter routines for some fields inside iperf_test. */
int iperf_get_verbose( struct iperf_test* ipt );
int iperf_get_control_socket( struct iperf_test* ipt );
Expand Down Expand Up @@ -361,6 +370,7 @@ enum {
IEBADPORT = 26, // Bad port number
IETOTALRATE = 27, // Total required bandwidth is larger than server's limit
IETOTALINTERVAL = 28, // Invalid time interval for calculating average data rate
IEWAITRECEIVED = 29, // Bogus value for --wait-all_received
/* Test errors */
IENEWTEST = 100, // Unable to create a new test (check perror)
IEINITTEST = 101, // Test initialization failed (check perror)
Expand Down
75 changes: 63 additions & 12 deletions src/iperf_client_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,15 @@ iperf_handle_message_client(struct iperf_test *test)
test->on_test_finish(test);
iperf_client_end(test);
break;
/* >>>>>>>> #382 ADD */
case TEST_END:
// Receiver Server indicate that last packet was received
test->state = TEST_END;
if (test->debug) {
iperf_printf(test,"iperf_handle_message_client: Server indicatated that last packet was received\n");
}
break;
/* <<<<<<<< #382 ADD */
case IPERF_DONE:
break;
case SERVER_TERMINATE:
Expand Down Expand Up @@ -561,19 +570,61 @@ iperf_run_client(struct iperf_test * test)
(test->settings->blocks != 0 && (test->blocks_sent >= test->settings->blocks ||
test->blocks_received >= test->settings->blocks)))) {

// Unset non-blocking for non-UDP tests
if (test->protocol->id != Pudp) {
SLIST_FOREACH(sp, &test->streams, streams) {
setnonblocking(sp->socket, 0);
}
}
// Unset non-blocking for non-UDP tests
if (test->protocol->id != Pudp) {
SLIST_FOREACH(sp, &test->streams, streams) {
setnonblocking(sp->socket, 0);
}
}

/* Yes, done! Send TEST_END. */
test->done = 1;
cpu_util(test->cpu_util);
test->stats_callback(test);
if (iperf_set_send_state(test, TEST_END) != 0)
goto cleanup_and_fail;
/* >>>>>>> #382 ADD ***************************/
if (test->mode == SENDER && !test->zerocopy && test->settings->wait_all_received > 0) {
// If client is sender and not sending from a file -
// if required, send packt to each stream indicating end of data.
test->state = TEST_WAIT_DATA_RECEIVED;
if (test->debug) {
iperf_printf(test,"iperf_run_client: before sending last packet, test->state=%d\n", test->state);
}
// Send last packet - retry few times if fails (best effort)
register struct iperf_stream *sp;
int i;
SLIST_FOREACH(sp, &test->streams, streams) {
// If UDP send last packet few times for redundancy, otherwise send once
if (test->protocol->id == Pudp) {
iperf_printf(test,"iperf_run_client: sending end packets\n");
for (i = 0; i < 3; sp->snd(sp), i++);
} else {
iperf_printf(test,"iperf_run_client: sending tcp end packets\n");
for (i = 0; i < 3 && sp->snd(sp) < 0; i++);
}
}
// Wait until the server received the last packet (or timeout if takes too long)
for (i = 0; test->state != TEST_END && i < test->settings->wait_all_received; i++) {
if (test->debug) {
iperf_printf(test,"iperf_run_client: In loop of waiting for Server indicatation that last packet was received\n");
}
FD_ZERO(&read_set);
FD_SET(test->ctrl_sck, &read_set);
iperf_time_now(&now);
timeout = tmr_timeout(&now);
result = select(test->max_fd + 1, &read_set, NULL, NULL, timeout);
if (result > 0) {
iperf_handle_message_client(test);
if (test->state == TEST_END)
break;
FD_CLR(test->ctrl_sck, &read_set);
}
sleep(1);
}
}
/* <<<<<<<< #382 ADD ***************************/

/* Yes, done! Send TEST_END. */
test->done = 1;
cpu_util(test->cpu_util);
test->stats_callback(test);
if (iperf_set_send_state(test, TEST_END) != 0)
goto cleanup_and_fail;
}
}
// If we're in reverse mode, continue draining the data
Expand Down
2 changes: 2 additions & 0 deletions src/iperf_locale.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ const char usage_longstr[] = "Usage: iperf3 [-s|-c host] [options]\n"
" --udp-counters-64bit use 64-bit counters in UDP test packets\n"
" --repeating-payload use repeating pattern in payload, instead of\n"
" randomized payload (like in iperf2)\n"
" --wait-all-received # maximum time in seconds to wait for receiver to receive all data\n"
" (default 0 - no wait); implies `--repeating-payload` for non-udp\n"
#if defined(HAVE_SSL)
" --username username for authentication\n"
" --rsa-public-key-path path to the RSA public key used to encrypt\n"
Expand Down
40 changes: 36 additions & 4 deletions src/iperf_sctp.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,27 @@ iperf_sctp_recv(struct iperf_stream *sp)

/* Only count bytes received while we're in the correct state. */
if (sp->test->state == TEST_RUNNING) {
sp->result->bytes_received += r;
sp->result->bytes_received_this_interval += r;
/* >>>>> #382 ADD */
// If sender waits for receiver to receive last packet -
// check whether test-end packet received and change state accordingly
int i = r;
if (sp->test->settings->wait_all_received > 0) {
for (i = 0; i < r; i++) {
if (sp->buffer[i] == LAST_PACKET)
break;
}
}
if (i < r) { // Last packet identified
if (sp->test->debug) {
iperf_printf(sp->test, "iperf_sctp_recv: Received last packet\n");
}
if (iperf_set_send_state(sp->test, TEST_END) != 0)
return -1;
} else {
/* <<<<<<<< #382 ADD */
sp->result->bytes_received += r;
sp->result->bytes_received_this_interval += r;
}
}
else {
if (sp->test->debug)
Expand All @@ -92,12 +111,25 @@ iperf_sctp_send(struct iperf_stream *sp)
#if defined(HAVE_SCTP_H)
int r;

/* >>>>> #382 ADD */
// When test ended indicate that to the receiver
if (sp->test->state == TEST_WAIT_DATA_RECEIVED) {
*sp->buffer = LAST_PACKET;
if (test->debug) {
iperf_printf(test,"iperf_sctp_send: Sending last packet\n");
}
}
/* <<<<<<< #382 ADD */

r = Nwrite(sp->socket, sp->buffer, sp->settings->blksize, Psctp);
if (r < 0)
return r;

sp->result->bytes_sent += r;
sp->result->bytes_sent_this_interval += r;
/* >>>>> #382 ADD next if <<<< */
if (sp->test->state != TEST_WAIT_DATA_RECEIVED) {
sp->result->bytes_sent += r;
sp->result->bytes_sent_this_interval += r;
}

return r;
#else
Expand Down
42 changes: 37 additions & 5 deletions src/iperf_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,27 @@ iperf_tcp_recv(struct iperf_stream *sp)

/* Only count bytes received while we're in the correct state. */
if (sp->test->state == TEST_RUNNING) {
sp->result->bytes_received += r;
sp->result->bytes_received_this_interval += r;
/* >>>>> #382 ADD */
// If sender waits for receiver to receive last packet -
// check whether test-end packet received and change state accordingly
int i = r;
if (sp->test->settings->wait_all_received > 0) {
for (i = 0; i < r; i++) {
if (sp->buffer[i] == LAST_PACKET)
break;
}
}
if (i < r) { // Last packet identified
if (sp->test->debug) {
iperf_printf(sp->test, "iperf_tcp_recv: Received last packet\n");
}
if (iperf_set_send_state(sp->test, TEST_END) != 0)
return -1;
} else {
/* <<<<<<< #382 ADD */
sp->result->bytes_received += r;
sp->result->bytes_received_this_interval += r;
}
}
else {
if (sp->test->debug)
Expand All @@ -87,14 +106,27 @@ iperf_tcp_send(struct iperf_stream *sp)

if (sp->test->zerocopy)
r = Nsendfile(sp->buffer_fd, sp->socket, sp->buffer, sp->settings->blksize);
else
else {
/* >>>>> #382 ADD */
// When test ended indicate that to the receiver (if not sending a file)
if (sp->test->state == TEST_WAIT_DATA_RECEIVED) {
*sp->buffer = LAST_PACKET;
if (sp->test->debug) {
iperf_printf(sp->test,"iperf_tcp_send: Sending last packet\n");
}
}
/* <<<<<<< #382 ADD */
r = Nwrite(sp->socket, sp->buffer, sp->settings->blksize, Ptcp);
}

if (r < 0)
return r;

sp->result->bytes_sent += r;
sp->result->bytes_sent_this_interval += r;
/* >>>>> #382 ADD next if <<<< */
if (sp->test->state != TEST_WAIT_DATA_RECEIVED) {
sp->result->bytes_sent += r;
sp->result->bytes_sent_this_interval += r;
}

if (sp->test->debug)
printf("sent %d bytes of %d, total %" PRIu64 "\n", r, sp->settings->blksize, sp->result->bytes_sent);
Expand Down
34 changes: 29 additions & 5 deletions src/iperf_udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,17 @@ iperf_udp_recv(struct iperf_stream *sp)
* far (so we're expecting to see the packet with sequence number
* sp->packet_count + 1 arrive next).
*/
if (pcount >= sp->packet_count + 1) {

/* >>>>> #382 ADD and following `else` for `if` */
// If test-end packet received change state accordingly
if (pcount == TEST_END_PACKET_NUMBER) {
if (sp->test->debug) {
iperf_printf(sp->test,"iperf_udp_recv: Received last packet\n");
}
if (iperf_set_send_state(sp->test, TEST_END) != 0)
return -1;
/* <<<<<<<<< #382 ADD and following `else` for `if` */
} else if (pcount >= sp->packet_count + 1) {

/* Forward, but is there a gap in sequence numbers? */
if (pcount > sp->packet_count + 1) {
Expand Down Expand Up @@ -240,8 +250,19 @@ iperf_udp_send(struct iperf_stream *sp)

sec = htonl(before.secs);
usec = htonl(before.usecs);
pcount = htonl(sp->packet_count);

/* >>>>> #382 REPLACE next */
// pcount = htonl(sp->packet_count);
// When test ended indicate that to the receiver
if (sp->test->state == TEST_WAIT_DATA_RECEIVED) {
pcount = TEST_END_PACKET_NUMBER;
if (sp->test->debug) {
iperf_printf(sp->test,"iperf_udp_send: Sending last packet\n");
}
} else {
pcount = htonl(sp->packet_count);
}
/* <<<<<<< #382 REPLACE next */

memcpy(sp->buffer, &sec, sizeof(sec));
memcpy(sp->buffer+4, &usec, sizeof(usec));
memcpy(sp->buffer+8, &pcount, sizeof(pcount));
Expand All @@ -253,8 +274,11 @@ iperf_udp_send(struct iperf_stream *sp)
if (r < 0)
return r;

sp->result->bytes_sent += r;
sp->result->bytes_sent_this_interval += r;
/* >>>>> #382 ADD next if <<<< */
if (sp->test->state != TEST_WAIT_DATA_RECEIVED) {
sp->result->bytes_sent += r;
sp->result->bytes_sent_this_interval += r;
}

if (sp->test->debug)
printf("sent %d bytes of %d, total %" PRIu64 "\n", r, sp->settings->blksize, sp->result->bytes_sent);
Expand Down

0 comments on commit 0651353

Please sign in to comment.