Skip to content

Commit

Permalink
Publish a notification for empty keys in the GCS (ray-project#2347)
Browse files Browse the repository at this point in the history
* Publish an empty notification for empty keys

* Add failure callback to Table::Subscribe, add unit test for new behavior
  • Loading branch information
stephanie-wang authored and robertnishihara committed Jul 5, 2018
1 parent b7088c1 commit c50f196
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 31 deletions.
24 changes: 15 additions & 9 deletions src/common/redis_module/ray_redis_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,13 @@ void TableEntryToFlatbuf(RedisModuleKey *table_key,
fbb.CreateVector(data));
fbb.Finish(message);
} break;
case REDISMODULE_KEYTYPE_EMPTY: {
auto message = CreateGcsTableEntry(
fbb, RedisStringToFlatbuf(fbb, entry_id),
fbb.CreateVector(
std::vector<flatbuffers::Offset<flatbuffers::String>>()));
fbb.Finish(message);
} break;
default:
RAY_LOG(FATAL) << "Invalid Redis type during lookup: " << key_type;
}
Expand Down Expand Up @@ -889,15 +896,14 @@ int TableRequestNotifications_RedisCommand(RedisModuleCtx *ctx,
// Lookup the current value at the key.
RedisModuleKey *table_key =
OpenPrefixedKey(ctx, prefix_str, id, REDISMODULE_READ);
if (table_key != nullptr) {
// Publish the current value at the key to the client that is requesting
// notifications.
flatbuffers::FlatBufferBuilder fbb;
TableEntryToFlatbuf(table_key, id, fbb);
RedisModule_Call(ctx, "PUBLISH", "sb", client_channel,
reinterpret_cast<const char *>(fbb.GetBufferPointer()),
fbb.GetSize());
}
// Publish the current value at the key to the client that is requesting
// notifications. An empty notification will be published if the key is
// empty.
flatbuffers::FlatBufferBuilder fbb;
TableEntryToFlatbuf(table_key, id, fbb);
RedisModule_Call(ctx, "PUBLISH", "sb", client_channel,
reinterpret_cast<const char *>(fbb.GetBufferPointer()),
fbb.GetSize());

return RedisModule_ReplyWithNull(ctx);
}
Expand Down
46 changes: 32 additions & 14 deletions src/ray/gcs/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,12 @@ void TestTableSubscribeAll(const JobID &job_id,
}
};

// The failure callback should not be called if we are subscribing to
// notifications for all keys.
auto failure_callback = [](gcs::AsyncGcsClient *client, const UniqueID &id) {
RAY_CHECK(false);
};

// Callback for subscription success. We are guaranteed to receive
// notifications after this is called.
auto subscribe_callback = [job_id, task_id, task_specs](gcs::AsyncGcsClient *client) {
Expand All @@ -403,7 +409,8 @@ void TestTableSubscribeAll(const JobID &job_id,
// subscribed, we will write the key several times and check that we get
// notified for each.
RAY_CHECK_OK(client->raylet_task_table().Subscribe(
job_id, ClientID::nil(), notification_callback, subscribe_callback));
job_id, ClientID::nil(), notification_callback, failure_callback,
subscribe_callback));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called (or an assertion failure).
test->Start();
Expand Down Expand Up @@ -451,7 +458,7 @@ void TestLogSubscribeAll(const JobID &job_id,
}
};

// Subscribe to all task table notifications. Once we have successfully
// Subscribe to all object table notifications. Once we have successfully
// subscribed, we will append to the key several times and check that we get
// notified for each.
RAY_CHECK_OK(client->object_table().Subscribe(
Expand Down Expand Up @@ -479,16 +486,10 @@ void TestTableSubscribeId(const JobID &job_id,
// Add a table entry.
TaskID task_id1 = TaskID::from_random();
std::vector<std::string> task_specs1 = {"abc", "def", "ghi"};
auto data1 = std::make_shared<protocol::TaskT>();
data1->task_specification = task_specs1[0];
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id1, data1, nullptr));

// Add a table entry at a second key.
TaskID task_id2 = TaskID::from_random();
std::vector<std::string> task_specs2 = {"jkl", "mno", "pqr"};
auto data2 = std::make_shared<protocol::TaskT>();
data2->task_specification = task_specs2[0];
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id2, data2, nullptr));

// The callback for a notification from the table. This should only be
// received for keys that we requested notifications for.
Expand All @@ -504,6 +505,16 @@ void TestTableSubscribeId(const JobID &job_id,
}
};

// The failure callback should be called once since both keys start as empty.
bool failure_notification_received = false;
auto failure_callback = [task_id2, &failure_notification_received](
gcs::AsyncGcsClient *client, const UniqueID &id) {
ASSERT_EQ(id, task_id2);
// The failure notification should be the first notification received.
ASSERT_EQ(test->NumCallbacks(), 0);
failure_notification_received = true;
};

