Skip to content

Commit

Permalink
add methods removehead and addtail in connection threads
Browse files Browse the repository at this point in the history
  • Loading branch information
xinluleo committed Oct 13, 2014
1 parent ea07c62 commit 26d823a
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 78 deletions.
185 changes: 110 additions & 75 deletions ConnThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,10 @@ int recv_begin(DataBlock *db, int *src, int node_nr, int tag) {

//'full' receive list is already locked now
// pull the 1st node from the 'full' list
ListNode *head = full_list[tag][0][largest_full_list_index].head;
ListNode *node = head->next;
head->next = node->next;
node->next->prev = head;
node->prev = NULL;
node->next = NULL;
ListNode *node = full_list[tag][0][largest_full_list_index].removeHead();
if (node == NULL) {
error("recv_begin: pull node from list fail: tag %d, src %d\n", tag, largest_full_list_index);
}

//lock the corresponding ‘busy’ receive list (the one for the same src and tag)
pthread_mutex_lock(&busy_list[tag][0][largest_full_list_index].mutex);
Expand Down Expand Up @@ -86,11 +84,10 @@ void recv_end(DataBlock db, int src, int tag) {
//lock the ‘free’ receive list for the given src and tag
pthread_mutex_lock(&free_list[tag][0][src].mutex);
//add that list node to the tail of the ‘free’ receive list
ListNode *tail = free_list[tag][0][src].tail;
node->next = tail;
node->prev = tail->prev;
tail->prev = node;
node->prev->next = node;
if (free_list[tag][0][src].addTail(node) == -1) {
error("recv_end: add node to list fail: tag %d, src %d\n", tag, src);
}

//unlock the ‘free’ receive list for the given src and tag
pthread_mutex_unlock(&free_list[tag][0][src].mutex);
}
Expand All @@ -108,12 +105,11 @@ int send_begin(DataBlock *db, int dest, int tag) {
}

//pull the 1st list node from the free send list
ListNode *head = free_list[tag][1][dest].head;
ListNode *node = head->next;
head->next = node->next;
node->next->prev = head;
node->prev = NULL;
node->next = NULL;
ListNode *node = free_list[tag][1][dest].removeHead();
if (node == NULL) {
error("send_begin: pull node from list fail: tag %d, dest %d\n", tag, dest);
}

//unlock the 'free' send list
pthread_mutex_unlock(&free_list[tag][1][dest].mutex);

Expand Down Expand Up @@ -150,11 +146,9 @@ void send_end(DataBlock db, int dest, int tag) {
//lock the ‘full’ send list for the given dest and tag
pthread_mutex_lock(&full_list[tag][1][dest].mutex);
//add that list node to the tail of the ‘full’ send list
ListNode *tail = full_list[tag][1][dest].tail;
node->next = tail;
node->prev = tail->prev;
tail->prev->next = node;
tail->prev = node;
if (full_list[tag][1][dest].addTail(node) == -1) {
error("send_end: add node to list fail: tag %d, dest %d\n", tag, dest);
}
//unlock the ‘full’ send list for the given dest and tag
pthread_mutex_unlock(&full_list[tag][1][dest].mutex);
}
Expand All @@ -170,23 +164,46 @@ void *readFromSocket(void *param) {
bool pull_new_free_node = true;
size_t space_remain_in_cur_node = 0;
while (true) {
if (pull_new_free_node) {
//lock the 'free' receive list
pthread_mutex_lock(&free_list[tag][0][src].mutex);
//pull the 1st list node from the 'free' receive list
ListNode *head = free_list[tag][0][src].head;
node = head->next;
head->next = node->next;
node->next->prev = head;
node->prev = NULL;
node->next = NULL;
space_remain_in_cur_node = BLOCK_SIZE;
//unlock the 'free' receive list
pthread_mutex_unlock(&free_list[tag][0][src].mutex);
}
//read from socket() in that list node
size_t n;
if (src != localhost) {
if (src == localhost) {
//local data transfer
//lock the 'full' send list
pthread_mutex_lock(&full_list[tag][1][localhost].mutex);
//pull the 1st node from the 'full' send list
node = full_list[tag][1][localhost].removeHead();
if (node == NULL) {
printf("readFromSocket: pull node from list fail: tag %d, local\n", tag);
pthread_exit(NULL);
}
//unlock the 'full' send list
pthread_mutex_unlock(&full_list[tag][1][localhost].mutex);

//lock the 'full' receive list
pthread_mutex_lock(&full_list[tag][0][localhost].mutex);
//add the node to the tail of the 'full' receive list
if (full_list[tag][0][localhost].addTail(node) == -1) {
printf("readFromSocket: add node to list fail: tag %d, local\n", tag);
pthread_exit(NULL);
}
//unlock the 'full' receive list
pthread_mutex_unlock(&full_list[tag][0][localhost].mutex);

} else {
if (pull_new_free_node) {
//lock the 'free' receive list
pthread_mutex_lock(&free_list[tag][0][src].mutex);
//pull the 1st list node from the 'free' receive list
node = free_list[tag][0][src].removeHead();
if (node == NULL) {
printf("readFromSocket: pull node from list fail: tag %d, src %d\n", tag, src);
pthread_exit(NULL);
}
space_remain_in_cur_node = BLOCK_SIZE;
//unlock the 'free' receive list
pthread_mutex_unlock(&free_list[tag][0][src].mutex);
}
//read from socket() in that list node
size_t n;

n = read(conn_fd, node->db.data, space_remain_in_cur_node);
if (n < 0) {
printf("readFromSocket: error on reading from src %d on tag %d\n", src, tag);
Expand All @@ -203,20 +220,17 @@ void *readFromSocket(void *param) {
pull_new_free_node = true;
}

} else { //local data transfer

}
if (pull_new_free_node) { //Current node has been fully filled, next time we need to pull new node
//lock the 'full' receive list
pthread_mutex_lock(&full_list[tag][0][src].mutex);
//add the list node to the tail of the 'full' receive list
ListNode *tail = full_list[tag][0][src].tail;
node->next = tail;
node->prev = tail->prev;
tail->prev->next = node;
tail->prev = node;
//unlock the 'full' receive list
pthread_mutex_unlock(&full_list[tag][0][src].mutex);
if (pull_new_free_node) { //Current node has been fully filled, next time we need to pull new node
//lock the 'full' receive list
pthread_mutex_lock(&full_list[tag][0][src].mutex);
//add the list node to the tail of the 'full' receive list
if (full_list[tag][0][src].addTail(node) == -1) {
printf("readFromSocket: add node to list fail: tag %d, src %d\n", tag, src);
pthread_exit(NULL);
}
//unlock the 'full' receive list
pthread_mutex_unlock(&full_list[tag][0][src].mutex);
}
}
}
}
Expand All @@ -228,30 +242,51 @@ void *writeToSocket(void *param) {
int tag = p->tag;
int conn_fd = p->conn;
while (true) {
//lock the ‘full’ send list
pthread_mutex_lock(&full_list[tag][1][dest].mutex);
//pull the 1st node from the ‘full’ send list
ListNode *head = full_list[tag][1][dest].head;
ListNode *node = head->next;
head->next = node->next;
node->next->prev = head;
node->prev = NULL;
node->next = NULL;
//unlock ‘full’ send list
pthread_mutex_unlock(&full_list[tag][1][dest].mutex);
if (dest == localhost) { //local data transfer
//lock the 'full' receive list
pthread_mutex_lock(&full_list[tag][0][localhost].mutex);
//pull the 1st node from the ‘full’ receive list
node = full_list[tag][0][localhost].removeHead();
if (node == NULL) {
printf("writeToSocket: pull node from list fail: tag %d, local\n", tag);
pthread_exit(NULL);
}
//unlock ‘full’ receive list
pthread_mutex_unlock(&full_list[tag][0][localhost].mutex);

//write data in that node to socket
//lock the 'full' send list
pthread_mutex_lock(&full_list[tag][1][localhost].mutex);
//add the node to the tail of the 'full' send list
if (full_list[tag][1][localhost].addTail(node) == -1) {
printf("writeToSocket: add node to list fail: tag %d, local\n", tag);
pthread_exit(NULL);
}
//unlock the 'full' send list
pthread_mutex_unlock(&full_list[tag][1][localhost].mutex);
} else {
//lock the ‘full’ send list
pthread_mutex_lock(&full_list[tag][1][dest].mutex);
//pull the 1st node from the ‘full’ send list
node = full_list[tag][1][dest].removeHead();
if (node == NULL) {
printf("writeToSocket: pull node from list fail: tag %d, dest %d\n", tag, dest);
pthread_exit(NULL);
}
//unlock ‘full’ send list
pthread_mutex_unlock(&full_list[tag][1][dest].mutex);

//write data in that node to socket


//lock the ‘free’ send list
pthread_mutex_lock(&free_list[tag][1][dest].mutex);
//add the node to the tail of the ‘free’ send list
ListNode *tail = free_list[tag][1][dest].tail;
node->next = tail;
node->prev = tail->prev;
tail->prev->next = node;
tail->prev = node;
//unlock the ‘free’ send list
pthread_mutex_lock(&free_list[tag][1][dest].mutex);
//lock the ‘free’ send list
pthread_mutex_lock(&free_list[tag][1][dest].mutex);
//add the node to the tail of the ‘free’ send list
if (free_list[tag][1][dest].addTail(node) == -1) {
printf("writeToSocket: add node to list fail: tag %d, dest %d\n", tag, dest);
pthread_exit(NULL);
}
//unlock the ‘free’ send list
pthread_mutex_lock(&free_list[tag][1][dest].mutex);
}
}
}
12 changes: 9 additions & 3 deletions usertype.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,24 @@ struct ListNode {
ListNode *next;
};

struct List {
class List {
public:
List() {
head = new ListNode();
head->next = tail;
head->db.size = -1;
tail = new ListNode();
head->next = tail;
tail->prev = head;
head->db.size = -1;
tail->db.size = -1;
num = 0;
pthread_mutex_init(&mutex, NULL);
};

ListNode *removeHead();
int addTail(ListNode *);
int getNum() {return num;}

private:
ListNode *head;
ListNode *tail;
size_t num; // Number of elements on the list
Expand Down

0 comments on commit 26d823a

Please sign in to comment.