Skip to content

Commit

Permalink
1、优化无锁队列的使用
Browse files Browse the repository at this point in the history
  • Loading branch information
luotan-123 committed Aug 15, 2020
1 parent d1daffd commit 79cb11b
Show file tree
Hide file tree
Showing 13 changed files with 92 additions and 52 deletions.
23 changes: 14 additions & 9 deletions linuxserverplatform/src/Kernel/GServerConnect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "RRlockQueue.h"
#include "GServerConnect.h"

//////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -372,7 +373,7 @@ bool CGServerConnect::Start(CDataLine* pDataLine, int serverID, int serverType,
if (bStartSendThread)
{
//创建发送队列
m_pSendDataLine = new CDataLine();
m_pSendDataLine = new RRlockQueue();

err = pthread_create(&m_hThreadSendMsg, NULL, ThreadSendMsg, (void*)this);
if (err != 0)
Expand Down Expand Up @@ -728,10 +729,11 @@ void* CGServerConnect::ThreadSendMsg(void* pThreadData)
pthread_exit(NULL);
}

CDataLine* pDataLine = pThis->m_pSendDataLine;
RRlockQueue* pDataLine = pThis->m_pSendDataLine;

//数据缓存
DataLineHead* pDataLineHead = NULL;
//DataLineHead* pDataLineHead = NULL;
DataLineHead* pDataLineHead = (DataLineHead*)malloc(MAX_SINGLE_UNLOCKQUEUE_SIZE);

sleep(3);

Expand All @@ -743,7 +745,7 @@ void* CGServerConnect::ThreadSendMsg(void* pThreadData)
{
//获取数据
unsigned int bytes = pDataLine->GetData(&pDataLineHead);
if (bytes == 0 || pDataLineHead == NULL)
if (bytes == 0)
{
continue;
}
Expand All @@ -767,11 +769,11 @@ void* CGServerConnect::ThreadSendMsg(void* pThreadData)
}
}

// 释放内存
if (pDataLineHead)
{
free(pDataLineHead);
}
//// 释放内存
//if (pDataLineHead)
//{
// free(pDataLineHead);
//}
}
catch (...)
{
Expand All @@ -781,5 +783,8 @@ void* CGServerConnect::ThreadSendMsg(void* pThreadData)

INFO_LOG("CGServerConnect::ThreadSendMsg exit.");

// 释放内存
free(pDataLineHead);

pthread_exit(NULL);
}
11 changes: 8 additions & 3 deletions linuxserverplatform/src/Kernel/RRlockQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
#include <assert.h>


RRlockQueue::RRlockQueue()
RRlockQueue::RRlockQueue(bool bAutoLock, QueueType qType, unsigned int nTimeOnce)
{
m_pUnLockQueue = nullptr;
m_pUnLockQueue = new UnlockQueue(MAX_UNLOCKQUEUE_LEN, QUEUE_TYPE_SLEEP);
m_pUnLockQueue = new UnlockQueue(MAX_UNLOCKQUEUE_LEN, qType, nTimeOnce);

assert(m_pUnLockQueue != nullptr);
assert(m_pUnLockQueue->Initialize());

m_bAutoLock = bAutoLock;
}

RRlockQueue::~RRlockQueue()
Expand Down Expand Up @@ -59,7 +61,10 @@ UINT RRlockQueue::AddData(DataLineHead* pDataInfo, UINT uAddSize, UINT uDataKind
}

// 加锁
CSignedLockObject LockObject(&m_csLock);
if (m_bAutoLock)
{
CSignedLockObject LockObject(&m_csLock);
}

// 拷贝数据
if (m_pUnLockQueue->Put((const unsigned char*)pDataInfo, uAddSize, (const unsigned char*)pAppendData, uAppendAddSize) == 0)
Expand Down
29 changes: 19 additions & 10 deletions linuxserverplatform/src/Kernel/TCPSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "log.h"
#include "configManage.h"
#include "DataLine.h"
#include "RRlockQueue.h"
#include "Xor.h"
#include <sys/socket.h>
#include <netinet/in.h>
Expand Down Expand Up @@ -113,7 +114,7 @@ bool CTCPSocketManage::Start(int serverType)
// 创建发送队列
if (m_pSendDataLine == NULL)
{
m_pSendDataLine = new CDataLine;
m_pSendDataLine = new RRlockQueue();
}

// 创建发送线程
Expand Down Expand Up @@ -156,6 +157,10 @@ bool CTCPSocketManage::Stop()
m_iFreeTail = 0;
m_socketIndexVec.clear();

//释放内存
m_pSendDataLine->CleanLineData();
SAFE_DELETE(m_pSendDataLine);

