Skip to content

Commit

Permalink
handling partial/interrupted I/O (ray-project#69)
Browse files Browse the repository at this point in the history
* read_bytes: handle EINTR + easier flow

* addressing whitespace + style comments

* handling partial i/o: write_bytes; use [read|write]_bytes in plasma send/receive
  • Loading branch information
atumanov authored and robertnishihara committed Nov 30, 2016
1 parent a2692ea commit 1499834
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 28 deletions.
48 changes: 26 additions & 22 deletions src/common/io.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,24 +174,26 @@ int accept_client(int socket_fd) {
*/
int write_bytes(int fd, uint8_t *cursor, size_t length) {
ssize_t nbytes = 0;
while (length > 0) {
size_t bytesleft = length;
size_t offset = 0;
while (bytesleft > 0) {
/* While we haven't written the whole message, write to the file
* descriptor, advance the cursor, and decrease the amount left to write. */
nbytes = write(fd, cursor, length);
nbytes = write(fd, cursor + offset, bytesleft);
if (nbytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
continue;
}
/* TODO(swang): Return the error instead of exiting. */
/* Force an exit if there was any other type of error. */
CHECK(nbytes < 0);
}
if (nbytes == 0) {
return -1; /* Errno will be set. */
} else if (0 == nbytes) {
/* Encountered early EOF. */
return -1;
}
cursor += nbytes;
length -= nbytes;
CHECK(nbytes > 0);
bytesleft -= nbytes;
offset += nbytes;
}

return 0;
}

Expand Down Expand Up @@ -234,28 +236,30 @@ int write_message(int fd, int64_t type, int64_t length, uint8_t *bytes) {
* @param fd The file descriptor to read from. It can be non-blocking.
* @param cursor The cursor pointing to the beginning of the buffer.
* @param length The size of the byte sequence to read.
* @return int Whether there was an error while writing. 0 corresponds to
* @return int Whether there was an error while reading. 0 corresponds to
* success and -1 corresponds to an error (errno will be set).
*/
int read_bytes(int fd, uint8_t *cursor, size_t length) {
ssize_t nbytes = 0;
while (length > 0) {
/* While we haven't read the whole message, read from the file descriptor,
* advance the cursor, and decrease the amount left to read. */
nbytes = read(fd, cursor, length);
/* Termination condition: EOF or read 'length' bytes total. */
size_t bytesleft = length;
size_t offset = 0;
while (bytesleft > 0) {
nbytes = read(fd, cursor + offset, bytesleft);
if (nbytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
continue;
}
/* Force an exit if there was any other type of error. */
CHECK(nbytes < 0);
}
if (nbytes == 0) {
return -1;
} else if (0 == nbytes) {
/* Encountered early EOF. */
return -1;
}
cursor += nbytes;
length -= nbytes;
CHECK(nbytes > 0);
bytesleft -= nbytes;
offset += nbytes;
}

return 0;
}

Expand Down
2 changes: 2 additions & 0 deletions src/common/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,7 @@ int64_t read_buffer(int fd, int64_t *type, UT_array *buffer);
void write_log_message(int fd, char *message);
void write_formatted_log_message(int fd, const char *format, ...);
char *read_log_message(int fd);
int read_bytes(int fd, uint8_t *cursor, size_t length);
int write_bytes(int fd, uint8_t *cursor, size_t length);

#endif /* IO_H */
8 changes: 2 additions & 6 deletions src/plasma/plasma.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,11 @@ int64_t plasma_reply_size(int num_object_ids) {
int plasma_send_reply(int sock, plasma_reply *reply) {
DCHECK(reply);
int64_t reply_size = plasma_reply_size(reply->num_object_ids);
int n = write(sock, (uint8_t *) reply, reply_size);
return n == reply_size ? 0 : -1;
return write_bytes(sock, (uint8_t *) reply, reply_size);
}

int plasma_receive_reply(int sock, int64_t reply_size, plasma_reply *reply) {
int r = recv(sock, reply, reply_size, 0);
CHECKM(r != -1, "read error");
CHECKM(r != 0, "connection disconnected");
return r == reply_size ? 0 : -1;
return read_bytes(sock, (uint8_t *) reply, reply_size);
}

int plasma_send_request(int sock, int64_t type, plasma_request *request) {
Expand Down

0 comments on commit 1499834

Please sign in to comment.