Skip to content

Commit

Permalink
Code cleanup of main.cpp
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Kallem committed Oct 14, 2014
1 parent d927ad0 commit 0175f52
Showing 1 changed file with 67 additions and 43 deletions.
110 changes: 67 additions & 43 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,10 @@

using namespace std;

//DN_Queue BLOCKPTRS;
//DN_Queue READ_QUEUE_BY_T[MAX_TAG];
//DN_Queue WRITE_QUEUE_BY_DEST[MAX_NODES];


const char *conf = "/home/ajk2214/cs6901/tj/conf";
const char *domain = ".clic.cs.columbia.edu";
const int tags = 3;
const int conn_type = 2; //two types of connection. 0: read; 1: write.
const int conn_type = 2; // Two types of connection. 0: read; 1: write.

int local_host;
int hosts;
Expand All @@ -33,14 +28,12 @@ pthread_t *worker_threads;
List ***free_list, ***full_list;
HashList ***busy_list;


void error(const char *info) {
perror(info);
exit(1);
}



//void *worker() {
// void *input_block;
// void *output_block;
Expand All @@ -60,18 +53,29 @@ int **setup(const char *conf_filename, const char *domain, size_t tags, size_t m
char *hostnames[max_hosts];
char hostname_and_port[max_hosts];
FILE *fp = fopen(conf_filename, "r");
if (fp == NULL) return NULL;

if (fp == NULL)
return NULL;

while (fgets(hostname_and_port, sizeof(hostname_and_port), fp) != NULL) {
size_t len = strlen(hostname_and_port);
if (hostname_and_port[len - 1] != '\n') return NULL;

if (hostname_and_port[len - 1] != '\n')
return NULL;

for (len = 0; !isspace(hostname_and_port[len]); len++);

hostname_and_port[len] = 0;
hostnames[hosts] = strdup(hostname_and_port);
ports[hosts] = atoi(&hostname_and_port[len + 1]);
assert(ports[hosts] > 0);
if (++hosts == max_hosts) break;

if (++hosts == max_hosts)
break;
}

fclose(fp);

return tcp_grid_tags(hostnames, ports, hosts, tags, domain);
}

Expand All @@ -93,113 +97,133 @@ void printListBackward(ListNode *tail) {

void init(int node_nr) {
int t, p, n, i;

free_list = new List **[tags];
busy_list = new HashList **[tags];
full_list = new List **[tags];

for (t = 0; t < tags; t++) {
free_list[t] = new List *[conn_type];
busy_list[t] = new HashList *[conn_type];
full_list[t] = new List *[conn_type];

for (p = 0; p < conn_type; p++) {
free_list[t][p] = new List[node_nr];
busy_list[t][p] = new HashList[node_nr];
full_list[t][p] = new List[node_nr];

for (n = 0; n < node_nr; n++) {
for (i = 0; i < MAX_BLOCKS_PER_LIST; i++) {
struct DataBlock db;
db.data = malloc(BLOCK_SIZE);
for (n = 0; n < node_nr; n++) {
for (i = 0; i < MAX_BLOCKS_PER_LIST; i++) {
struct DataBlock db;
db.data = malloc(BLOCK_SIZE);

ListNode *node = new ListNode;
node->db = db;
ListNode *node = new ListNode;
node->db = db;

free_list[t][p][n].addTail(node);
}
}
free_list[t][p][n].addTail(node);
}
}
}
}



}

int main(int argc, char** argv) {
//set up connections
// Set up connections
printf("Setup\n");
fflush(stdout);

conn = setup(conf, domain, tags, 255);

printf("Finished setup\n");
fflush(stdout);

assert(conn != NULL);
int h = 0;
int t = 0;

while (conn[h] != NULL) {
for (t = 0; t < tags && conn[h][t] >= 0; t++);

if (t == tags) {
h++;
} else {
break;
}
h++;
} else {
break;
}
}

local_host = h++;
printf("local host = %d\n", local_host);

// Not sure what the purposes of the next two blocks are
//temporary change-----
for (t = 0; t < tags; t++) {
conn[local_host][t] = -2;
}
//---------------------

while (conn[h] != NULL) {
for (t = 0; t < tags && conn[h][t] >= 0; t++);

if (t == tags) {
h++;
} else {
break;
}
h++;
} else {
break;
}
}

hosts = h;
printf("hosts = %d\n", hosts);
server = conn[h + 1][0];
printf("server = %d\n", server);
//print out the connection matrix

// Print out the connection matrix for debugging
for (h = 0; h <= hosts + 1 ; h++) {
for (t = 0; t < tags; t++) {
printf("%d ", conn[h][t]);
}
printf("\n");
for (t = 0; t < tags; t++) {
printf("%d ", conn[h][t]);
}
printf("\n");
}

printf("Init hosts\n");
fflush(stdout);

init(hosts);

printf("Finished init hosts\n");
fflush(stdout);

/* spawn N * T * 2 connection threads (including connections to itself)
* N: node number
* T: tag number
* 2: read / write - 0: read connection, 1: write connection
* We differentiate connection to other nodes with those to itself in
* functions readFromSocket and writeToSocket
* The ConnThread methods differentiate data transfer between a node to
* itself and a node to other nodes
*/
conn_threads = new pthread_t **[hosts];

for (h = 0; h < hosts; h++) {
conn_threads[h] = new pthread_t *[tags];

for (t = 0; t < tags; t++) {
printf("Creating threads %d %d\n", h, t);
fflush(stdout);
conn_threads[h][t] = new pthread_t[2];
thr_param *param = new thr_param();

thr_param *param;
conn_threads[h][t] = new pthread_t[conn_type];

param = new thr_param();
param->node = h;
param->tag = t;
param->conn = conn[h][t];
param->conn_type = RECV;
pthread_create(&conn_threads[h][t][RECV], NULL, &readFromSocket, (void *)param);
pthread_create(&conn_threads[h][t][RECV], NULL, &readFromSocket, (void *) param);

param = new thr_param();
param->node = h;
param->tag = t;
param->conn = conn[h][t];
param->conn_type = SEND;
pthread_create(&conn_threads[h][t][SEND], NULL, &writeToSocket, (void *)param);
pthread_create(&conn_threads[h][t][SEND], NULL, &writeToSocket, (void *) param);
}
}

Expand Down

0 comments on commit 0175f52

Please sign in to comment.