event_base_loopbreak(m_listenerBase);
for (size_t i = 0; i < m_workBaseVec.size(); i++)
{
Expand Down Expand Up @@ -297,7 +302,7 @@ CDataLine* CTCPSocketManage::GetRecvDataLine()
return NULL;
}

CDataLine* CTCPSocketManage::GetSendDataLine()
RRlockQueue* CTCPSocketManage::GetSendDataLine()
{
return m_pSendDataLine;
}
Expand Down Expand Up @@ -877,10 +882,11 @@ void* CTCPSocketManage::ThreadSendMsg(void* pThreadData)
pthread_exit(NULL);
}

CDataLine* pDataLine = pThis->GetSendDataLine();
RRlockQueue* pDataLine = pThis->GetSendDataLine();

//数据缓存
DataLineHead* pDataLineHead = NULL;
//DataLineHead* pDataLineHead = NULL;
DataLineHead* pDataLineHead = (DataLineHead*)malloc(MAX_SINGLE_UNLOCKQUEUE_SIZE);

sleep(3);

Expand All @@ -890,7 +896,7 @@ void* CTCPSocketManage::ThreadSendMsg(void* pThreadData)
{
//获取数据
unsigned int bytes = pDataLine->GetData(&pDataLineHead);
if (bytes == 0 || pDataLineHead == NULL)
if (bytes == 0)
{
continue;
}
Expand Down Expand Up @@ -926,15 +932,18 @@ void* CTCPSocketManage::ThreadSendMsg(void* pThreadData)
ERROR_LOG("发送数据失败,index=%d 超出范围", index);
}

// 释放内存
if (pDataLineHead)
{
free(pDataLineHead);
}
//// 释放内存
//if (pDataLineHead)
//{
// free(pDataLineHead);
//}
}

INFO_LOG("CTCPSocketManage::ThreadSendMsg exit.");

// 释放内存
free(pDataLineHead);

pthread_exit(NULL);
}

Expand Down
11 changes: 8 additions & 3 deletions linuxserverplatform/src/Kernel/UnlockQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
#include <unistd.h>
#include "UnlockQueue.h"

#define QUEUE_TIME_ONCE 4000000 // 单位:纳秒

UnlockQueue::UnlockQueue(unsigned int nSize, QueueType qType)
UnlockQueue::UnlockQueue(unsigned int nSize, QueueType qType, unsigned int nTimeOnce)
: m_pBuffer(NULL)
, m_nSize(nSize)
, m_nIn(0)
, m_nOut(0)
, m_qType(qType)
, m_nTimeOnce(nTimeOnce)
{
//round up to the next power of 2
if (!is_power_of_2(nSize))
Expand All @@ -34,6 +34,11 @@ UnlockQueue::UnlockQueue(unsigned int nSize, QueueType qType)
// 初始化条件变量
pthread_cond_init(&m_cond, NULL);
}

if (qType == QUEUE_TYPE_SLEEP && m_nTimeOnce >= 1000000000)
{
m_nTimeOnce = QUEUE_TIME_ONCE;
}
}

UnlockQueue::~UnlockQueue()
Expand Down Expand Up @@ -202,7 +207,7 @@ unsigned int UnlockQueue::Get(unsigned char* buffer, unsigned int len)
{
struct timespec slptm;
slptm.tv_sec = 0;
slptm.tv_nsec = QUEUE_TIME_ONCE; //1000 ns = 1 us
slptm.tv_nsec = m_nTimeOnce;
nanosleep(&slptm, NULL);
}
return 0;
Expand Down
29 changes: 19 additions & 10 deletions linuxserverplatform/src/Kernel/WebSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "log.h"
#include "configManage.h"
#include "DataLine.h"
#include "RRlockQueue.h"
#include "Xor.h"
#include <sys/socket.h>
#include <netinet/in.h>
Expand Down Expand Up @@ -116,7 +117,7 @@ bool CWebSocketManage::Start(int serverType)
// 创建发送队列
if (m_pSendDataLine == NULL)
{
m_pSendDataLine = new CDataLine;
m_pSendDataLine = new RRlockQueue();
}

// 创建发送线程
Expand Down Expand Up @@ -159,6 +160,10 @@ bool CWebSocketManage::Stop()
m_iFreeTail = 0;
m_socketIndexVec.clear();

//释放内存
m_pSendDataLine->CleanLineData();
SAFE_DELETE(m_pSendDataLine);

event_base_loopbreak(m_listenerBase);
for (size_t i = 0; i < m_workBaseVec.size(); i++)
{
Expand Down Expand Up @@ -296,7 +301,7 @@ CDataLine* CWebSocketManage::GetRecvDataLine()
return NULL;
}

