
Commit

fix bug: hang client with log
StarCoral authored and justin0u0 committed Jul 5, 2021
1 parent 06015cf commit d212902
Showing 2 changed files with 37 additions and 18 deletions.
17 changes: 12 additions & 5 deletions Gemini-1.0/src/hook.cpp
@@ -74,11 +74,11 @@ static void *real_dlsym(void *handle, const char *symbol) {
}

struct hookInfo {
int debug_mode;
int debug_mode = 0;
void *preHooks[NUM_HOOK_SYMBOLS];
void *postHooks[NUM_HOOK_SYMBOLS];
int call_count[NUM_HOOK_SYMBOLS];

hookInfo() {
const char *envHookDebug;

@@ -239,6 +239,7 @@ void configure_connection() {
if (port != NULL) pod_manager_port = atoi(port);

DEBUG("Pod manager: %s:%u", pod_manager_ip, pod_manager_port);
INFO("Pod manager: %s:%u", pod_manager_ip, pod_manager_port);
}

int attempt_connection(int __fd, __CONST_SOCKADDR_ARG __addr, socklen_t __len) {
@@ -399,7 +400,8 @@ double estimate_full_burst(double measured_burst, double measured_window) {

DEBUG("measured burst: %.3f ms, window: %.3f ms, estimated full burst: %.3f ms", measured_burst,
measured_window, full_burst);

INFO("measured burst: %.3f ms, window: %.3f ms, estimated full burst: %.3f ms", measured_burst,
measured_window, full_burst);
return full_burst;
}

@@ -428,6 +430,7 @@ double get_token_from_scheduler(double next_burst) {
new_quota = get_msg_data<double>(attached, rpos);

DEBUG("Get token from scheduler, quota: %f", new_quota);
INFO("Get token from scheduler, quota: %f", new_quota);
return new_quota;
}

@@ -459,7 +462,10 @@ void *wait_cuda_kernels(void *args) {
// sleep until token expired or being notified
pthread_mutex_lock(&overuse_trk_mutex);
int rc = pthread_cond_timedwait(&overuse_trk_intr_cond, &overuse_trk_mutex, &ts);
if (rc != ETIMEDOUT) DEBUG("overuse tracking thread interrupted");
if (rc != ETIMEDOUT) {
DEBUG("overuse tracking thread interrupted");
INFO("overuse tracking thread interrupted");
}
pthread_mutex_unlock(&overuse_trk_mutex);

// synchronize all running kernels
@@ -476,7 +482,7 @@ void *wait_cuda_kernels(void *args) {
overuse = std::max(0.0, (double)elapsed_ms - quota_time);

DEBUG("overuse: %.3f ms", overuse);

INFO("overuse: %.3f ms", overuse);
// notify tracking complete
pthread_mutex_lock(&overuse_trk_mutex);
overuse_trk_cmpl = true;
@@ -556,6 +562,7 @@ CUresult cuMemFree_prehook(CUdeviceptr ptr) {
pthread_mutex_lock(&allocation_mutex);
if (allocation_map.find(ptr) == allocation_map.end()) {
DEBUG("Freeing unknown memory! %zx", ptr);
INFO("Freeing unknown memory! %zx", ptr);
} else {
gpu_mem_used -= allocation_map[ptr];
update_memory_usage(allocation_map[ptr], 0);
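The wait_cuda_kernels hunks above rely on pthread_cond_timedwait: the overuse-tracking thread sleeps until the current token's deadline passes, unless another thread signals the condition variable first, in which case the return code differs from ETIMEDOUT. Below is a minimal sketch of that timed-wait pattern; the helper name, mutex, and condition variable are placeholders, not the Gemini-1.0 symbols.

#include <pthread.h>
#include <time.h>
#include <errno.h>

static pthread_mutex_t trk_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t trk_cond = PTHREAD_COND_INITIALIZER;

void sleep_until_quota_expiry(long quota_ms) {
  struct timespec ts;
  clock_gettime(CLOCK_REALTIME, &ts);          // default clock for condition variables
  ts.tv_sec += quota_ms / 1000;
  ts.tv_nsec += (quota_ms % 1000) * 1000000L;
  if (ts.tv_nsec >= 1000000000L) {             // carry nanoseconds into seconds
    ts.tv_sec += 1;
    ts.tv_nsec -= 1000000000L;
  }

  pthread_mutex_lock(&trk_mutex);
  int rc = pthread_cond_timedwait(&trk_cond, &trk_mutex, &ts);
  if (rc != ETIMEDOUT) {
    // woke up early: another thread signaled trk_cond before the deadline
  }
  pthread_mutex_unlock(&trk_mutex);
}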
38 changes: 25 additions & 13 deletions Gemini-1.0/src/pod-manager.cpp
@@ -108,12 +108,12 @@ pthread_mutex_t kernel_launch_count_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t sleeping_count_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t sleeping_count_cond = PTHREAD_COND_INITIALIZER;


/*scheduler recv signal sync*/
int scheduler_recv_sync = 0;
pthread_mutex_t scheduler_recv_sync_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t scheduler_recv_sync_cond = PTHREAD_COND_INITIALIZER;


/* communication with scheduler */
size_t pod_name_len;
char pod_name[HOST_NAME_MAX];
@@ -386,19 +386,23 @@ double hook_kernel_launch(int sockfd, double overuse_ms, double burst, char* cli
bzero(sbuf, REQ_MSG_LEN);
req_id = prepare_request(sbuf, REQ_QUOTA, pod_overuse_ms, max_burst);
request_queue.push({req_id, sbuf});
// wake scheduler thread up
// wake scheduler thread up
int ok = pthread_cond_signal(&req_queue_cond);
INFO("[RIYACHU] %s send signal & req_queue_cond %d, req_id %d", client_name, ok, req_id);
pthread_mutex_unlock(&req_queue_mutex);
int check = pthread_mutex_unlock(&req_queue_mutex);

INFO("[RIYACHU] %s unlock req_queue_mutex %d, req_id %d", client_name, check, req_id);

INFO("[RIYACHU] %s before while complete %d, req_id %d", client_name, complete, req_id);
// wait for response
while (!complete) {
while (true) {
pthread_mutex_lock(&rsp_map_mutex);

INFO("[RIYACHU] %s in while wait complete %d, req_id %d", client_name, complete, req_id);

pthread_mutex_lock(&scheduler_recv_sync_mutex);
scheduler_recv_sync = 1;
pthread_cond_signal(&scheduler_recv_sync_cond);
INFO("[RIYACHU] %s scheduler_recv_sync %d, req_id %d", client_name, scheduler_recv_sync, req_id);
pthread_mutex_unlock(&scheduler_recv_sync_mutex);

@@ -426,9 +430,10 @@ double hook_kernel_launch(int sockfd, double overuse_ms, double burst, char* cli
}
INFO("[RIYACHU] %s after if complete %d, req_id %d", client_name, complete,req_id);
pthread_mutex_unlock(&rsp_map_mutex);
if(complete) break;

}

INFO("[RIYACHU] %s after while true, %d", client_name, req_id);
delete[] sbuf;

// update quota state and notify threads waiting on quota state
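The hunk above shows hook_kernel_launch queuing a quota request, signaling req_queue_cond to wake the scheduler-send thread, and then polling for the response. A minimal sketch of that queue hand-off follows, with assumed names and simplified types rather than the pod-manager.cpp code: the producer signals while still holding the mutex, and the consumer re-checks the queue after every wait so a wakeup cannot be lost.

#include <pthread.h>
#include <queue>
#include <utility>

static std::queue<std::pair<int, char *>> request_queue;    // illustrative shared state
static pthread_mutex_t req_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t req_queue_cond = PTHREAD_COND_INITIALIZER;

void enqueue_request(int req_id, char *msg) {
  pthread_mutex_lock(&req_queue_mutex);
  request_queue.push(std::make_pair(req_id, msg));
  pthread_cond_signal(&req_queue_cond);    // wake the forwarding thread while holding the mutex
  pthread_mutex_unlock(&req_queue_mutex);
}

void *forwarder_thread(void *) {
  while (true) {
    pthread_mutex_lock(&req_queue_mutex);
    while (request_queue.empty())          // guard against spurious wakeups
      pthread_cond_wait(&req_queue_cond, &req_queue_mutex);
    std::pair<int, char *> req = request_queue.front();
    request_queue.pop();
    pthread_mutex_unlock(&req_queue_mutex);
    // ... send req.second to the scheduler socket here ...
  }
  return nullptr;
}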
@@ -483,8 +488,13 @@ void *hook_thread_func(void *args) {

//bzero(rbuf, REQ_MSG_LEN);
ssize_t rc;
while ((rc = recv(sockfd, rbuf, REQ_MSG_LEN, 0)) > 0) {

int recv_zero_times = 0;
while (recv_zero_times <= 5) {
if((rc = recv(sockfd, rbuf, REQ_MSG_LEN, 0)) <= 0){
recv_zero_times++;
INFO("[RIYACHU] %s (pod manager)hook_thread_func recv - len 0, cnt %ld", client_name, recv_zero_times);
continue;
}
comm_request_t req;
reqid_t rid;
size_t pos = 0; // attached data reading position
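The new loop above keeps calling recv() and only gives up after several reads that return 0 or an error, instead of exiting on the first non-positive return. A hedged sketch of such a bounded-retry read loop follows; the function name and the reset-on-success behavior are assumptions for illustration, not the committed code.

#include <sys/types.h>
#include <sys/socket.h>

void read_requests(int sockfd, char *buf, size_t buf_len) {
  int empty_reads = 0;
  const int max_empty_reads = 5;
  while (empty_reads <= max_empty_reads) {
    ssize_t rc = recv(sockfd, buf, buf_len, 0);
    if (rc <= 0) {              // 0: peer closed; negative: error (e.g. EINTR)
      ++empty_reads;
      continue;
    }
    empty_reads = 0;            // a successful read resets the retry budget
    // ... parse and handle the rc bytes in buf ...
  }
}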
Expand Down Expand Up @@ -524,7 +534,7 @@ void *hook_thread_func(void *args) {
INFO("[RIYACHU] %s (pod manager)hook_thread_func send, %ld", client_name ,rid);
}
}

INFO("%s connetion closed by peer. recv() returns %ld.", client_name, rc);
// since hook library close socket only when process terminated, we can use this as an indicator
// of process termination, and recover memory usage
@@ -546,11 +556,13 @@

// forward requests to scheduler
void *scheduler_thread_send_func(void *args) {
INFO("scheduler_thread_send_func");
int sockfd = *((int *)args);
ssize_t send_rc;
/* waiting for request from hook threads */
while (true) {
pthread_mutex_lock(&req_queue_mutex);

pthread_cond_wait(&req_queue_cond, &req_queue_mutex);
if (!request_queue.empty()) {
// process request
@@ -576,7 +588,7 @@ void *scheduler_thread_recv_func(void *args) {

char buf[RSP_MSG_LEN], *attached;
ssize_t rc;
DEBUG("scheduler_thread_recv_func");
INFO("scheduler_thread_recv_func");
// int flags = fcntl(sockfd, F_GETFL, 0);
// if(fcntl(sockfd, F_SETFL, flags & ~O_NONBLOCK) < 0) {
// ERROR("fcntl() error");
@@ -596,9 +608,7 @@
INFO("[RIYACHU] (pod manager)scheduler_thread_recv_func recv > 0, req_id %ld", req_id);
DEBUG("req_id %d complete.", req_id);
INFO("req_id %d complete.", req_id);
// put response data into response_map and notify hook threads
pthread_mutex_lock(&rsp_map_mutex);
response_map.insert(std::make_pair(req_id, rsp));


pthread_mutex_lock(&scheduler_recv_sync_mutex);
while(scheduler_recv_sync == 0){
@@ -607,9 +617,11 @@
}
pthread_mutex_unlock(&scheduler_recv_sync_mutex);

// put response data into response_map and notify hook threads
pthread_mutex_lock(&rsp_map_mutex);
response_map.insert(std::make_pair(req_id, rsp));
int ok = pthread_cond_signal(&rsp_map_cond);
INFO("[RIYACHU] (pod manager)scheduler_thread_recv_func signal %d, req_id %ld", ok, req_id);

pthread_mutex_unlock(&rsp_map_mutex);
}
}
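Taken together, the pod-manager changes tighten the response hand-off: hook_kernel_launch keeps looping until complete is observed set, and scheduler_thread_recv_func now waits for the hook thread to raise scheduler_recv_sync before it inserts the response into response_map and signals rsp_map_cond while holding rsp_map_mutex. The sketch below shows the basic lost-wakeup-safe condition-variable pattern behind this in isolation; the names and the double payload are illustrative, not the committed code.

#include <pthread.h>
#include <map>

static std::map<int, double> response_map;    // illustrative shared state
static pthread_mutex_t rsp_map_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t rsp_map_cond = PTHREAD_COND_INITIALIZER;

double wait_for_response(int req_id) {
  pthread_mutex_lock(&rsp_map_mutex);
  while (response_map.find(req_id) == response_map.end())
    pthread_cond_wait(&rsp_map_cond, &rsp_map_mutex);   // re-check the predicate after every wakeup
  double quota = response_map[req_id];
  response_map.erase(req_id);
  pthread_mutex_unlock(&rsp_map_mutex);
  return quota;
}

void publish_response(int req_id, double quota) {
  pthread_mutex_lock(&rsp_map_mutex);
  response_map[req_id] = quota;                // publish the data before signaling
  pthread_cond_signal(&rsp_map_cond);
  pthread_mutex_unlock(&rsp_map_mutex);
}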
