Skip to content

Commit

Permalink
[CHG] 上传代码,优化push_server,解决了内存泄漏和增加异常情况下的socket关闭
Browse files Browse the repository at this point in the history
  • Loading branch information
mgjluoning committed Apr 8, 2015
1 parent 960a5e0 commit 21042e9
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 26 deletions.
52 changes: 37 additions & 15 deletions server/src/push_server/socket/epoll_io_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ void CEpollIOLoop::Run()
}
else if (events[i].events & EPOLLIN)
{
SOCKET_IO_DEBUG("socket read event.");
CBaseIOStream* pIOStream = _GetHandlerBySock(sock);
if (pIOStream != NULL)
{
Expand All @@ -96,6 +97,7 @@ void CEpollIOLoop::Run()
}//EPOLLIN
else if (events[i].events & EPOLLOUT)
{
SOCKET_IO_DEBUG("socket write event.");
CBaseIOStream* pIOStream = _GetHandlerBySock(sock);
if (pIOStream != NULL)
{
Expand All @@ -109,22 +111,40 @@ void CEpollIOLoop::Run()
}//EPOLLOUT
else if (events[i].events & EPOLLERR)
{
SOCKET_IO_DEBUG("socket error event.");
CBaseIOStream* pIOStream = _GetHandlerBySock(sock);
if (pIOStream->GetSockType() == SOCK_TCP_CLIENT && pIOStream->CheckConnect())
{
int32_t nError, nCode;
socklen_t nLen;
nLen = sizeof(nError);
nCode = getsockopt(pIOStream->GetSocket(), SOL_SOCKET, SO_ERROR, &nError, &nLen);
if (nCode < 0 || nError)
{
//连接失败
SOCKET_IO_WARN("socket connect failed, nCode: %d, nError: %d.", nCode, nError);
pIOStream->OnConnect(FALSE);
}
}
if (pIOStream != NULL)
{
if (pIOStream->GetSockType() == SOCK_TCP_CLIENT && pIOStream->CheckConnect())
{
int32_t nError, nCode;
socklen_t nLen;
nLen = sizeof(nError);
nCode = getsockopt(pIOStream->GetSocket(), SOL_SOCKET, SO_ERROR, &nError, &nLen);
if (nCode < 0 || nError)
{
//连接失败
SOCKET_IO_WARN("socket connect failed, nCode: %d, nError: %d.", nCode, nError);
pIOStream->OnConnect(FALSE);
}
}
else
{
int32_t nError, nCode;
socklen_t nLen;
nLen = sizeof(nError);
nCode = getsockopt(pIOStream->GetSocket(), SOL_SOCKET, SO_ERROR, &nError, &nLen);
//连接失败
SOCKET_IO_WARN("socket error event, nCode: %d, nError: %d.", nCode, nError);
pIOStream->ShutDown();
}
}
}//EPOLLERR
}
if (events) {
delete []events;
events = NULL;
}
}
}

Expand Down Expand Up @@ -181,6 +201,7 @@ void CEpollIOLoop::Add_WriteEvent( CBaseIOStream* piostream )
ev.data.fd=piostream->GetSocket();
if (piostream->GetSockType() == SOCK_TCP_CLIENT && piostream->CheckConnect())
{
SOCKET_IO_DEBUG("add write event for check connect.");
//用于判断是否connect成功
//对于111(Connection refused)(即连接一个不存在的IP)错误或者110(Connection timed out)
//(即连接一个IP存在,PORT未开放)来说,没有定义EPOLLERR,
Expand All @@ -191,8 +212,9 @@ void CEpollIOLoop::Add_WriteEvent( CBaseIOStream* piostream )
}
else
{
SOCKET_IO_DEBUG("add write event.");
//可写事件
ev.events=EPOLLOUT;
ev.events=EPOLLOUT | EPOLLERR;
epoll_ctl(m_eid, EPOLL_CTL_MOD, piostream->GetSocket(), &ev);
}
m_waker.Wake();
Expand All @@ -217,7 +239,7 @@ void CEpollIOLoop::Remove_WriteEvent( CBaseIOStream* piostream )
{
struct epoll_event ev;
ev.data.fd=piostream->GetSocket();
ev.events=EPOLLIN;
ev.events=EPOLLIN | EPOLLERR;
epoll_ctl(m_eid, EPOLL_CTL_MOD, piostream->GetSocket(), &ev);
m_waker.Wake();
}
Expand Down
39 changes: 28 additions & 11 deletions server/src/push_server/socket/ssl_client_async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,22 @@ void CSSLClientAsync::UnInitSSL()
{
if (m_ssl)
{
SSL_shutdown(m_ssl);
int32_t nRet = SSL_shutdown(m_ssl);
if (nRet == 0)
{
int32_t nErrorCode = SSL_get_error(GetSSL(), nRet);
SOCKET_IO_WARN("ssl shutdown not finished, errno: %d.", nErrorCode);

}
else if (nRet == 1)
{
SOCKET_IO_DEBUG("ssl shutdown successed.");
}
else if (nRet < 0)
{
int32_t nErrorCode = SSL_get_error(GetSSL(), nRet);
SOCKET_IO_ERROR("ssl shutdown failed, errno: %d.", nErrorCode);
}
SSL_free(m_ssl);
m_ssl = NULL;
}
Expand All @@ -112,6 +127,13 @@ void CSSLClientAsync::OnConnect(BOOL bConnected)
SOCKET_IO_INFO("socket connect successed, remote ip: %s, port: %d.", GetRemoteIP(),
GetRemotePort());
DoConnect(GetSocketID());
SSL_set_mode(GetSSL(), SSL_MODE_AUTO_RETRY);
if (SSL_set_fd(GetSSL(), GetSocket()) != 1)
{
SOCKET_IO_ERROR("ssl set fd failed");
DoException(GetSocketID(), SOCKET_IO_SSL_CONNECT_FAILED);
return;
}
SSLConnect();
}
else
Expand Down Expand Up @@ -178,13 +200,6 @@ int32_t CSSLClientAsync::SSLConnect()