CDataLine* CWebSocketManage::GetSendDataLine()
RRlockQueue* CWebSocketManage::GetSendDataLine()
{
return m_pSendDataLine;
}
Expand Down Expand Up @@ -1056,10 +1061,11 @@ void* CWebSocketManage::ThreadSendMsg(void* pThreadData)
pthread_exit(NULL);
}

CDataLine* pDataLine = pThis->GetSendDataLine();
RRlockQueue* pDataLine = pThis->GetSendDataLine();

//数据缓存
DataLineHead* pDataLineHead = NULL;
//DataLineHead* pDataLineHead = NULL;
DataLineHead* pDataLineHead = (DataLineHead*)malloc(MAX_SINGLE_UNLOCKQUEUE_SIZE);

sleep(3);

Expand All @@ -1069,7 +1075,7 @@ void* CWebSocketManage::ThreadSendMsg(void* pThreadData)
{
//获取数据
unsigned int bytes = pDataLine->GetData(&pDataLineHead);
if (bytes == 0 || pDataLineHead == NULL)
if (bytes == 0)
{
continue;
}
Expand Down Expand Up @@ -1105,15 +1111,18 @@ void* CWebSocketManage::ThreadSendMsg(void* pThreadData)
ERROR_LOG("发送数据失败,index=%d 超出范围", index);
}

// 释放内存
if (pDataLineHead)
{
free(pDataLineHead);
}
//// 释放内存
//if (pDataLineHead)
//{
// free(pDataLineHead);
//}
}

INFO_LOG("CWebSocketManage::ThreadSendMsg exit.");

// 释放内存
free(pDataLineHead);

pthread_exit(NULL);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
class CBaseMainManage : public IAsynThreadResultService
{
protected:
volatile bool m_bRun; //运行标志
bool m_bRun; //运行标志
bool m_bInit; //初始化标志
HANDLE m_hHandleThread; //处理线程
HANDLE m_connectCServerHandle; //与中心服交互线程句柄
Expand Down
7 changes: 4 additions & 3 deletions linuxserverplatform/src/include/ServerCommon/GServerConnect.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const int GSERVER_SOCKET_RECV_BUF = 512 * 1024; // 应用层接收缓冲区大
#define MAX_RECONNECT_COUNT 4 // 最多重连次数,0:无限重连

class CDataLine;
class RRlockQueue;
class CGServerConnect;
class CSignedLock;

Expand Down Expand Up @@ -43,7 +44,7 @@ class CGServerClient
private:
int m_socket;
volatile bool m_isConnected;
volatile int m_ReConnectCount;
int m_ReConnectCount;

char m_recvBuf[GSERVER_SOCKET_RECV_BUF];
int m_remainRecvBytes;
Expand Down Expand Up @@ -94,10 +95,10 @@ class CGServerConnect
std::vector<CGServerClient*> m_socketVec;
std::map<pthread_t, int> m_threadIDToIndexMap;
CDataLine* m_pRecvDataLine;
CDataLine* m_pSendDataLine;
RRlockQueue* m_pSendDataLine;
int m_serverID;
int m_serverType;
volatile bool m_running;
bool m_running;
pthread_t m_hThreadCheckConnect;
pthread_t m_hThreadSendMsg;
};
5 changes: 3 additions & 2 deletions linuxserverplatform/src/include/ServerCommon/RRlockQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
// 消息队列最大字节数量
#define MAX_UNLOCKQUEUE_LEN (8 * 1024 * 1024)
// 消息队列最大单包长度
#define MAX_SINGLE_UNLOCKQUEUE_SIZE (128 * 1024)
#define MAX_SINGLE_UNLOCKQUEUE_SIZE (256 * 1024)

class RRlockQueue
{

public:
RRlockQueue();
RRlockQueue(bool bAutoLock = true, QueueType qType = QUEUE_TYPE_SLEEP, unsigned int nTimeOnce = QUEUE_TIME_ONCE);
virtual ~RRlockQueue();

public:
Expand All @@ -36,4 +36,5 @@ class RRlockQueue
private:
CSignedLock m_csLock;
UnlockQueue* m_pUnLockQueue;
bool m_bAutoLock;
};
2 changes: 1 addition & 1 deletion linuxserverplatform/src/include/ServerCommon/ServerTimer.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class CServerTimer
static void* ThreadCheckTimer(void* pThreadData);

private:
volatile bool m_bRun;
bool m_bRun;
std::unordered_map<unsigned int, ServerTimerInfo> m_timerMap;
CDataLine* m_pDataLine; // 共享的dataline对象
CSignedLock* m_pLock; // 线程锁
Expand Down
Loading

0 comments on commit 79cb11b

Please sign in to comment.