Skip to content

Commit

Permalink
Attempting to use the library with simple worker threads
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Kallem committed Oct 14, 2014
1 parent 0d72dc5 commit bb0e13c
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 44 deletions.
17 changes: 10 additions & 7 deletions ConnThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ int recv_begin(DataBlock *db, int *src, int node_nr, int tag) {
printf("recv_begin: pull node from list fail: tag %d, src %d\n", tag, largest_full_list_index);
exit(-1);
}
pthread_mutex_unlock(&full_list[tag][RECV][largest_full_list_index].mutex);

// Lock the corresponding 'busy' receive list (the one for the same src and tag)
pthread_mutex_lock(&busy_list[tag][RECV][largest_full_list_index].mutex);
Expand All @@ -60,7 +61,6 @@ int recv_begin(DataBlock *db, int *src, int node_nr, int tag) {

// Unlock the lists
pthread_mutex_unlock(&busy_list[tag][RECV][largest_full_list_index].mutex);
pthread_mutex_unlock(&full_list[tag][RECV][largest_full_list_index].mutex);

*src = largest_full_list_index;

Expand Down Expand Up @@ -223,9 +223,13 @@ void *readFromSocket(void *param) {
}

// Read from socket() in that list node
///printf("Node %d going to read from node %d for tag %d\n", local_host, src, tag);
//fflush(stdout);
size_t n;
n = read(conn_fd, node->db.data, space_remain_in_cur_node);

//printf("Node %d read >%s< with size %ld from node %d for tag %d\n", local_host, (char *) node->db.data, n, src, tag);
//fflush(stdout);

if (n < 0) {
printf("readFromSocket: error on reading from src %d on tag %d\n", src, tag);
pthread_exit(NULL);
Expand All @@ -240,11 +244,8 @@ void *readFromSocket(void *param) {
pthread_exit(NULL);
}

if (space_remain_in_cur_node == 0) {
if (space_remain_in_cur_node == 0)
pull_new_free_node = true;
} else {
pull_new_free_node = false;
}

if (pull_new_free_node) { // Current node has been fully filled, next time we need to pull new node
// Add the list node to the tail of the 'full' receive list
Expand Down Expand Up @@ -282,6 +283,8 @@ void *writeToSocket(void *param) {
}

// Write data in that node to socket
//printf("Node %d writing >%s< with size %ld to node %d\n", local_host, (char *) node->db.data, node->db.size, dest);
//fflush(stdout);
size_t n;
n = write(conn_fd, node->db.data, node->db.size);

Expand All @@ -301,7 +304,7 @@ void *writeToSocket(void *param) {
printf("writeToSocket: add node to list fail: tag %d, dest %d\n", tag, dest);
pthread_exit(NULL);
}
pthread_mutex_lock(&free_list[tag][SEND][dest].mutex);
pthread_mutex_unlock(&free_list[tag][SEND][dest].mutex);
}
}
}
90 changes: 53 additions & 37 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ using namespace std;

const char *conf = "/home/ajk2214/cs6901/tj/conf";
const char *domain = ".clic.cs.columbia.edu";
const int tags = 3;
const int tags = 2;
const int conn_type = 2; // Two types of connection. 0: read; 1: write.

int local_host;
Expand All @@ -34,18 +34,44 @@ void error(const char *info) {
}


//void *worker() {
// void *input_block;
// void *output_block;
//
// while (true) {
// if (recv_dequeue(&input_block, &src, tag)) {
//
// } else {
//
// }
// }
//}
void *worker(void *param) {
thr_param *p = (thr_param *) param;
int tag = p->tag;
int n;
DataBlock db;

if (tag == 0) {
// Send something to each node, followed by a termination message
for (n = 0; n < hosts; n++) {
while(!send_begin(&db, n, tag+1));
strcpy((char *) db.data, "THIS IS MY LARGE TEST STRING");
db.size = strlen((const char *) db.data) + 1;
send_end(db, n, tag+1);

while(!send_begin(&db, n, tag+1));
strcpy((char *) db.data, "END");
db.size = strlen((const char *) db.data) + 1;
send_end(db, n, tag+1);
}
} else if (tag == 1) {
// Receive until termination received from all nodes
DataBlock db;
int src;
int t = 0;

while (t != hosts) {
while (!recv_begin(&db, &src, hosts, tag));
printf("%d got %s from %d\n", local_host, (char *) db.data, src);
fflush(stdout);
recv_end(db, src, tag);

if (strcmp((const char *) db.data, "END") == 0)
t++;
}
}

return NULL;
}

int **setup(const char *conf_filename, const char *domain, size_t tags, size_t max_hosts) {
size_t hosts = 0;
Expand Down Expand Up @@ -228,30 +254,20 @@ int main(int argc, char** argv) {
}


// /* spawn T worker threads
// * T: tag number
// */
// worker_threads = new pthread_t[tags];
// for (t = 0; t < tags; t++) {
// pthread_create(&worker_threads[t], NULL, worker, NULL);
// }
DataBlock dbs;
DataBlock dbr;
int src;

while(!send_begin(&dbs, 0, 0));
strcpy((char *) dbs.data, "My test string");
dbs.size = 15;
send_end(dbs, 0, 0);

int i;
if (local_host == 0) {
for (i = 0; i < 10; i++) {
while (!recv_begin(&dbr, &src, hosts, 0));
printf("%d got %s from %d\n", local_host, (char *) dbr.data, src);
fflush(stdout);
recv_end(dbr, src, 0);
}
/* spawn T worker threads
* T: tag number
*/
worker_threads = new pthread_t[tags];
for (t = 0; t < tags; t++) {
thr_param *param;
param = new thr_param();
param->tag = t;
pthread_create(&worker_threads[t], NULL, &worker, (void *) param);
}

for (t = 0; t < tags; t++) {
void *retval;
pthread_join(worker_threads[t], &retval);
}

return 0;
Expand Down

0 comments on commit bb0e13c

Please sign in to comment.