//阻塞的ssl_connect可能会有一个问题,服务端如果不对此处理,可能会一直卡在SSL_connect这个接口
//此处采用非阻塞的ssl_connect
SSL_set_mode(GetSSL(), SSL_MODE_AUTO_RETRY);
if (SSL_set_fd(GetSSL(), GetSocket()) != 1)
{
SOCKET_IO_ERROR("ssl set fd failed");
DoException(GetSocketID(), SOCKET_IO_SSL_CONNECT_FAILED);
return nErrorCode;
}

int32_t nRet = SSL_connect(GetSSL());
if (nRet == 1)
Expand Down Expand Up @@ -252,13 +267,14 @@ int32_t CSSLClientAsync::SendMsgAsync(const char *szBuf, int32_t nBufSize)
pBufferLoop->create_buffer(nBufSize);
pBufferLoop->append_buffer(szBuf, nBufSize);
m_sendqueue.push(pBufferLoop);
//m_pio->Add_WriteEvent(this);
}
}
m_sendqueuemutex.Unlock();
return SOCKET_IO_RESULT_OK;
}
m_sendqueuemutex.Unlock();

int32_t nRet = SSL_write(GetSSL(), (void*)szBuf, nBufSize);
if ( nRet < 0)
{
Expand All @@ -273,7 +289,7 @@ int32_t CSSLClientAsync::SendMsgAsync(const char *szBuf, int32_t nBufSize)
m_sendqueuemutex.Unlock();
//有数据放入待发送队列,则注册为写事件
m_pio->Add_WriteEvent(this);
SOCKET_IO_DEBUG("send ssl data, buffer is blocking.");
SOCKET_IO_DEBUG("send ssl data, buffer is blocking, errno: %d.", nError);
}
else
{
Expand Down Expand Up @@ -321,6 +337,7 @@ int32_t CSSLClientAsync::SendBufferAsync()
m_sendqueuemutex.Lock();
if (m_sendqueue.size() == 0)
{
SOCKET_IO_DEBUG("ssl send queue is empty.");
//待发送队列中为空,则删除写事件的注册,改成读事件
m_pio->Remove_WriteEvent(this);
m_sendqueuemutex.Unlock();
Expand All @@ -342,7 +359,7 @@ int32_t CSSLClientAsync::SendBufferAsync()
int32_t nError = SSL_get_error(GetSSL(), nRet);
if (SSL_ERROR_WANT_WRITE == nError || SSL_ERROR_WANT_READ == nError)
{
SOCKET_IO_DEBUG("send ssl data, buffer is blocking.");
SOCKET_IO_DEBUG("send ssl data, buffer is blocking, errno: %d.", nError);
}
else
{
Expand Down
1 change: 1 addition & 0 deletions server/src/push_server/socket/tcp_client_async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ int32_t CTCPClientAsync::SendMsgAsync(const char* szBuf, int32_t nBufSize )
pBufferLoop->create_buffer(nBufSize);
pBufferLoop->append_buffer(szBuf, nBufSize);
m_sendqueue.push(pBufferLoop);
//m_pio->Add_WriteEvent(this);
}
}
m_sendqueuemutex.Unlock();
Expand Down
1 change: 1 addition & 0 deletions server/src/push_server/socket/tcp_session_async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ int32_t CTCPSessionAsync::SendMsgAsync( const char* szBuf, int32_t nBufSize )
pBufferLoop->create_buffer(nBufSize);
pBufferLoop->append_buffer(szBuf, nBufSize);
m_sendqueue.push(pBufferLoop);
//m_pio->Add_WriteEvent(this);
}
}
m_sendqueuemutex.Unlock();
Expand Down

0 comments on commit 21042e9

Please sign in to comment.