Add an Append call to the GCS Log that checks for current length (ray-project#1788)

* TABLE_APPEND call

* Convert callbacks back to taking in a string...

* GCS returns flatbuffers, define Log class

* Cleanups

* Modify client table to use the Log interface

* Fix bug where we replied twice from redis

* Fixes

* lint

* Compile and test raylet TaskTable

* Modify GCS tables to handle unique_ptrs from nested flatbuffers

* Add raylet::TaskTable unit tests to replace ObjectTable ones

* Convert ObjectTable to a log

* Convert ObjectTable tests to the Log

* AppendAt Redis and gcs Log command

* unit test for AppendAt

* Add a Log for task reconstruction data

* Add check for unique entries in TABLE_APPEND

* Documentation
stephanie-wang authored and pcmoritz committed Mar 27, 2018
1 parent 8d52fe9 commit 925e392
Showing 9 changed files with 226 additions and 20 deletions.
71 changes: 61 additions & 10 deletions src/common/redis_module/ray_redis_module.cc
@@ -606,32 +606,83 @@ int TableAdd_RedisCommand(RedisModuleCtx *ctx,
}
}

/// Append an entry to the log stored at a key. Publishes a notification about
/// the update to all subscribers, if a pubsub channel is provided.
///
/// This is called from a client with the command:
///
/// RAY.TABLE_APPEND <table_prefix> <pubsub_channel> <id> <data>
/// <index (optional)>
///
/// \param table_prefix The prefix string for keys in this table.
/// \param pubsub_channel The pubsub channel name that notifications for
/// this key should be published to. When publishing to a specific
/// client, the channel name should be <pubsub_channel>:<client_id>.
/// \param id The ID of the key to append to.
/// \param data The data to append to the key.
/// \param index If this is set, then the data must be appended at this index.
/// If the log's current length does not equal the requested index, then
/// the append will fail and an error message will be returned as a string.
/// \return OK if the append succeeds, or an error message string if the append
/// fails.
int TableAppend_RedisCommand(RedisModuleCtx *ctx,
RedisModuleString **argv,
int argc) {
if (argc != 5) {
if (argc < 5 || argc > 6) {
return RedisModule_WrongArity(ctx);
}

RedisModuleString *prefix_str = argv[1];
RedisModuleString *pubsub_channel_str = argv[2];
RedisModuleString *id = argv[3];
RedisModuleString *data = argv[4];
RedisModuleString *index_str = nullptr;
if (argc == 6) {
index_str = argv[5];
}

// Set the keys in the table.
RedisModuleKey *key = OpenPrefixedKey(ctx, prefix_str, id,
REDISMODULE_READ | REDISMODULE_WRITE);
// Determine the index at which the data should be appended. If no index is
// requested, then it is the current length of the log.
size_t index = RedisModule_ValueLength(key);
RedisModule_ZsetAdd(key, index, data, NULL);
RedisModule_CloseKey(key);

// Publish a message on the requested pubsub channel if necessary.
TablePubsub pubsub_channel = ParseTablePubsub(pubsub_channel_str);
if (pubsub_channel != TablePubsub_NO_PUBLISH) {
// All other pubsub channels write the data back directly onto the channel.
return PublishTableAdd(ctx, pubsub_channel_str, id, data);
if (index_str != nullptr) {
// Parse the requested index.
long long requested_index;
RAY_CHECK(RedisModule_StringToLongLong(index_str, &requested_index) ==
REDISMODULE_OK);
RAY_CHECK(requested_index >= 0);
index = static_cast<size_t>(requested_index);
}
// Only perform the append if the requested index matches the current length
// of the log, or if no index was requested.
if (index == RedisModule_ValueLength(key)) {
// The requested index matches the current length of the log or no index
// was requested. Perform the append.
int flags = REDISMODULE_ZADD_NX;
RedisModule_ZsetAdd(key, index, data, &flags);
// Check that we actually added a new entry during the append. This is only
// necessary since we implement the log with a sorted set, so all entries
// must be unique, or else we will have gaps in the log.
RAY_CHECK(flags == REDISMODULE_ZADD_ADDED) << "Appended a duplicate entry";
RedisModule_CloseKey(key);
// Publish a message on the requested pubsub channel if necessary.
TablePubsub pubsub_channel = ParseTablePubsub(pubsub_channel_str);
if (pubsub_channel != TablePubsub_NO_PUBLISH) {
// All other pubsub channels write the data back directly onto the
// channel.
return PublishTableAdd(ctx, pubsub_channel_str, id, data);
} else {
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
} else {
return RedisModule_ReplyWithSimpleString(ctx, "OK");
// The requested index did not match the current length of the log. Return
// an error message as a string.
RedisModule_CloseKey(key);
const char *reply = "ERR entry exists";
return RedisModule_ReplyWithStringBuffer(ctx, reply, strlen(reply));
}
}
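For reference, here is a minimal sketch of the new command from a client's point of view, written against the synchronous hiredis API. The prefix and pubsub values, the key ID, and the payload are placeholders rather than real Ray values; actual callers go through RedisContext::RunAsync (see below) with a flatbuffer-serialized payload.

// Sketch only: invoking RAY.TABLE_APPEND by hand with the synchronous hiredis API.
#include <hiredis/hiredis.h>
#include <cstdio>
#include <string>

int main() {
  redisContext *c = redisConnect("127.0.0.1", 6379);
  if (c == nullptr || c->err != 0) return 1;

  const int prefix = 1;          // placeholder TablePrefix value
  const int pubsub_channel = 0;  // placeholder for TablePubsub_NO_PUBLISH
  const std::string id(20, '\x01');    // placeholder 20-byte key ID
  const std::string data = "entry-0";  // placeholder serialized entry

  // Unconditional append: no index argument, so the entry is appended at the
  // log's current length.
  auto *r = (redisReply *)redisCommand(c, "RAY.TABLE_APPEND %d %d %b %b",
                                       prefix, pubsub_channel, id.data(),
                                       id.size(), data.data(), data.size());
  printf("append: %s\n", r->str);  // expect "OK"
  freeReplyObject(r);

  // Conditional append at index 0: the log now has one entry, so the length
  // check fails and the module replies with the error string instead.
  r = (redisReply *)redisCommand(c, "RAY.TABLE_APPEND %d %d %b %b %d", prefix,
                                 pubsub_channel, id.data(), id.size(),
                                 data.data(), data.size(), 0);
  printf("append at 0: %s\n", r->str);  // expect "ERR entry exists"
  freeReplyObject(r);

  redisFree(c);
  return 0;
}

The second call illustrates the conditional path added in this commit: because an index was supplied and it no longer matches the log's length, the module returns the error string instead of appending.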

5 changes: 5 additions & 0 deletions src/ray/gcs/client.cc
@@ -17,6 +17,7 @@ Status AsyncGcsClient::Connect(const std::string &address, int port,
object_table_.reset(new ObjectTable(context_, this));
task_table_.reset(new TaskTable(context_, this));
raylet_task_table_.reset(new raylet::TaskTable(context_, this));
task_reconstruction_log_.reset(new TaskReconstructionLog(context_, this));
client_table_.reset(new ClientTable(context_, this, client_info));
// TODO(swang): Call the client table's Connect() method here. To do this,
// we need to make sure that we are attached to an event loop first. This
@@ -44,6 +45,10 @@ TaskTable &AsyncGcsClient::task_table() { return *task_table_; }

raylet::TaskTable &AsyncGcsClient::raylet_task_table() { return *raylet_task_table_; }

TaskReconstructionLog &AsyncGcsClient::task_reconstruction_log() {
return *task_reconstruction_log_;
}

ClientTable &AsyncGcsClient::client_table() { return *client_table_; }

FunctionTable &AsyncGcsClient::function_table() { return *function_table_; }
2 changes: 2 additions & 0 deletions src/ray/gcs/client.h
@@ -46,6 +46,7 @@ class RAY_EXPORT AsyncGcsClient {
ObjectTable &object_table();
TaskTable &task_table();
raylet::TaskTable &raylet_task_table();
TaskReconstructionLog &task_reconstruction_log();
ClientTable &client_table();
inline ErrorTable &error_table();

@@ -65,6 +66,7 @@
std::unique_ptr<ObjectTable> object_table_;
std::unique_ptr<TaskTable> task_table_;
std::unique_ptr<raylet::TaskTable> raylet_task_table_;
std::unique_ptr<TaskReconstructionLog> task_reconstruction_log_;
std::unique_ptr<ClientTable> client_table_;
std::shared_ptr<RedisContext> context_;
std::unique_ptr<RedisAsioClient> asio_async_client_;
55 changes: 55 additions & 0 deletions src/ray/gcs/client_test.cc
@@ -215,6 +215,61 @@ TEST_F(TestGcsWithAsio, TestTableLookupFailure) {
TestTableLookupFailure(job_id_, client_);
}

void TestLogAppendAt(const JobID &job_id, std::shared_ptr<gcs::AsyncGcsClient> client) {
TaskID task_id = TaskID::from_random();
std::vector<std::string> managers = {"A", "B"};
std::vector<std::shared_ptr<TaskReconstructionDataT>> data_log;
for (const auto &manager : managers) {
auto data = std::make_shared<TaskReconstructionDataT>();
data->node_manager_id = manager;
data_log.push_back(data);
}

// Check that the failed appends were for the correct task.
auto failure_callback = [task_id](gcs::AsyncGcsClient *client, const UniqueID &id,
const std::shared_ptr<TaskReconstructionDataT> d) {
ASSERT_EQ(id, task_id);
test->IncrementNumCallbacks();
};

RAY_CHECK_OK(client->task_reconstruction_log().Append(job_id, task_id, data_log.front(),
nullptr));
RAY_CHECK_OK(client->task_reconstruction_log().AppendAt(job_id, task_id, data_log[1],
nullptr, failure_callback, 0));
RAY_CHECK_OK(client->task_reconstruction_log().AppendAt(job_id, task_id, data_log[1],
nullptr, failure_callback, 2));
RAY_CHECK_OK(client->task_reconstruction_log().AppendAt(job_id, task_id, data_log[1],
nullptr, failure_callback, 1));

auto lookup_callback = [task_id, managers](
gcs::AsyncGcsClient *client, const UniqueID &id,
const std::vector<TaskReconstructionDataT> &data) {
std::vector<std::string> appended_managers;
for (const auto &entry : data) {
appended_managers.push_back(entry.node_manager_id);
}
ASSERT_EQ(appended_managers, managers);
test->Stop();
};
RAY_CHECK_OK(
client->task_reconstruction_log().Lookup(job_id, task_id, lookup_callback));
// Run the event loop. The loop will only stop if the Lookup callback is
// called (or if an assertion fails).
test->Start();
ASSERT_EQ(test->NumCallbacks(), 2);
}

TEST_F(TestGcsWithAe, TestLogAppendAt) {
test = this;
TestLogAppendAt(job_id_, client_);
}

TEST_F(TestGcsWithAsio, TestLogAppendAt) {
test = this;
TestLogAppendAt(job_id_, client_);
}

// Task table callbacks.
void TaskAdded(gcs::AsyncGcsClient *client, const TaskID &id,
const std::shared_ptr<TaskTableDataT> data) {
ASSERT_EQ(data->scheduling_state, SchedulingState_SCHEDULED);
13 changes: 12 additions & 1 deletion src/ray/gcs/format/gcs.fbs
@@ -10,7 +10,8 @@ enum TablePrefix:int {
RAYLET_TASK,
CLIENT,
OBJECT,
FUNCTION
FUNCTION,
TASK_RECONSTRUCTION
}

// The channel that Add operations to the Table should be published on, if any.
Expand All @@ -35,9 +36,19 @@ table FunctionTableData {
}

table ObjectTableData {
// The size of the object.
object_size: long;
// The node manager ID that this object appeared on or was evicted by.
manager: string;
// Whether this entry is an addition or a deletion.
is_eviction: bool;
// The number of times this object has been evicted from this node so far.
num_evictions: int;
}

table TaskReconstructionData {
num_executions: int;
node_manager_id: string;
}
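As a quick sketch, this is how the object API type generated from the table above is filled in and packed elsewhere in this commit (compare Log::AppendAt in tables.cc and the unit test in client_test.cc); the generated header path and the helper function are assumptions for illustration:

// Hedged sketch; assumes the generated header lives at this path.
#include "ray/gcs/format/gcs_generated.h"

// Builds the <data> payload that Log::AppendAt hands to RAY.TABLE_APPEND.
void BuildTaskReconstructionEntry(flatbuffers::FlatBufferBuilder &fbb) {
  fbb.ForceDefaults(true);
  TaskReconstructionDataT entry;  // object API struct generated from the table above
  entry.num_executions = 1;
  entry.node_manager_id = "A";    // same field the unit test sets
  fbb.Finish(TaskReconstructionData::Pack(fbb, &entry));
  // fbb.GetBufferPointer() / fbb.GetSize() now hold the serialized entry.
}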

enum SchedulingState:int {
28 changes: 20 additions & 8 deletions src/ray/gcs/redis_context.cc
@@ -183,17 +183,29 @@ Status RedisContext::AttachToEventLoop(aeEventLoop *loop) {
Status RedisContext::RunAsync(const std::string &command, const UniqueID &id,
const uint8_t *data, int64_t length,
const TablePrefix prefix, const TablePubsub pubsub_channel,
int64_t callback_index) {
int64_t callback_index, int log_length) {
if (length > 0) {
std::string redis_command = command + " %d %d %b %b";
int status = redisAsyncCommand(
async_context_, reinterpret_cast<redisCallbackFn *>(&GlobalRedisCallback),
reinterpret_cast<void *>(callback_index), redis_command.c_str(), prefix,
pubsub_channel, id.data(), id.size(), data, length);
if (status == REDIS_ERR) {
return Status::RedisError(std::string(async_context_->errstr));
if (log_length >= 0) {
std::string redis_command = command + " %d %d %b %b %d";
int status = redisAsyncCommand(
async_context_, reinterpret_cast<redisCallbackFn *>(&GlobalRedisCallback),
reinterpret_cast<void *>(callback_index), redis_command.c_str(), prefix,
pubsub_channel, id.data(), id.size(), data, length, log_length);
if (status == REDIS_ERR) {
return Status::RedisError(std::string(async_context_->errstr));
}
} else {
std::string redis_command = command + " %d %d %b %b";
int status = redisAsyncCommand(
async_context_, reinterpret_cast<redisCallbackFn *>(&GlobalRedisCallback),
reinterpret_cast<void *>(callback_index), redis_command.c_str(), prefix,
pubsub_channel, id.data(), id.size(), data, length);
if (status == REDIS_ERR) {
return Status::RedisError(std::string(async_context_->errstr));
}
}
} else {
RAY_CHECK(log_length == -1);
std::string redis_command = command + " %d %d %b";
int status = redisAsyncCommand(
async_context_, reinterpret_cast<redisCallbackFn *>(&GlobalRedisCallback),
18 changes: 17 additions & 1 deletion src/ray/gcs/redis_context.h
@@ -53,9 +53,25 @@ class RedisContext {
~RedisContext();
Status Connect(const std::string &address, int port);
Status AttachToEventLoop(aeEventLoop *loop);

/// Run an operation on some table key.
///
/// \param command The command to run. This must match a registered Ray Redis
/// command. These are strings of the format "RAY.TABLE_*".
/// \param id The table key to run the operation at.
/// \param data The data to add to the table key, if any.
/// \param length The length of the data to be added, if data is provided.
/// \param prefix The table prefix, used to namespace the key in Redis.
/// \param pubsub_channel The pubsub channel that notifications about this key
///     should be published to, if any.
/// \param callback_index Index of the callback registered with the
///     RedisCallbackManager to call with the command's reply.
/// \param log_length The RAY.TABLE_APPEND command takes an optional index at
///     which the data must be appended. For all other commands, leave this at
///     -1 (unused). If set, then data must also be provided.
Status RunAsync(const std::string &command, const UniqueID &id, const uint8_t *data,
int64_t length, const TablePrefix prefix,
const TablePubsub pubsub_channel, int64_t callback_index);
const TablePubsub pubsub_channel, int64_t callback_index,
int log_length = -1);

Status SubscribeAsync(const ClientID &client_id, const TablePubsub pubsub_channel,
int64_t callback_index);
redisAsyncContext *async_context() { return async_context_; }
28 changes: 28 additions & 0 deletions src/ray/gcs/tables.cc
@@ -14,6 +14,7 @@ Status Log<ID, Data>::Append(const JobID &job_id, const ID &id,
new CallbackData({id, data, nullptr, nullptr, this, client_}));
int64_t callback_index =
RedisCallbackManager::instance().add([d, done](const std::string &data) {
RAY_CHECK(data.empty());
if (done != nullptr) {
(done)(d->client, d->id, d->data);
}
Expand All @@ -26,6 +27,32 @@ Status Log<ID, Data>::Append(const JobID &job_id, const ID &id,
prefix_, pubsub_channel_, callback_index);
}

template <typename ID, typename Data>
Status Log<ID, Data>::AppendAt(const JobID &job_id, const ID &id,
std::shared_ptr<DataT> data, const WriteCallback &done,
const WriteCallback &failure, int log_length) {
auto d = std::shared_ptr<CallbackData>(
new CallbackData({id, data, nullptr, nullptr, this, client_}));
int64_t callback_index =
RedisCallbackManager::instance().add([d, done, failure](const std::string &data) {
if (data.empty()) {
if (done != nullptr) {
(done)(d->client, d->id, d->data);
}
} else {
if (failure != nullptr) {
(failure)(d->client, d->id, d->data);
}
}
return true;
});
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
fbb.Finish(Data::Pack(fbb, data.get()));
return context_->RunAsync("RAY.TABLE_APPEND", id, fbb.GetBufferPointer(), fbb.GetSize(),
prefix_, pubsub_channel_, callback_index, log_length);
}

template <typename ID, typename Data>
Status Log<ID, Data>::Lookup(const JobID &job_id, const ID &id, const Callback &lookup) {
auto d = std::shared_ptr<CallbackData>(
@@ -308,6 +335,7 @@ template class Log<ObjectID, ObjectTableData>;
template class Log<TaskID, ray::protocol::Task>;
template class Table<TaskID, ray::protocol::Task>;
template class Table<TaskID, TaskTableData>;
template class Log<TaskID, TaskReconstructionData>;

} // namespace gcs

26 changes: 26 additions & 0 deletions src/ray/gcs/tables.h
@@ -77,6 +77,23 @@ class Log {
Status Append(const JobID &job_id, const ID &id, std::shared_ptr<DataT> data,
const WriteCallback &done);

/// Append a log entry to a key if and only if the log has the given number
/// of entries.
///
/// \param job_id The ID of the job (= driver).
/// \param id The ID of the data that is added to the GCS.
/// \param data Data to append to the log.
/// \param done Callback that is called if the data was appended to the log.
/// \param failure Callback that is called if the data was not appended to
/// the log because the log length did not match the given
/// `log_length`.
/// \param log_length The number of entries that the log must have for the
/// append to succeed.
/// \return Status
Status AppendAt(const JobID &job_id, const ID &id, std::shared_ptr<DataT> data,
const WriteCallback &done, const WriteCallback &failure,
int log_length);

/// Lookup the log values at a key asynchronously.
///
/// \param job_id The ID of the job (= driver).
@@ -241,6 +258,15 @@ using ClassTable = Table<ClassID, ClassTableData>;
// TODO(swang): Set the pubsub channel for the actor table.
using ActorTable = Table<ActorID, ActorTableData>;

class TaskReconstructionLog : public Log<TaskID, TaskReconstructionData> {
public:
TaskReconstructionLog(const std::shared_ptr<RedisContext> &context,
AsyncGcsClient *client)
: Log(context, client) {
prefix_ = TablePrefix_TASK_RECONSTRUCTION;
}
};
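A hedged usage sketch for the new log class, condensed from the flow exercised in client_test.cc; the helper function, include path, and node manager ID are illustrative assumptions:

// Sketch only, condensed from client_test.cc. The include path is assumed.
#include "ray/gcs/client.h"

// Append a reconstruction entry at index 0, i.e. only if the log for this
// task is still empty; otherwise the failure callback fires.
void AppendFirstReconstructionEntry(gcs::AsyncGcsClient &client,
                                    const JobID &job_id, const TaskID &task_id) {
  auto entry = std::make_shared<TaskReconstructionDataT>();
  entry->node_manager_id = "A";  // placeholder node manager ID

  auto failure = [](gcs::AsyncGcsClient *client, const UniqueID &id,
                    std::shared_ptr<TaskReconstructionDataT> data) {
    // The log already had entries, so the conditional append was rejected.
  };
  RAY_CHECK_OK(client.task_reconstruction_log().AppendAt(
      job_id, task_id, entry, /*done=*/nullptr, failure, /*log_length=*/0));
}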

namespace raylet {

class TaskTable : public Table<TaskID, ray::protocol::Task> {
