Skip to content

Commit

Permalink
Merge sealed_objects and open_objects into a single hashmap (ray-proj…
Browse files Browse the repository at this point in the history
…ect#25)

* Merge sealed_objects and open_objects into a single hashmap

* Entry contains enum that determines whether it is open or closed

* Removed unused variable.

* Applied Robert's patch

* Fixed styling.
  • Loading branch information
ujvl authored and robertnishihara committed Nov 4, 2016
1 parent 5dfd76e commit b370a1d
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 50 deletions.
2 changes: 2 additions & 0 deletions src/plasma/plasma.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ typedef struct {

enum object_status { OBJECT_NOT_FOUND = 0, OBJECT_FOUND = 1 };

typedef enum { OPEN, SEALED } object_state;

enum plasma_message_type {
/** Create a new object. */
PLASMA_CREATE = 128,
Expand Down
79 changes: 29 additions & 50 deletions src/plasma/plasma_store.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ typedef struct {
UT_hash_handle handle;
/* Pointer to the object data. Needed to free the object. */
uint8_t *pointer;
/** An array of the clients that are currently using this object. */
/* An array of the clients that are currently using this object. */
UT_array *clients;
/* The state of the object, e.g., whether it is open or sealed. */
object_state state;
} object_table_entry;

typedef struct {
Expand Down Expand Up @@ -103,11 +105,8 @@ typedef struct {
struct plasma_store_state {
/* Event loop of the plasma store. */
event_loop *loop;
/* Objects that are still being written by their owner process. */
object_table_entry *open_objects;
/* Objects that have already been sealed by their owner process and
* can now be shared with other processes. */
object_table_entry *sealed_objects;
/* A hash table of all the objects in the store. */
object_table_entry *objects;
/* Objects that processes are waiting for. */
object_notify_entry *objects_notify;
/** The pending notifications that have not been sent to subscribers because
Expand All @@ -119,8 +118,7 @@ struct plasma_store_state {
plasma_store_state *init_plasma_store(event_loop *loop) {
plasma_store_state *state = malloc(sizeof(plasma_store_state));
state->loop = loop;
state->open_objects = NULL;
state->sealed_objects = NULL;
state->objects = NULL;
state->objects_notify = NULL;
state->pending_notifications = NULL;
return state;
Expand Down Expand Up @@ -152,10 +150,7 @@ void create_object(client *client_context,

object_table_entry *entry;
/* TODO(swang): Return these error to the client instead of exiting. */
HASH_FIND(handle, plasma_state->open_objects, &object_id, sizeof(object_id),
entry);
CHECKM(entry == NULL, "Cannot create object twice.");
HASH_FIND(handle, plasma_state->sealed_objects, &object_id, sizeof(object_id),
HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id),
entry);
CHECKM(entry == NULL, "Cannot create object twice.");

Expand All @@ -175,9 +170,9 @@ void create_object(client *client_context,
entry->fd = fd;
entry->map_size = map_size;
entry->offset = offset;
entry->state = OPEN;
utarray_new(entry->clients, &client_icd);
HASH_ADD(handle, plasma_state->open_objects, object_id, sizeof(object_id),
entry);
HASH_ADD(handle, plasma_state->objects, object_id, sizeof(object_id), entry);
result->handle.store_fd = fd;
result->handle.mmap_size = map_size;
result->data_offset = offset;
Expand All @@ -195,9 +190,9 @@ int get_object(client *client_context,
plasma_object *result) {
plasma_store_state *plasma_state = client_context->plasma_state;
object_table_entry *entry;
HASH_FIND(handle, plasma_state->sealed_objects, &object_id, sizeof(object_id),
HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id),
entry);
if (entry) {
if (entry && entry->state == SEALED) {
result->handle.store_fd = entry->fd;
result->handle.mmap_size = entry->map_size;
result->data_offset = entry->offset;
Expand Down Expand Up @@ -244,46 +239,33 @@ int remove_client_from_object_clients(object_table_entry *entry,

void release_object(client *client_context, object_id object_id) {
plasma_store_state *plasma_state = client_context->plasma_state;
object_table_entry *open_entry;
object_table_entry *sealed_entry;

HASH_FIND(handle, plasma_state->open_objects, &object_id, sizeof(object_id),
open_entry);
HASH_FIND(handle, plasma_state->sealed_objects, &object_id, sizeof(object_id),
sealed_entry);
/* Exactly one of open_entry and sealed_entry should be NULL. */
CHECK((open_entry == NULL) != (sealed_entry == NULL));
object_table_entry *entry;
HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id),
entry);
CHECK(entry != NULL);
/* Remove the client from the object's array of clients. */
if (open_entry != NULL) {
CHECK(remove_client_from_object_clients(open_entry, client_context) == 1);
} else {
CHECK(remove_client_from_object_clients(sealed_entry, client_context) == 1);
}
CHECK(remove_client_from_object_clients(entry, client_context) == 1);
}

/* Check if an object is present. */
int contains_object(client *client_context, object_id object_id) {
plasma_store_state *plasma_state = client_context->plasma_state;
object_table_entry *entry;
HASH_FIND(handle, plasma_state->sealed_objects, &object_id, sizeof(object_id),
HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id),
entry);
return entry ? OBJECT_FOUND : OBJECT_NOT_FOUND;
return entry && (entry->state == SEALED) ? OBJECT_FOUND : OBJECT_NOT_FOUND;
}

/* Seal an object that has been created in the hash table. */
void seal_object(client *client_context, object_id object_id) {
LOG_DEBUG("sealing object"); // TODO(pcm): add object_id here
plasma_store_state *plasma_state = client_context->plasma_state;
object_table_entry *entry;
HASH_FIND(handle, plasma_state->open_objects, &object_id, sizeof(object_id),
HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id),
entry);
CHECK(entry != NULL);
/* Move the object table entry from the table of open objects to the table of
* sealed objects. */
HASH_DELETE(handle, plasma_state->open_objects, entry);
HASH_ADD(handle, plasma_state->sealed_objects, object_id, sizeof(object_id),
entry);

CHECK(entry != NULL && entry->state == OPEN);
/* Set the state of object to SEALED. */
entry->state = SEALED;
/* Inform all subscribers that a new object has been sealed. */
notification_queue *queue, *temp_queue;
HASH_ITER(hh, plasma_state->pending_notifications, queue, temp_queue) {
Expand Down Expand Up @@ -325,16 +307,18 @@ void delete_object(client *client_context, object_id object_id) {
LOG_DEBUG("deleting object"); // TODO(rkn): add object_id here
plasma_store_state *plasma_state = client_context->plasma_state;
object_table_entry *entry;
HASH_FIND(handle, plasma_state->sealed_objects, &object_id, sizeof(object_id),
HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id),
entry);
/* TODO(rkn): This should probably not fail, but should instead throw an
* error. Maybe we should also support deleting objects that have been created
* but not sealed. */
CHECKM(entry != NULL, "To delete an object it must have been sealed.");
CHECKM(entry != NULL, "To delete an object it must have been created.");
CHECKM(entry->state == SEALED,
"To delete an object it must have been sealed.");
CHECKM(utarray_len(entry->clients) == 0,
"To delete an object, there must be no clients currently using it.");
uint8_t *pointer = entry->pointer;
HASH_DELETE(handle, plasma_state->sealed_objects, entry);
HASH_DELETE(handle, plasma_state->objects, entry);
dlfree(pointer);
utarray_free(entry->clients);
free(entry);
Expand Down Expand Up @@ -389,9 +373,7 @@ void subscribe_to_updates(client *client_context, int conn) {
plasma_store_state *plasma_state = client_context->plasma_state;
char dummy;
int fd = recv_fd(conn, &dummy, 1);
CHECKM(HASH_CNT(handle, plasma_state->open_objects) == 0,
"plasma_subscribe should be called before any objects are created.");
CHECKM(HASH_CNT(handle, plasma_state->sealed_objects) == 0,
CHECKM(HASH_CNT(handle, plasma_state->objects) == 0,
"plasma_subscribe should be called before any objects are created.");
/* Create a new array to buffer notifications that can't be sent to the
* subscriber yet because the socket send buffer is full. TODO(rkn): the queue
Expand Down Expand Up @@ -455,10 +437,7 @@ void process_message(event_loop *loop,
* lists. */
plasma_store_state *plasma_state = client_context->plasma_state;
object_table_entry *entry, *temp_entry;
HASH_ITER(handle, plasma_state->open_objects, entry, temp_entry) {
remove_client_from_object_clients(entry, client_context);
}
HASH_ITER(handle, plasma_state->sealed_objects, entry, temp_entry) {
HASH_ITER(handle, plasma_state->objects, entry, temp_entry) {
remove_client_from_object_clients(entry, client_context);
}
} break;
Expand Down

0 comments on commit b370a1d

Please sign in to comment.