From d2129020450a8916eff2d53a98279c687f6adeb1 Mon Sep 17 00:00:00 2001
From: StarCoral
Date: Mon, 5 Jul 2021 17:03:16 +0000
Subject: [PATCH] fix bug: hang client with log

---
 Gemini-1.0/src/hook.cpp        | 17 ++++++++++-----
 Gemini-1.0/src/pod-manager.cpp | 38 ++++++++++++++++++++++------------
 2 files changed, 37 insertions(+), 18 deletions(-)

diff --git a/Gemini-1.0/src/hook.cpp b/Gemini-1.0/src/hook.cpp
index a0289e7..82f0576 100644
--- a/Gemini-1.0/src/hook.cpp
+++ b/Gemini-1.0/src/hook.cpp
@@ -74,11 +74,11 @@ static void *real_dlsym(void *handle, const char *symbol) {
 }
 
 struct hookInfo {
-  int debug_mode;
+  int debug_mode = 0;
   void *preHooks[NUM_HOOK_SYMBOLS];
   void *postHooks[NUM_HOOK_SYMBOLS];
   int call_count[NUM_HOOK_SYMBOLS];
-  
+
   hookInfo() {
     const char *envHookDebug;
 
@@ -239,6 +239,7 @@ void configure_connection() {
   if (port != NULL) pod_manager_port = atoi(port);
 
   DEBUG("Pod manager: %s:%u", pod_manager_ip, pod_manager_port);
+  INFO("Pod manager: %s:%u", pod_manager_ip, pod_manager_port);
 }
 
 int attempt_connection(int __fd, __CONST_SOCKADDR_ARG __addr, socklen_t __len) {
@@ -399,7 +400,8 @@ double estimate_full_burst(double measured_burst, double measured_window) {
 
   DEBUG("measured burst: %.3f ms, window: %.3f ms, estimated full burst: %.3f ms", measured_burst,
         measured_window, full_burst);
-
+  INFO("measured burst: %.3f ms, window: %.3f ms, estimated full burst: %.3f ms", measured_burst,
+       measured_window, full_burst);
   return full_burst;
 }
 
@@ -428,6 +430,7 @@ double get_token_from_scheduler(double next_burst) {
   new_quota = get_msg_data(attached, rpos);
 
   DEBUG("Get token from scheduler, quota: %f", new_quota);
+  INFO("Get token from scheduler, quota: %f", new_quota);
 
   return new_quota;
 }
@@ -459,7 +462,10 @@ void *wait_cuda_kernels(void *args) {
     // sleep until token expired or being notified
     pthread_mutex_lock(&overuse_trk_mutex);
     int rc = pthread_cond_timedwait(&overuse_trk_intr_cond, &overuse_trk_mutex, &ts);
-    if (rc != ETIMEDOUT) DEBUG("overuse tracking thread interrupted");
+    if (rc != ETIMEDOUT) {
+      DEBUG("overuse tracking thread interrupted");
+      INFO("overuse tracking thread interrupted");
+    }
     pthread_mutex_unlock(&overuse_trk_mutex);
 
     // synchronize all running kernels
@@ -476,7 +482,7 @@ void *wait_cuda_kernels(void *args) {
     overuse = std::max(0.0, (double)elapsed_ms - quota_time);
 
     DEBUG("overuse: %.3f ms", overuse);
-
+    INFO("overuse: %.3f ms", overuse);
     // notify tracking complete
     pthread_mutex_lock(&overuse_trk_mutex);
     overuse_trk_cmpl = true;
@@ -556,6 +562,7 @@ CUresult cuMemFree_prehook(CUdeviceptr ptr) {
   pthread_mutex_lock(&allocation_mutex);
   if (allocation_map.find(ptr) == allocation_map.end()) {
     DEBUG("Freeing unknown memory! %zx", ptr);
+    INFO("Freeing unknown memory! %zx", ptr);
   } else {
     gpu_mem_used -= allocation_map[ptr];
     update_memory_usage(allocation_map[ptr], 0);
diff --git a/Gemini-1.0/src/pod-manager.cpp b/Gemini-1.0/src/pod-manager.cpp
index 41799cc..57fde28 100644
--- a/Gemini-1.0/src/pod-manager.cpp
+++ b/Gemini-1.0/src/pod-manager.cpp
@@ -108,12 +108,12 @@ pthread_mutex_t kernel_launch_count_mutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_mutex_t sleeping_count_mutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t sleeping_count_cond = PTHREAD_COND_INITIALIZER;
 
-
 /*scheduler recv signal sync*/
 int scheduler_recv_sync = 0;
 pthread_mutex_t scheduler_recv_sync_mutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t scheduler_recv_sync_cond = PTHREAD_COND_INITIALIZER;
+
 /* communication with scheduler */
 size_t pod_name_len;
 char pod_name[HOST_NAME_MAX];
@@ -386,19 +386,23 @@ double hook_kernel_launch(int sockfd, double overuse_ms, double burst, char* cli
   bzero(sbuf, REQ_MSG_LEN);
   req_id = prepare_request(sbuf, REQ_QUOTA, pod_overuse_ms, max_burst);
   request_queue.push({req_id, sbuf});
-  // wake scheduler thread up 
+  // wake scheduler thread up
   int ok = pthread_cond_signal(&req_queue_cond);
   INFO("[RIYACHU] %s send signal & req_queue_cond %d, req_id %d", client_name, ok, req_id);
-  pthread_mutex_unlock(&req_queue_mutex);
+  int check = pthread_mutex_unlock(&req_queue_mutex);
+
+  INFO("[RIYACHU] %s unlock req_queue_mutex %d, req_id %d", client_name, check, req_id);
 
   INFO("[RIYACHU] %s before while complete %d, req_id %d", client_name, complete, req_id);
   // wait for response
-  while (!complete) {
+  while (true) {
     pthread_mutex_lock(&rsp_map_mutex);
+    INFO("[RIYACHU] %s in while wait complete %d, req_id %d", client_name, complete, req_id);
 
     pthread_mutex_lock(&scheduler_recv_sync_mutex);
     scheduler_recv_sync = 1;
+    pthread_cond_signal(&scheduler_recv_sync_cond);
     INFO("[RIYACHU] %s scheduler_recv_sync %d, req_id %d", client_name, scheduler_recv_sync, req_id);
     pthread_mutex_unlock(&scheduler_recv_sync_mutex);
@@ -426,9 +430,10 @@
     }
     INFO("[RIYACHU] %s after if complete %d, req_id %d", client_name, complete,req_id);
     pthread_mutex_unlock(&rsp_map_mutex);
+    if(complete) break;
   }
-
+  INFO("[RIYACHU] %s after while true, %d", client_name, req_id);
   delete[] sbuf;
 
   // update quota state and notify threads waiting on quota state
@@ -483,8 +488,13 @@ void *hook_thread_func(void *args) {
   //bzero(rbuf, REQ_MSG_LEN);
 
   ssize_t rc;
-  while ((rc = recv(sockfd, rbuf, REQ_MSG_LEN, 0)) > 0) {
-
+  int recv_zero_times = 0;
+  while (recv_zero_times <= 5) {
+    if((rc = recv(sockfd, rbuf, REQ_MSG_LEN, 0)) <= 0){
+      recv_zero_times++;
+      INFO("[RIYACHU] %s (pod manager)hook_thread_func recv - len 0, cnt %d", client_name, recv_zero_times);
+      continue;
+    }
     comm_request_t req;
     reqid_t rid;
     size_t pos = 0;  // attached data reading position
@@ -524,7 +534,7 @@
       INFO("[RIYACHU] %s (pod manager)hook_thread_func send, %ld", client_name ,rid);
     }
   }
-
+  INFO("%s connection closed by peer. recv() returns %ld.", client_name, rc);
   // since hook library close socket only when process terminated, we can use this as an indicator
   // of process termination, and recover memory usage
@@ -546,11 +556,13 @@
 
 // forward requests to scheduler
 void *scheduler_thread_send_func(void *args) {
+  INFO("scheduler_thread_send_func");
   int sockfd = *((int *)args);
   ssize_t send_rc;
   /* waiting for request from hook threads */
   while (true) {
     pthread_mutex_lock(&req_queue_mutex);
+    pthread_cond_wait(&req_queue_cond, &req_queue_mutex);
 
     if (!request_queue.empty()) {
       // process request
@@ -576,7 +588,7 @@ void *scheduler_thread_recv_func(void *args) {
   char buf[RSP_MSG_LEN], *attached;
   ssize_t rc;
 
-  DEBUG("scheduler_thread_recv_func");
+  INFO("scheduler_thread_recv_func");
   // int flags = fcntl(sockfd, F_GETFL, 0);
   // if(fcntl(sockfd, F_SETFL, flags & ~O_NONBLOCK) < 0) {
   //   ERROR("fcntl() error");
@@ -596,9 +608,7 @@
       INFO("[RIYACHU] (pod manager)scheduler_thread_recv_func recv > 0, req_id %ld", req_id);
       DEBUG("req_id %d complete.", req_id);
       INFO("req_id %d complete.", req_id);
-      // put response data into response_map and notify hook threads
-      pthread_mutex_lock(&rsp_map_mutex);
-      response_map.insert(std::make_pair(req_id, rsp));
+
       pthread_mutex_lock(&scheduler_recv_sync_mutex);
 
       while(scheduler_recv_sync == 0){
 
       }
       pthread_mutex_unlock(&scheduler_recv_sync_mutex);
 
+      // put response data into response_map and notify hook threads
+      pthread_mutex_lock(&rsp_map_mutex);
+      response_map.insert(std::make_pair(req_id, rsp));
       int ok = pthread_cond_signal(&rsp_map_cond);
       INFO("[RIYACHU] (pod manager)scheduler_thread_recv_func signal %d, req_id %ld", ok, req_id);
-      pthread_mutex_unlock(&rsp_map_mutex);
     }
   }
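
Note (not part of the patch): the reordering in scheduler_thread_recv_func is meant to keep the
receive thread from publishing the response and signalling rsp_map_cond before the hook thread
has set scheduler_recv_sync, i.e. before the hook thread is ready for the wakeup. Below is a
minimal standalone pthreads sketch of that flag-plus-condition-variable handshake. It is only an
illustration of the pattern, not Gemini code: it uses a single mutex for both flags for brevity
(the patch splits them across rsp_map_mutex and scheduler_recv_sync_mutex), and the names ready,
done, requester and responder are made up for this sketch.

#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t ready_cond = PTHREAD_COND_INITIALIZER;
static pthread_cond_t done_cond = PTHREAD_COND_INITIALIZER;
static int ready = 0;  /* requester is ready to receive the wakeup */
static int done = 0;   /* responder has published its result */

static void *responder(void *arg) {
  (void)arg;
  pthread_mutex_lock(&mtx);
  /* wait until the requester announces it is waiting; re-check after each wakeup */
  while (ready == 0) pthread_cond_wait(&ready_cond, &mtx);
  done = 1;                         /* publish the result under the lock */
  pthread_cond_signal(&done_cond);  /* cannot be lost: the requester is either
                                       waiting or will see done == 1 before sleeping */
  pthread_mutex_unlock(&mtx);
  return NULL;
}

static void *requester(void *arg) {
  (void)arg;
  pthread_mutex_lock(&mtx);
  ready = 1;                        /* set the flag before signalling */
  pthread_cond_signal(&ready_cond);
  while (done == 0) pthread_cond_wait(&done_cond, &mtx);
  pthread_mutex_unlock(&mtx);
  printf("requester: got response\n");
  return NULL;
}

int main(void) {
  pthread_t t_resp, t_req;
  pthread_create(&t_resp, NULL, responder, NULL);
  pthread_create(&t_req, NULL, requester, NULL);
  pthread_join(t_req, NULL);
  pthread_join(t_resp, NULL);
  return 0;
}

Because each flag is only read and written with the mutex held, and every wait sits in a while
loop that re-tests its flag, the sketch cannot hang even if one thread signals before the other
has started waiting.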