// The callback for subscription success. Once we've subscribed, request
// notifications for only one of the keys, then write to both keys.
auto subscribe_callback = [job_id, task_id1, task_id2, task_specs1,
Expand All @@ -513,14 +524,12 @@ void TestTableSubscribeId(const JobID &job_id,
job_id, task_id2, client->client_table().GetLocalClientId()));
// Write both keys. We should only receive notifications for the key that
// we requested them for.
auto remaining = std::vector<std::string>(++task_specs1.begin(), task_specs1.end());
for (const auto &task_spec : remaining) {
for (const auto &task_spec : task_specs1) {
auto data = std::make_shared<protocol::TaskT>();
data->task_specification = task_spec;
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id1, data, nullptr));
}
remaining = std::vector<std::string>(++task_specs2.begin(), task_specs2.end());
for (const auto &task_spec : remaining) {
for (const auto &task_spec : task_specs2) {
auto data = std::make_shared<protocol::TaskT>();
data->task_specification = task_spec;
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id2, data, nullptr));
Expand All @@ -531,10 +540,13 @@ void TestTableSubscribeId(const JobID &job_id,
// receive notifications for specific keys.
RAY_CHECK_OK(client->raylet_task_table().Subscribe(
job_id, client->client_table().GetLocalClientId(), notification_callback,
subscribe_callback));
failure_callback, subscribe_callback));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called for the requested key.
test->Start();
// Check that the failure callback was called since the key was initially
// empty.
ASSERT_TRUE(failure_notification_received);
// Check that we received one notification callback for each write to the
// requested key.
ASSERT_EQ(test->NumCallbacks(), task_specs2.size());
Expand Down Expand Up @@ -635,6 +647,12 @@ void TestTableSubscribeCancel(const JobID &job_id,
data->task_specification = task_specs[0];
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, nullptr));

// The failure callback should not be called since all keys are non-empty
// when notifications are requested.
auto failure_callback = [](gcs::AsyncGcsClient *client, const UniqueID &id) {
RAY_CHECK(false);
};

// The callback for a notification from the table. This should only be
// received for keys that we requested notifications for.
auto notification_callback = [task_id, task_specs](
Expand Down Expand Up @@ -680,7 +698,7 @@ void TestTableSubscribeCancel(const JobID &job_id,
// receive notifications for specific keys.
RAY_CHECK_OK(client->raylet_task_table().Subscribe(
job_id, client->client_table().GetLocalClientId(), notification_callback,
subscribe_callback));
failure_callback, subscribe_callback));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called for the requested key.
test->Start();
Expand Down
14 changes: 11 additions & 3 deletions src/ray/gcs/tables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,20 @@ Status Table<ID, Data>::Lookup(const JobID &job_id, const ID &id, const Callback
template <typename ID, typename Data>
Status Table<ID, Data>::Subscribe(const JobID &job_id, const ClientID &client_id,
const Callback &subscribe,
const FailureCallback &failure,
const SubscriptionCallback &done) {
return Log<ID, Data>::Subscribe(
job_id, client_id,
[subscribe](AsyncGcsClient *client, const ID &id, const std::vector<DataT> &data) {
RAY_CHECK(data.size() == 1);
subscribe(client, id, data[0]);
[subscribe, failure](AsyncGcsClient *client, const ID &id,
const std::vector<DataT> &data) {
RAY_CHECK(data.empty() || data.size() == 1);
if (data.size() == 1) {
subscribe(client, id, data[0]);
} else {
if (failure != nullptr) {
failure(client, id);
}
}
},
done);
}
Expand Down
20 changes: 19 additions & 1 deletion src/ray/gcs/tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,26 @@ class Table : private Log<ID, Data>,
Status Lookup(const JobID &job_id, const ID &id, const Callback &lookup,
const FailureCallback &failure);

/// Subscribe to any Add operations to this table. The caller may choose to
/// subscribe to all Adds, or to subscribe only to keys that it requests
/// notifications for. This may only be called once per Table instance.
///
/// \param job_id The ID of the job (= driver).
/// \param client_id The type of update to listen to. If this is nil, then a
/// message for each Add to the table will be received. Else, only
/// messages for the given client will be received. In the latter
/// case, the client may request notifications on specific keys in the
/// table via `RequestNotifications`.
/// \param subscribe Callback that is called on each received message. If the
/// callback is called with an empty vector, then there was no data at the key.
/// \param failure Callback that is called if the key is empty at the time
/// that notifications are requested.
/// \param done Callback that is called when subscription is complete and we
/// are ready to receive messages.
/// \return Status
Status Subscribe(const JobID &job_id, const ClientID &client_id,
const Callback &subscribe, const SubscriptionCallback &done);
const Callback &subscribe, const FailureCallback &failure,
const SubscriptionCallback &done);

protected:
using Log<ID, Data>::context_;
Expand Down
4 changes: 2 additions & 2 deletions src/ray/raylet/monitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ void Monitor::Start() {
const HeartbeatTableDataT &heartbeat_data) {
HandleHeartbeat(id);
};
RAY_CHECK_OK(gcs_client_.heartbeat_table().Subscribe(UniqueID::nil(), UniqueID::nil(),
heartbeat_callback, nullptr));
RAY_CHECK_OK(gcs_client_.heartbeat_table().Subscribe(
UniqueID::nil(), UniqueID::nil(), heartbeat_callback, nullptr, nullptr));
Tick();
}

Expand Down
5 changes: 3 additions & 2 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ ray::Status NodeManager::RegisterGcs() {
};
RAY_RETURN_NOT_OK(gcs_client_->raylet_task_table().Subscribe(
JobID::nil(), gcs_client_->client_table().GetLocalClientId(),
task_committed_callback, nullptr));
task_committed_callback, nullptr, nullptr));

// Register a callback for actor creation notifications.
auto actor_creation_callback = [this](
Expand All @@ -149,7 +149,8 @@ ray::Status NodeManager::RegisterGcs() {
HeartbeatAdded(client, id, heartbeat_data);
};
RAY_RETURN_NOT_OK(gcs_client_->heartbeat_table().Subscribe(
UniqueID::nil(), UniqueID::nil(), heartbeat_added, [](gcs::AsyncGcsClient *client) {
UniqueID::nil(), UniqueID::nil(), heartbeat_added, nullptr,
[](gcs::AsyncGcsClient *client) {
RAY_LOG(DEBUG) << "heartbeat table subscription done callback called.";
}));

Expand Down

0 comments on commit c50f196

Please sign in to comment.