From a5c8f28f33144e22bf69187c67053fc47f4f948b Mon Sep 17 00:00:00 2001
From: Stephanie Wang
Date: Wed, 25 Jan 2017 22:57:15 -0800
Subject: [PATCH] Plasma subscribe (#227)

* Use object_info as the notification, not just the object_id

* Add a regression test for plasma managers connecting to the store after
  some objects have been created

* Send notifications for existing objects to new plasma subscribers

* Continuously retry the request to the plasma manager instead of setting a
  timeout in the test case

* Use ray.services to start Redis in plasma test cases

* Fix test case
---
 python/plasma/test/test.py | 91 ++++++++++++++++++++++++++++++--------
 python/ray/services.py     |  2 +-
 src/plasma/plasma_store.c  | 61 ++++++++++++-------------
 3 files changed, 104 insertions(+), 50 deletions(-)

diff --git a/python/plasma/test/test.py b/python/plasma/test/test.py
index 1a9101a5a5e7..96354f12591d 100644
--- a/python/plasma/test/test.py
+++ b/python/plasma/test/test.py
@@ -17,6 +17,7 @@
 
 import plasma
 from plasma.utils import random_object_id, generate_metadata, write_to_data_buffer, create_object_with_id, create_object
+from ray import services
 
 USE_VALGRIND = False
 PLASMA_STORE_MEMORY = 1000000000
@@ -492,19 +493,8 @@ def setUp(self):
     store_name1, self.p2 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND)
     store_name2, self.p3 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND)
     # Start a Redis server.
-    redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../core/src/common/thirdparty/redis/src/redis-server")
-    redis_module = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../core/src/common/redis_module/libray_redis_module.so")
-    assert os.path.isfile(redis_path)
-    assert os.path.isfile(redis_module)
-    redis_port = 6379
-    with open(os.devnull, "w") as FNULL:
-      self.redis_process = subprocess.Popen([redis_path,
-                                             "--port", str(redis_port),
-                                             "--loadmodule", redis_module],
-                                            stdout=FNULL)
-    time.sleep(0.1)
+    redis_address = services.start_redis("127.0.0.1")
     # Start two PlasmaManagers.
-    redis_address = "{}:{}".format("127.0.0.1", redis_port)
     manager_name1, self.p4, self.port1 = plasma.start_plasma_manager(store_name1, redis_address, use_valgrind=USE_VALGRIND)
     manager_name2, self.p5, self.port2 = plasma.start_plasma_manager(store_name2, redis_address, use_valgrind=USE_VALGRIND)
     # Connect two PlasmaClients.
@@ -533,12 +523,11 @@ def tearDown(self):
     else:
       for process in self.processes_to_kill:
         process.kill()
-    self.redis_process.kill()
+
+    # Clean up the Redis server.
+    services.cleanup()
 
   def test_fetch(self):
-    if self.redis_process is None:
-      print("Cannot test fetch without a running redis instance.")
-      self.assertTrue(False)
     for _ in range(10):
       # Create an object.
       object_id1, memory_buffer1, metadata1 = create_object(self.client1, 2000, 2000)
@@ -582,9 +571,6 @@ def test_fetch(self):
                              memory_buffer=memory_buffer3, metadata=metadata3)
 
   def test_fetch_multiple(self):
-    if self.redis_process is None:
-      print("Cannot test fetch without a running redis instance.")
-      self.assertTrue(False)
     for _ in range(20):
       # Create two objects and a third fake one that doesn't exist.
       object_id1, memory_buffer1, metadata1 = create_object(self.client1, 2000, 2000)
@@ -797,6 +783,73 @@ def test_stresstest(self):
 
     print("it took", b, "seconds to put and transfer the objects")
 
+class TestPlasmaManagerRecovery(unittest.TestCase):
+
+  def setUp(self):
+    # Start a Plasma store.
+    self.store_name, self.p2 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND)
+    # Start a Redis server.
+    self.redis_address = services.start_redis("127.0.0.1")
+    # Start a PlasmaManager.
+    manager_name, self.p3, self.port1 = plasma.start_plasma_manager(
+        self.store_name,
+        self.redis_address,
+        use_valgrind=USE_VALGRIND)
+    # Connect a PlasmaClient.
+    self.client = plasma.PlasmaClient(self.store_name, manager_name)
+
+    # Store the processes that will be explicitly killed during tearDown so
+    # that a test case can remove ones that will be killed during the test.
+    self.processes_to_kill = [self.p2, self.p3]
+
+  def tearDown(self):
+    # Check that the processes are still alive.
+    for process in self.processes_to_kill:
+      self.assertEqual(process.poll(), None)
+
+    # Kill the Plasma store and Plasma manager processes.
+    if USE_VALGRIND:
+      time.sleep(1)  # Give the processes a chance to finish their work.
+      for process in self.processes_to_kill:
+        process.send_signal(signal.SIGTERM)
+        process.wait()
+        if process.returncode != 0:
+          print("aborting due to valgrind error")
+          os._exit(-1)
+    else:
+      for process in self.processes_to_kill:
+        process.kill()
+
+    # Clean up the Redis server.
+    services.cleanup()
+
+  def test_delayed_start(self):
+    num_objects = 10
+    # Create some objects using one client.
+    object_ids = [random_object_id() for _ in range(num_objects)]
+    for i in range(num_objects):
+      create_object_with_id(self.client, object_ids[i], 2000, 2000)
+
+    # Wait until the objects have been sealed in the store.
+    ready, waiting = self.client.wait(object_ids, num_returns=num_objects)
+    self.assertEqual(set(ready), set(object_ids))
+    self.assertEqual(waiting, [])
+
+    # Start a second plasma manager attached to the same store.
+    manager_name, self.p5, self.port2 = plasma.start_plasma_manager(self.store_name, self.redis_address, use_valgrind=USE_VALGRIND)
+    self.processes_to_kill.append(self.p5)
+
+    # Check that the second manager knows about existing objects.
+    client2 = plasma.PlasmaClient(self.store_name, manager_name)
+    ready, waiting = [], object_ids
+    while True:
+      ready, waiting = client2.wait(object_ids, num_returns=num_objects, timeout=0)
+      if len(ready) == len(object_ids):
+        break
+
+    self.assertEqual(set(ready), set(object_ids))
+    self.assertEqual(waiting, [])
+
 if __name__ == "__main__":
   if len(sys.argv) > 1:
     # pop the argument so we don't mess with unittest's own argument parser
diff --git a/python/ray/services.py b/python/ray/services.py
index bb52f4d7c6c0..d7edde005529 100644
--- a/python/ray/services.py
+++ b/python/ray/services.py
@@ -190,7 +190,7 @@ def start_redis(node_ip_address, num_retries=20, cleanup=True, redirect_output=False):
       /dev/null.
 
   Returns:
-    The port used by Redis.
+    The address used by Redis.
 
   Raises:
     Exception: An exception is raised if Redis could not be started.
diff --git a/src/plasma/plasma_store.c b/src/plasma/plasma_store.c
index 1c946afbc5ac..a80bb8b3d12c 100644
--- a/src/plasma/plasma_store.c
+++ b/src/plasma/plasma_store.c
@@ -52,12 +52,16 @@ struct client {
  * object_table_entry type. */
 UT_icd client_icd = {sizeof(client *), NULL, NULL, NULL};
 
+/* This is used to define the queue of object notifications for plasma
+ * subscribers. */
+UT_icd object_info_icd = {sizeof(object_info), NULL, NULL, NULL};
+
 typedef struct {
   /** Client file descriptor. This is used as a key for the hash table. */
   int subscriber_fd;
-  /** The object IDs to notify the client about. We notify the client about the
-   * IDs in the order that the objects were sealed. */
-  UT_array *object_ids;
+  /** The object notifications to send to this client. We notify the client
+   * about objects in the order that they were sealed or deleted. */
+  UT_array *object_notifications;
   /** Handle for the uthash table. */
   UT_hash_handle hh;
 } notification_queue;
@@ -136,7 +140,8 @@ plasma_store_state *init_plasma_store(event_loop *loop, int64_t system_memory) {
   return state;
 }
 
-void push_notification(plasma_store_state *state, object_id object_id);
+void push_notification(plasma_store_state *state,
+                       object_info *object_notification);
 
 /* If this client is not already using the object, add the client to the
  * object's list of clients, otherwise do nothing. */
@@ -562,7 +567,7 @@ void seal_object(client *client_context,
   /* Set the object digest. */
   memcpy(entry->info.digest, digest, DIGEST_SIZE);
   /* Inform all subscribers that a new object has been sealed. */
-  push_notification(plasma_state, object_id);
+  push_notification(plasma_state, &entry->info);
 
   /* Update all get requests that involve this object. */
   update_object_get_requests(plasma_state, object_id);
@@ -589,7 +594,8 @@ void delete_object(plasma_store_state *plasma_state, object_id object_id) {
   utarray_free(entry->clients);
   free(entry);
   /* Inform all subscribers that the object has been deleted. */
-  push_notification(plasma_state, object_id);
+  object_info notification = {.obj_id = object_id, .is_deletion = true};
+  push_notification(plasma_state, &notification);
 }
 
 void remove_objects(plasma_store_state *plasma_state,
@@ -605,10 +611,11 @@ void remove_objects(plasma_store_state *plasma_state,
   }
 }
 
-void push_notification(plasma_store_state *plasma_state, object_id object_id) {
+void push_notification(plasma_store_state *plasma_state,
+                       object_info *notification) {
   notification_queue *queue, *temp_queue;
   HASH_ITER(hh, plasma_state->pending_notifications, queue, temp_queue) {
-    utarray_push_back(queue->object_ids, &object_id);
+    utarray_push_back(queue->object_notifications, notification);
     send_notifications(plasma_state->loop, queue->subscriber_fd, plasma_state,
                        0);
   }
@@ -627,26 +634,13 @@ void send_notifications(event_loop *loop,
   int num_processed = 0;
   /* Loop over the array of pending notifications and send as many of them as
    * possible. */
-  for (int i = 0; i < utarray_len(queue->object_ids); ++i) {
-    object_id *obj_id = (object_id *) utarray_eltptr(queue->object_ids, i);
-    object_table_entry *entry = NULL;
-    /* This object should already exist in plasma store state. */
-    HASH_FIND(handle, plasma_state->plasma_store_info->objects, obj_id,
-              sizeof(object_id), entry);
-
-    object_info object_info;
-    if (entry == NULL) {
-      memset(&object_info, 0, sizeof(object_info));
-      object_info.obj_id = *obj_id;
-      object_info.is_deletion = true;
-    } else {
-      object_info = entry->info;
-      object_info.is_deletion = false;
-    }
+  for (int i = 0; i < utarray_len(queue->object_notifications); ++i) {
+    object_info *notification =
+        (object_info *) utarray_eltptr(queue->object_notifications, i);
 
     /* Attempt to send a notification about this object ID. */
     int nbytes =
-        send(client_sock, (char const *) &object_info, sizeof(object_info), 0);
+        send(client_sock, (char const *) notification, sizeof(object_info), 0);
     if (nbytes >= 0) {
       CHECK(nbytes == sizeof(object_info));
     } else if (nbytes == -1 &&
@@ -667,9 +661,9 @@ void send_notifications(event_loop *loop,
     num_processed += 1;
   }
   /* Remove the sent notifications from the array. */
-  utarray_erase(queue->object_ids, 0, num_processed);
+  utarray_erase(queue->object_notifications, 0, num_processed);
   /* If we have sent all notifications, remove the fd from the event loop. */
-  if (utarray_len(queue->object_ids) == 0) {
+  if (utarray_len(queue->object_notifications) == 0) {
     event_loop_remove_file(loop, client_sock);
   }
 }
@@ -686,16 +680,23 @@ void subscribe_to_updates(client *client_context, int conn) {
     LOG_WARN("Failed to receive file descriptor from client on fd %d.", conn);
     return;
   }
-  CHECKM(HASH_CNT(handle, plasma_state->plasma_store_info->objects) == 0,
-         "plasma_subscribe should be called before any objects are created.");
+
   /* Create a new array to buffer notifications that can't be sent to the
   * subscriber yet because the socket send buffer is full. TODO(rkn): the queue
   * never gets freed. */
   notification_queue *queue =
       (notification_queue *) malloc(sizeof(notification_queue));
   queue->subscriber_fd = fd;
-  utarray_new(queue->object_ids, &object_id_icd);
+  utarray_new(queue->object_notifications, &object_info_icd);
   HASH_ADD_INT(plasma_state->pending_notifications, subscriber_fd, queue);
+
+  /* Push notifications to the new subscriber about existing objects. */
+  object_table_entry *entry, *temp_entry;
+  HASH_ITER(handle, plasma_state->plasma_store_info->objects, entry,
+            temp_entry) {
+    utarray_push_back(queue->object_notifications, &entry->info);
+  }
+  send_notifications(plasma_state->loop, queue->subscriber_fd, plasma_state, 0);
 }
 
 void process_message(event_loop *loop,
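
A few notes on the protocol this patch introduces, with illustrative sketches.
After this change, a subscriber no longer receives bare object IDs: every
notification is one fixed-size object_info record, and is_deletion
distinguishes deletions from seals. The sketch below shows what a minimal
subscriber-side read loop could look like. It is a sketch only: the structs
here are simplified stand-ins for the real object_id and object_info in the
plasma headers (obj_id, is_deletion, and digest are the fields visible in this
patch; the real structs have more fields and a different layout), and
read_exact is a hypothetical helper.

    /* Sketch of a subscriber read loop. The structs are simplified
     * stand-ins; the real definitions live in the plasma headers. */
    #include <errno.h>
    #include <stdbool.h>
    #include <stdio.h>
    #include <unistd.h>

    #define DIGEST_SIZE 20 /* Assumed value for the sketch. */

    typedef struct {
      unsigned char id[20]; /* Stand-in for plasma's object_id. */
    } object_id;

    typedef struct {
      object_id obj_id;
      bool is_deletion; /* true for a deletion, false for a seal. */
      unsigned char digest[DIGEST_SIZE];
    } object_info;

    /* Read exactly size bytes from fd, retrying on short reads and EINTR.
     * Returns 0 on success and -1 on EOF or error. */
    static int read_exact(int fd, void *buf, size_t size) {
      char *cursor = (char *) buf;
      while (size > 0) {
        ssize_t n = read(fd, cursor, size);
        if (n < 0 && errno == EINTR) {
          continue;
        }
        if (n <= 0) {
          return -1;
        }
        cursor += n;
        size -= (size_t) n;
      }
      return 0;
    }

    /* Consume notifications until the store closes the socket. Records
     * arrive in the order the objects were sealed or deleted. */
    void consume_notifications(int subscribe_fd) {
      object_info info;
      while (read_exact(subscribe_fd, &info, sizeof(info)) == 0) {
        printf("object %s\n", info.is_deletion ? "deleted" : "sealed");
      }
    }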
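
The store queues notifications because subscriber sockets are non-blocking: a
seal can happen while a subscriber's send buffer is full, so
send_notifications sends as many queued records as the socket accepts and
erases only that prefix, leaving the tail for the next writable event. Below
is a minimal sketch of that pattern under the same assumptions as above,
using a plain growable array instead of utarray/uthash so it stays
self-contained; notification_queue's fields, queue_push, and queue_flush are
hypothetical names. Note that queue_push copies the record by value, which is
what makes it safe for delete_object to hand push_notification a pointer to a
stack-allocated object_info.

    /* Sketch of the per-subscriber queue-and-flush pattern, reusing the
     * object_info stand-in from the previous sketch. */
    #include <assert.h>
    #include <errno.h>
    #include <stdlib.h>
    #include <string.h>
    #include <sys/socket.h>

    typedef struct {
      object_info *items; /* Pending notifications, oldest first. */
      size_t len;
      size_t cap;
    } notification_queue;

    /* Append a copy of the notification. Because the queue stores its own
     * copy, the caller may pass a pointer to a stack-allocated record. */
    void queue_push(notification_queue *q, const object_info *info) {
      if (q->len == q->cap) {
        q->cap = q->cap ? 2 * q->cap : 16;
        q->items = (object_info *) realloc(q->items, q->cap * sizeof(*q->items));
        assert(q->items != NULL);
      }
      q->items[q->len++] = *info;
    }

    /* Send as many queued notifications as the non-blocking socket accepts.
     * Returns the number left unsent; the caller keeps the fd registered
     * for writability until this reaches zero. */
    size_t queue_flush(notification_queue *q, int fd) {
      size_t sent = 0;
      while (sent < q->len) {
        ssize_t n = send(fd, &q->items[sent], sizeof(object_info), 0);
        if (n < 0 &&
            (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
          break; /* Send buffer full; retry on the next writable event. */
        }
        if (n < 0) {
          break; /* Real error; the caller should drop this subscriber. */
        }
        assert((size_t) n == sizeof(object_info)); /* Mirrors the CHECK above. */
        sent += 1;
      }
      /* Erase the sent prefix and keep the unsent tail, like utarray_erase
       * in send_notifications. */
      memmove(q->items, q->items + sent, (q->len - sent) * sizeof(object_info));
      q->len -= sent;
      return q->len;
    }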
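
Finally, the behavioral change the new regression test exercises:
subscribe_to_updates used to CHECK that no objects existed yet, while it now
seeds the new subscriber's queue with one notification per object already in
the store and flushes immediately, so a plasma manager can attach to a store
after objects have been created. The test then polls wait() with timeout=0
until that backlog drains. A sketch of the catch-up step, reusing the
hypothetical queue above (the real store iterates its uthash object table
with HASH_ITER rather than a plain array):

    /* Sketch of the catch-up step in subscribe_to_updates: enqueue a seal
     * notification for every object already in the store, then try to
     * flush. subscribe_catch_up is a hypothetical name. */
    void subscribe_catch_up(notification_queue *q, int subscriber_fd,
                            const object_info *existing, size_t n_existing) {
      for (size_t i = 0; i < n_existing; ++i) {
        queue_push(q, &existing[i]); /* Copied by value into the queue. */
      }
      /* Anything the socket does not accept now stays queued and is sent
       * when the fd becomes writable again. */
      queue_flush(q, subscriber_fd);
    }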