Skip to content

Commit

Permalink
Fixed bug in local transfers. Added more messages to test
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Kallem committed Oct 15, 2014
1 parent 32cb1ec commit 33c7aa8
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 21 deletions.
45 changes: 34 additions & 11 deletions ConnThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

// Called by a worker thread when it wants to process received blocks
// Returns 0 if no data available
Expand Down Expand Up @@ -96,7 +97,6 @@ void recv_end(DataBlock db, int src, int tag) {
printf("recv_end: add node to list fail: tag %d, src %d\n", tag, src);
exit(-1);
}

// Unlock the 'free' receive list for the given src and tag
pthread_mutex_unlock(&free_list[tag][RECV][src].mutex);
}
Expand Down Expand Up @@ -183,30 +183,57 @@ void send_end(DataBlock db, int dest, int tag) {
// Can block on read
void *readFromSocket(void *param) {
thr_param *p = (thr_param *) param;
ListNode *node;
ListNode *node = NULL;
ListNode *nodeSend = NULL;
ListNode *nodeRecv = NULL;
int src = p->node;
int tag = p->tag;
int conn_fd = p->conn;

while (true) {
if (src == local_host) {
// Local data transfer. Coud optimize by acquiring both locks and then transferring everything, not just one node
// Local data transfer
if (nodeRecv == NULL) {
// Pull the 1st list node from the 'free' receive list
pthread_mutex_lock(&free_list[tag][RECV][local_host].mutex);
nodeRecv = free_list[tag][RECV][local_host].removeHead();
pthread_mutex_unlock(&free_list[tag][RECV][local_host].mutex);
}

if (nodeRecv == NULL) {
continue;
}

// Pull the 1st node from the 'full' send list
pthread_mutex_lock(&full_list[tag][SEND][local_host].mutex);
node = full_list[tag][SEND][local_host].removeHead();
nodeSend = full_list[tag][SEND][local_host].removeHead();
pthread_mutex_unlock(&full_list[tag][SEND][local_host].mutex);

if (node == NULL) {
if (nodeSend == NULL) {
continue;
}


nodeRecv->db.size = nodeSend->db.size;
memcpy(nodeRecv->db.data, (const void *) nodeSend->db.data, nodeSend->db.size);

// Add the node to the tail of the 'full' receive list
pthread_mutex_lock(&full_list[tag][RECV][local_host].mutex);
if (full_list[tag][RECV][local_host].addTail(node) == -1) {
if (full_list[tag][RECV][local_host].addTail(nodeRecv) == -1) {
printf("readFromSocket: add node to list fail: tag %d, local\n", tag);
pthread_exit(NULL);
}
pthread_mutex_unlock(&full_list[tag][RECV][local_host].mutex);

// Add the node to the 'free' send list
pthread_mutex_lock(&free_list[tag][SEND][local_host].mutex);
if (free_list[tag][SEND][local_host].addTail(nodeSend) == -1) {
printf("readFromSocket: add node to list fail: tag %d, local\n", tag);
pthread_exit(NULL);
}
pthread_mutex_unlock(&free_list[tag][SEND][local_host].mutex);

// Reset nodeRecv
nodeRecv = NULL;
} else {
// Pull the 1st list node from the 'free' receive list
pthread_mutex_lock(&free_list[tag][RECV][src].mutex);
Expand Down Expand Up @@ -285,10 +312,6 @@ void *writeToSocket(void *param) {
continue;
}

// Write data in that node to socket
//printf("Node %d writing >%s< with size %ld to node %d\n", local_host, (char *) node->db.data, node->db.size, dest);
//fflush(stdout);

size_t n;

n = write(conn_fd, &node->db.size, sizeof(node->db.size));
Expand Down
25 changes: 15 additions & 10 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,22 @@ void error(const char *info) {
void *worker(void *param) {
thr_param *p = (thr_param *) param;
int tag = p->tag;
int m;
int n;
DataBlock db;

if (tag == 0) {
// Send something to each node, followed by a termination message
for (n = 0; n < hosts; n++) {
while(!send_begin(&db, n, tag+1));
strcpy((char *) db.data, "THIS IS MY LARGE TEST STRING");
db.size = strlen((const char *) db.data) + 1;
send_end(db, n, tag+1);
for (m = 0; m < 100; m++) {
while(!send_begin(&db, n, tag+1));
sprintf((char *) db.data, "Test message %d from %d", m, local_host);
db.size = strlen((const char *) db.data) + 1;
send_end(db, n, tag+1);
}

while(!send_begin(&db, n, tag+1));
strcpy((char *) db.data, "END");
db.size = strlen((const char *) db.data) + 1;
db.size = 0;
send_end(db, n, tag+1);
}
} else if (tag == 1) {
Expand All @@ -61,12 +63,15 @@ void *worker(void *param) {

while (t != hosts) {
while (!recv_begin(&db, &src, hosts, tag));
printf("%d got %s from %d\n", local_host, (char *) db.data, src);
fflush(stdout);
recv_end(db, src, tag);

if (strcmp((const char *) db.data, "END") == 0)
if (db.size > 0) {
printf("Node %d received \"%s\" %p from node %d\n", local_host, (char *) db.data, db.data, src);
fflush(stdout);
} else {
t++;
}

recv_end(db, src, tag);
}
}

Expand Down

0 comments on commit 33c7aa8

Please sign in to comment.