From b370a1df57109032face7804df770bdaa5556d71 Mon Sep 17 00:00:00 2001 From: Ujval Misra Date: Thu, 3 Nov 2016 17:33:54 -0700 Subject: [PATCH] Merge sealed_objects and open_objects into a single hashmap (#25) * Merge sealed_objects and open_objects into a single hashmap * Entry contains enum that determines whether it is open or closed * Removed unused variable. * Applied Robert's patch * Fixed styling. --- src/plasma/plasma.h | 2 + src/plasma/plasma_store.c | 79 ++++++++++++++------------------------- 2 files changed, 31 insertions(+), 50 deletions(-) diff --git a/src/plasma/plasma.h b/src/plasma/plasma.h index 1bceac4558d6e..6bbba736de474 100644 --- a/src/plasma/plasma.h +++ b/src/plasma/plasma.h @@ -41,6 +41,8 @@ typedef struct { enum object_status { OBJECT_NOT_FOUND = 0, OBJECT_FOUND = 1 }; +typedef enum { OPEN, SEALED } object_state; + enum plasma_message_type { /** Create a new object. */ PLASMA_CREATE = 128, diff --git a/src/plasma/plasma_store.c b/src/plasma/plasma_store.c index 0e7d404d05a23..accad7df523c6 100644 --- a/src/plasma/plasma_store.c +++ b/src/plasma/plasma_store.c @@ -61,8 +61,10 @@ typedef struct { UT_hash_handle handle; /* Pointer to the object data. Needed to free the object. */ uint8_t *pointer; - /** An array of the clients that are currently using this object. */ + /* An array of the clients that are currently using this object. */ UT_array *clients; + /* The state of the object, e.g., whether it is open or sealed. */ + object_state state; } object_table_entry; typedef struct { @@ -103,11 +105,8 @@ typedef struct { struct plasma_store_state { /* Event loop of the plasma store. */ event_loop *loop; - /* Objects that are still being written by their owner process. */ - object_table_entry *open_objects; - /* Objects that have already been sealed by their owner process and - * can now be shared with other processes. */ - object_table_entry *sealed_objects; + /* A hash table of all the objects in the store. */ + object_table_entry *objects; /* Objects that processes are waiting for. */ object_notify_entry *objects_notify; /** The pending notifications that have not been sent to subscribers because @@ -119,8 +118,7 @@ struct plasma_store_state { plasma_store_state *init_plasma_store(event_loop *loop) { plasma_store_state *state = malloc(sizeof(plasma_store_state)); state->loop = loop; - state->open_objects = NULL; - state->sealed_objects = NULL; + state->objects = NULL; state->objects_notify = NULL; state->pending_notifications = NULL; return state; @@ -152,10 +150,7 @@ void create_object(client *client_context, object_table_entry *entry; /* TODO(swang): Return these error to the client instead of exiting. */ - HASH_FIND(handle, plasma_state->open_objects, &object_id, sizeof(object_id), - entry); - CHECKM(entry == NULL, "Cannot create object twice."); - HASH_FIND(handle, plasma_state->sealed_objects, &object_id, sizeof(object_id), + HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id), entry); CHECKM(entry == NULL, "Cannot create object twice."); @@ -175,9 +170,9 @@ void create_object(client *client_context, entry->fd = fd; entry->map_size = map_size; entry->offset = offset; + entry->state = OPEN; utarray_new(entry->clients, &client_icd); - HASH_ADD(handle, plasma_state->open_objects, object_id, sizeof(object_id), - entry); + HASH_ADD(handle, plasma_state->objects, object_id, sizeof(object_id), entry); result->handle.store_fd = fd; result->handle.mmap_size = map_size; result->data_offset = offset; @@ -195,9 +190,9 @@ int get_object(client *client_context, plasma_object *result) { plasma_store_state *plasma_state = client_context->plasma_state; object_table_entry *entry; - HASH_FIND(handle, plasma_state->sealed_objects, &object_id, sizeof(object_id), + HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id), entry); - if (entry) { + if (entry && entry->state == SEALED) { result->handle.store_fd = entry->fd; result->handle.mmap_size = entry->map_size; result->data_offset = entry->offset; @@ -244,30 +239,21 @@ int remove_client_from_object_clients(object_table_entry *entry, void release_object(client *client_context, object_id object_id) { plasma_store_state *plasma_state = client_context->plasma_state; - object_table_entry *open_entry; - object_table_entry *sealed_entry; - - HASH_FIND(handle, plasma_state->open_objects, &object_id, sizeof(object_id), - open_entry); - HASH_FIND(handle, plasma_state->sealed_objects, &object_id, sizeof(object_id), - sealed_entry); - /* Exactly one of open_entry and sealed_entry should be NULL. */ - CHECK((open_entry == NULL) != (sealed_entry == NULL)); + object_table_entry *entry; + HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id), + entry); + CHECK(entry != NULL); /* Remove the client from the object's array of clients. */ - if (open_entry != NULL) { - CHECK(remove_client_from_object_clients(open_entry, client_context) == 1); - } else { - CHECK(remove_client_from_object_clients(sealed_entry, client_context) == 1); - } + CHECK(remove_client_from_object_clients(entry, client_context) == 1); } /* Check if an object is present. */ int contains_object(client *client_context, object_id object_id) { plasma_store_state *plasma_state = client_context->plasma_state; object_table_entry *entry; - HASH_FIND(handle, plasma_state->sealed_objects, &object_id, sizeof(object_id), + HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id), entry); - return entry ? OBJECT_FOUND : OBJECT_NOT_FOUND; + return entry && (entry->state == SEALED) ? OBJECT_FOUND : OBJECT_NOT_FOUND; } /* Seal an object that has been created in the hash table. */ @@ -275,15 +261,11 @@ void seal_object(client *client_context, object_id object_id) { LOG_DEBUG("sealing object"); // TODO(pcm): add object_id here plasma_store_state *plasma_state = client_context->plasma_state; object_table_entry *entry; - HASH_FIND(handle, plasma_state->open_objects, &object_id, sizeof(object_id), + HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id), entry); - CHECK(entry != NULL); - /* Move the object table entry from the table of open objects to the table of - * sealed objects. */ - HASH_DELETE(handle, plasma_state->open_objects, entry); - HASH_ADD(handle, plasma_state->sealed_objects, object_id, sizeof(object_id), - entry); - + CHECK(entry != NULL && entry->state == OPEN); + /* Set the state of object to SEALED. */ + entry->state = SEALED; /* Inform all subscribers that a new object has been sealed. */ notification_queue *queue, *temp_queue; HASH_ITER(hh, plasma_state->pending_notifications, queue, temp_queue) { @@ -325,16 +307,18 @@ void delete_object(client *client_context, object_id object_id) { LOG_DEBUG("deleting object"); // TODO(rkn): add object_id here plasma_store_state *plasma_state = client_context->plasma_state; object_table_entry *entry; - HASH_FIND(handle, plasma_state->sealed_objects, &object_id, sizeof(object_id), + HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id), entry); /* TODO(rkn): This should probably not fail, but should instead throw an * error. Maybe we should also support deleting objects that have been created * but not sealed. */ - CHECKM(entry != NULL, "To delete an object it must have been sealed."); + CHECKM(entry != NULL, "To delete an object it must have been created."); + CHECKM(entry->state == SEALED, + "To delete an object it must have been sealed."); CHECKM(utarray_len(entry->clients) == 0, "To delete an object, there must be no clients currently using it."); uint8_t *pointer = entry->pointer; - HASH_DELETE(handle, plasma_state->sealed_objects, entry); + HASH_DELETE(handle, plasma_state->objects, entry); dlfree(pointer); utarray_free(entry->clients); free(entry); @@ -389,9 +373,7 @@ void subscribe_to_updates(client *client_context, int conn) { plasma_store_state *plasma_state = client_context->plasma_state; char dummy; int fd = recv_fd(conn, &dummy, 1); - CHECKM(HASH_CNT(handle, plasma_state->open_objects) == 0, - "plasma_subscribe should be called before any objects are created."); - CHECKM(HASH_CNT(handle, plasma_state->sealed_objects) == 0, + CHECKM(HASH_CNT(handle, plasma_state->objects) == 0, "plasma_subscribe should be called before any objects are created."); /* Create a new array to buffer notifications that can't be sent to the * subscriber yet because the socket send buffer is full. TODO(rkn): the queue @@ -455,10 +437,7 @@ void process_message(event_loop *loop, * lists. */ plasma_store_state *plasma_state = client_context->plasma_state; object_table_entry *entry, *temp_entry; - HASH_ITER(handle, plasma_state->open_objects, entry, temp_entry) { - remove_client_from_object_clients(entry, client_context); - } - HASH_ITER(handle, plasma_state->sealed_objects, entry, temp_entry) { + HASH_ITER(handle, plasma_state->objects, entry, temp_entry) { remove_client_from_object_clients(entry, client_context); } } break;