Skip to content

Commit

Permalink
Fix bugs in plasma manager transfer (ray-project#1188)
Browse files Browse the repository at this point in the history
* Plasma client test for plasma abort

* Use ray-project/arrow:abort-objects branch

* Set plasma manager connection cursor to -1 when not in use

* Handle transfer errors between plasma managers, abort unsealed objects

* Add TODO for local scheduler exiting on plasma manager death

* Revert "Plasma client test for plasma abort"

This reverts commit e00fbd58dc4a632f58383549b19fb9057b305a14.

* Upgrade arrow to version with PlasmaClient::Abort

* Fix plasma manager test

* Fix plasma test

* Temporarily use arrow fork for testing

* fix and set arrow commit

* Fix plasma test

* Fix plasma manager test and make write_object_chunk consistent with read_object_chunk

* style

* upgrade arrow
  • Loading branch information
stephanie-wang authored and pcmoritz committed Nov 16, 2017
1 parent 9a7b154 commit c70430f
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 95 deletions.
3 changes: 3 additions & 0 deletions src/local_scheduler/local_scheduler_algorithm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,9 @@ void fetch_missing_dependency(LocalSchedulerState *state,
auto arrow_status = state->plasma_conn->Fetch(1, &obj_id);
if (!arrow_status.ok()) {
LocalSchedulerState_free(state);
/* TODO(swang): Local scheduler should also exit even if there are no
* pending fetches. This could be done by subscribing to the db_client
* table, or pinging the plasma manager in the heartbeat handler. */
LOG_FATAL(
"Lost connection to the plasma manager, local scheduler is "
"exiting. Error: %s",
Expand Down
197 changes: 110 additions & 87 deletions src/plasma/plasma_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,18 @@ ClientConnection *ClientConnection_init(PlasmaManagerState *state,
*/
void ClientConnection_free(ClientConnection *client_conn);

void ClientConnection_start_request(ClientConnection *client_conn) {
client_conn->cursor = 0;
}

void ClientConnection_finish_request(ClientConnection *client_conn) {
client_conn->cursor = -1;
}

bool ClientConnection_request_finished(ClientConnection *client_conn) {
return client_conn->cursor == -1;
}

std::unordered_map<ObjectID, std::vector<WaitRequest *>, UniqueIDHasher> &
object_wait_requests_from_type(PlasmaManagerState *manager_state, int type) {
/* We use different types of hash tables for different requests. */
Expand Down Expand Up @@ -540,27 +552,21 @@ int write_object_chunk(ClientConnection *conn, PlasmaRequestBuffer *buf) {
s = RayConfig::instance().buf_size();
r = write(conn->fd, buf->data + conn->cursor, s);

if (r != s) {
LOG_ERROR("write failed, errno was %d", errno);
if (r > 0) {
LOG_ERROR("partial write on fd %d", conn->fd);
} else {
return errno;
}
int err;
if (r <= 0) {
LOG_ERROR("Write error");
err = errno;
} else {
conn->cursor += r;
CHECK(conn->cursor <= buf->data_size + buf->metadata_size);
/* If we've finished writing this buffer, reset the cursor. */
if (conn->cursor == buf->data_size + buf->metadata_size) {
LOG_DEBUG("writing on channel %d finished", conn->fd);
ClientConnection_finish_request(conn);
}
err = 0;
}
if (r == 0) {
/* If we've finished writing this buffer, reset the cursor to zero. */
LOG_DEBUG("writing on channel %d finished", conn->fd);
conn->cursor = 0;
/* We are done sending the object, so release it. The corresponding call to
* plasma_get occurred in process_transfer_request. */
ARROW_CHECK_OK(conn->manager_state->plasma_conn->Release(
buf->object_id.to_plasma_id()));
}

return 0;
return err;
}

void send_queued_request(event_loop *loop,
Expand Down Expand Up @@ -589,13 +595,14 @@ void send_queued_request(event_loop *loop,
break;
case MessageType_PlasmaDataReply:
LOG_DEBUG("Transferring object to manager");
if (conn->cursor == 0) {
/* If the cursor is zero, we haven't sent any requests for this object
if (ClientConnection_request_finished(conn)) {
/* If the cursor is not set, we haven't sent any requests for this object
* yet, so send the initial data request. */
err = handle_sigpipe(
plasma::SendDataReply(conn->fd, buf->object_id.to_plasma_id(),
buf->data_size, buf->metadata_size),
conn->fd);
ClientConnection_start_request(conn);
}
if (err == 0) {
err = write_object_chunk(conn, buf);
Expand All @@ -605,24 +612,25 @@ void send_queued_request(event_loop *loop,
LOG_FATAL("Buffered request has unknown type.");
}

/* If there was a SIGPIPE, stop sending to this manager. */
/* If the other side hung up, stop sending to this manager. */
if (err != 0) {
/* If there was an ECONNRESET, this means that we haven't finished
* connecting to this manager yet. Resend the request when the socket is
* ready for a write again. */
if (err == ECONNRESET) {
return;
if (buf->type == MessageType_PlasmaDataReply) {
/* We errored while sending the object, so release it before removing the
* connection. The corresponding call to plasma_get occurred in
* process_transfer_request. */
ARROW_CHECK_OK(conn->manager_state->plasma_conn->Release(
buf->object_id.to_plasma_id()));
}
event_loop_remove_file(loop, conn->fd);
ClientConnection_free(conn);
return;
}

/* If we are done sending this request, remove it from the transfer queue. */
if (conn->cursor == 0) {
} else if (ClientConnection_request_finished(conn)) {
/* If we are done with this request, remove it from the transfer queue. */
if (buf->type == MessageType_PlasmaDataReply) {
/* If we just finished sending an object to a remote manager, then remove
* the object from the hash table of pending transfer requests. */
/* We are done sending the object, so release it. The corresponding call
* to plasma_get occurred in process_transfer_request. */
ARROW_CHECK_OK(conn->manager_state->plasma_conn->Release(
buf->object_id.to_plasma_id()));
/* Remove the object from the hash table of pending transfer requests. */
conn->pending_object_transfers.erase(buf->object_id);
}
conn->transfer_queue.pop_front();
Expand All @@ -642,21 +650,21 @@ int read_object_chunk(ClientConnection *conn, PlasmaRequestBuffer *buf) {
}
r = read(conn->fd, buf->data + conn->cursor, s);

if (r == -1) {
LOG_ERROR("read error");
} else if (r == 0) {
LOG_DEBUG("end of file");
int err;
if (r <= 0) {
LOG_ERROR("Read error");
err = errno;
} else {
conn->cursor += r;
CHECK(conn->cursor <= buf->data_size + buf->metadata_size);
/* If the cursor is equal to the full object size, reset the cursor and
* we're done. */
if (conn->cursor == buf->data_size + buf->metadata_size) {
ClientConnection_finish_request(conn);
}
err = 0;
}
/* If the cursor is equal to the full object size, reset the cursor and we're
* done. */
if (conn->cursor == buf->data_size + buf->metadata_size) {
conn->cursor = 0;
return 1;
} else {
return 0;
}
return err;
}

void process_data_chunk(event_loop *loop,
Expand All @@ -666,27 +674,37 @@ void process_data_chunk(event_loop *loop,
/* Read the object chunk. */
ClientConnection *conn = (ClientConnection *) context;
PlasmaRequestBuffer *buf = conn->transfer_queue.front();
int done = read_object_chunk(conn, buf);
if (!done) {
return;
int err = read_object_chunk(conn, buf);
auto plasma_conn = conn->manager_state->plasma_conn;
if (err != 0) {
/* Abort the object that we were trying to read from the remote plasma
* manager. */
ARROW_CHECK_OK(plasma_conn->Release(buf->object_id.to_plasma_id()));
ARROW_CHECK_OK(plasma_conn->Abort(buf->object_id.to_plasma_id()));
/* Remove the bad connection. */
event_loop_remove_file(loop, data_sock);
ClientConnection_free(conn);
} else if (ClientConnection_request_finished(conn)) {
/* If we're done receiving the object, seal the object and release it. The
* release corresponds to the call to plasma_create that occurred in
* process_data_request. */
LOG_DEBUG("reading on channel %d finished", data_sock);
/* The following seal also triggers notification of clients for fetch or
* wait requests, see process_object_notification. */
ARROW_CHECK_OK(plasma_conn->Seal(buf->object_id.to_plasma_id()));
ARROW_CHECK_OK(plasma_conn->Release(buf->object_id.to_plasma_id()));
/* Remove the request buffer used for reading this object's data. */
conn->transfer_queue.pop_front();
delete buf;
/* Switch to listening for requests from this socket, instead of reading
* object data. */
event_loop_remove_file(loop, data_sock);
bool success = event_loop_add_file(loop, data_sock, EVENT_LOOP_READ,
process_message, conn);
if (!success) {
ClientConnection_free(conn);
}
}

/* Seal the object and release it. The release corresponds to the call to
* plasma_create that occurred in process_data_request. */
LOG_DEBUG("reading on channel %d finished", data_sock);
/* The following seal also triggers notification of clients for fetch or
* wait requests, see process_object_notification. */
ARROW_CHECK_OK(
conn->manager_state->plasma_conn->Seal(buf->object_id.to_plasma_id()));
ARROW_CHECK_OK(
conn->manager_state->plasma_conn->Release(buf->object_id.to_plasma_id()));
/* Remove the request buffer used for reading this object's data. */
conn->transfer_queue.pop_front();
delete buf;
/* Switch to listening for requests from this socket, instead of reading
* object data. */
event_loop_remove_file(loop, data_sock);
event_loop_add_file(loop, data_sock, EVENT_LOOP_READ, process_message, conn);
}

void ignore_data_chunk(event_loop *loop,
Expand All @@ -698,17 +716,22 @@ void ignore_data_chunk(event_loop *loop,
PlasmaRequestBuffer *buf = conn->ignore_buffer;

/* Just read the transferred data into ignore_buf and then drop (free) it. */
int done = read_object_chunk(conn, buf);
if (!done) {
return;
int err = read_object_chunk(conn, buf);
if (err != 0) {
event_loop_remove_file(loop, data_sock);
ClientConnection_free(conn);
} else if (ClientConnection_request_finished(conn)) {
free(buf->data);
delete buf;
/* Switch to listening for requests from this socket, instead of reading
* object data. */
event_loop_remove_file(loop, data_sock);
bool success = event_loop_add_file(loop, data_sock, EVENT_LOOP_READ,
process_message, conn);
if (!success) {
ClientConnection_free(conn);
}
}

free(buf->data);
delete buf;
/* Switch to listening for requests from this socket, instead of reading
* object data. */
event_loop_remove_file(loop, data_sock);
event_loop_add_file(loop, data_sock, EVENT_LOOP_READ, process_message, conn);
}

ClientConnection *get_manager_connection(PlasmaManagerState *state,
Expand Down Expand Up @@ -829,29 +852,29 @@ void process_data_request(event_loop *loop,
* conn->transfer_queue. */
conn->transfer_queue.push_back(buf);
}
CHECK(conn->cursor == 0);
CHECK(ClientConnection_request_finished(conn));
ClientConnection_start_request(conn);

/* Switch to reading the data from this socket, instead of listening for
* other requests. */
event_loop_remove_file(loop, client_sock);
event_loop_file_handler data_chunk_handler;
if (s.ok()) {
bool success = event_loop_add_file(loop, client_sock, EVENT_LOOP_READ,
process_data_chunk, conn);
if (!success) {
ClientConnection_free(conn);
}
data_chunk_handler = process_data_chunk;
} else {
/* Since plasma_create() has failed, we ignore the data transfer. We will
* receive this transfer in g_ignore_buf and then drop it. Allocate memory
* for data and metadata, if needed. All memory associated with
* buf/g_ignore_buf will be freed in ignore_data_chunkc(). */
conn->ignore_buffer = buf;
buf->data = (uint8_t *) malloc(buf->data_size + buf->metadata_size);
bool success = event_loop_add_file(loop, client_sock, EVENT_LOOP_READ,
ignore_data_chunk, conn);
if (!success) {
ClientConnection_free(conn);
}
data_chunk_handler = ignore_data_chunk;
}

bool success = event_loop_add_file(loop, client_sock, EVENT_LOOP_READ,
data_chunk_handler, conn);
if (!success) {
ClientConnection_free(conn);
}
}

Expand Down Expand Up @@ -1328,7 +1351,7 @@ ClientConnection *ClientConnection_init(PlasmaManagerState *state,
/* Create a new data connection context per client. */
ClientConnection *conn = new ClientConnection();
conn->manager_state = state;
conn->cursor = 0;
ClientConnection_finish_request(conn);
conn->fd = client_sock;
conn->num_return_objects = 0;

Expand Down
36 changes: 31 additions & 5 deletions src/plasma/plasma_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,24 +203,50 @@ ClientConnection *get_manager_connection(PlasmaManagerState *state,
* Reads an object chunk sent by the given client into a buffer. This is the
* complement to write_object_chunk.
*
* @param conn The connection to the client who's sending the data.
* @param conn The connection to the client who's sending the data. The
* connection's cursor will be reset if this is the last read for the
* current object.
* @param buf The buffer to write the data into.
* @return An integer representing whether the client is done sending this
* object. 1 means that the client has sent all the data, 0 means there
* is more.
* @return The errno set, if the read wasn't successful.
*/
int read_object_chunk(ClientConnection *conn, PlasmaRequestBuffer *buf);

/**
* Writes an object chunk from a buffer to the given client. This is the
* complement to read_object_chunk.
*
* @param conn The connection to the client who's receiving the data.
* @param conn The connection to the client who's receiving the data. The
* connection's cursor will be reset if this is the last write for the
* current object.
* @param buf The buffer to read data from.
* @return The errno set, if the write wasn't successful.
*/
int write_object_chunk(ClientConnection *conn, PlasmaRequestBuffer *buf);

/**
* Start a new request on this connection.
*
* @param conn The connection on which the request is being sent.
* @return Void.
*/
void ClientConnection_start_request(ClientConnection *client_conn);

/**
* Finish the current request on this connection.
*
* @param conn The connection on which the request is being sent.
* @return Void.
*/
void ClientConnection_finish_request(ClientConnection *client_conn);

/**
* Check whether the current request on this connection is finished.
*
* @param conn The connection on which the request is being sent.
* @return Whether the request has finished.
*/
bool ClientConnection_request_finished(ClientConnection *client_conn);

/**
* Get the event loop of the given plasma manager state.
*
Expand Down
8 changes: 6 additions & 2 deletions src/plasma/test/manager_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,16 @@ TEST read_write_object_chunk_test(void) {
* - Read the object data on the local manager.
* - Check that the data matches.
*/
ClientConnection_start_request(remote_mock->write_conn);
write_object_chunk(remote_mock->write_conn, &remote_buf);
ASSERT(ClientConnection_request_finished(remote_mock->write_conn));
/* Wait until the data is ready to be read. */
wait_for_pollin(get_client_sock(remote_mock->read_conn));
/* Read the data. */
int done = read_object_chunk(remote_mock->read_conn, &local_buf);
ASSERT(done);
ClientConnection_start_request(remote_mock->read_conn);
int err = read_object_chunk(remote_mock->read_conn, &local_buf);
ASSERT_EQ(err, 0);
ASSERT(ClientConnection_request_finished(remote_mock->read_conn));
ASSERT_EQ(memcmp(remote_buf.data, local_buf.data, data_size), 0);
/* Clean up. */
free(local_buf.data);
Expand Down
2 changes: 1 addition & 1 deletion src/thirdparty/download_thirdparty.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ fi
cd $TP_DIR/arrow
git fetch origin master

git checkout 05788d035f4aa918d80c9db7a1bf74fe38309c60
git checkout 837150e245823c6f0cd9e16dba89b6d1a0396aa7

0 comments on commit c70430f

Please sign in to comment.