Skip to content

Commit

Permalink
Merge pull request joyieldInc#40 from joyieldInc/issue32
Browse files Browse the repository at this point in the history
Issue32
  • Loading branch information
fortrue authored Jul 5, 2018
2 parents 28edb79 + 20bfcb6 commit 36152f5
Show file tree
Hide file tree
Showing 13 changed files with 46 additions and 39 deletions.
3 changes: 2 additions & 1 deletion src/AcceptConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class AcceptConnection :
typedef AcceptConnection Value;
typedef ListNode<AcceptConnection, SharePtr<AcceptConnection>> ListNodeType;
typedef DequeNode<AcceptConnection, SharePtr<AcceptConnection>> DequeNodeType;
typedef Alloc<AcceptConnection, Const::AcceptConnectionAllocCacheSize> Allocator;
public:
AcceptConnection(int fd, sockaddr* addr, socklen_t len);
~AcceptConnection();
Expand Down Expand Up @@ -97,6 +98,6 @@ class AcceptConnection :

typedef List<AcceptConnection> AcceptConnectionList;
typedef Deque<AcceptConnection> AcceptConnectionDeque;
typedef Alloc<AcceptConnection, Const::AcceptConnectionAllocCacheSize> AcceptConnectionAlloc;
typedef AcceptConnection::Allocator AcceptConnectionAlloc;

#endif
4 changes: 2 additions & 2 deletions src/Alloc.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class Alloc : public AllocBase
}
UsedMemory += allocSize<T>();
if (MaxMemory == 0 || UsedMemory <= MaxMemory) {
void* p = ::operator new(allocSize<T>());
void* p = ::operator new(allocSize<T>(), std::nothrow);
if (p) {
try {
obj = new (p) T(args...);
Expand Down Expand Up @@ -145,7 +145,7 @@ class RefCntObj
{
int n = --mCnt;
if (n == 0) {
Alloc<T>::destroy(static_cast<T*>(this));
T::Allocator::destroy(static_cast<T*>(this));
} else if (n < 0) {
logError("unref object %p with cnt %d", this, n);
abort();
Expand Down
4 changes: 3 additions & 1 deletion src/Buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class Buffer :
public RefCntObj<Buffer>
{
public:
typedef Alloc<Buffer, Const::BufferAllocCacheSize> Allocator;
static const int MaxBufFmtAppendLen = 8192;
public:
Buffer& operator=(const Buffer&);
Expand Down Expand Up @@ -92,12 +93,13 @@ class Buffer :
};

typedef List<Buffer> BufferList;
typedef Buffer::Allocator BufferAlloc;

template<>
inline int allocSize<Buffer>()
{
return Buffer::getSize() + sizeof(Buffer);
}
typedef Alloc<Buffer, Const::BufferAllocCacheSize> BufferAlloc;

struct BufferPos
{
Expand Down
4 changes: 2 additions & 2 deletions src/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ namespace Const
static const int MaxCmdLen = 32;
static const int MaxKeyLen = 512;
static const int BufferAllocCacheSize = 64;
static const int RequestAllocCacheSize = 32;
static const int ResponseAllocCacheSize = 32;
static const int RequestAllocCacheSize = 128;
static const int ResponseAllocCacheSize = 128;
static const int AcceptConnectionAllocCacheSize = 32;
static const int ConnectConnectionAllocCacheSize = 4;
};
Expand Down
3 changes: 2 additions & 1 deletion src/ConnectConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class ConnectConnection :
typedef ConnectConnection Value;
typedef ListNode<ConnectConnection> ListNodeType;
typedef DequeNode<ConnectConnection> DequeNodeType;
typedef Alloc<ConnectConnection, Const::ConnectConnectionAllocCacheSize> Allocator;
public:
ConnectConnection(Server* s, bool shared);
~ConnectConnection();
Expand Down Expand Up @@ -97,6 +98,6 @@ class ConnectConnection :

typedef List<ConnectConnection> ConnectConnectionList;
typedef Deque<ConnectConnection> ConnectConnectionDeque;
typedef Alloc<ConnectConnection, Const::ConnectConnectionAllocCacheSize> ConnectConnectionAlloc;
typedef ConnectConnection::Allocator ConnectConnectionAlloc;

#endif
2 changes: 1 addition & 1 deletion src/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ BufferPtr Connection::getBuffer(Handler* h, bool allowNew)
}
}
if (!mBuf || mBuf->full()) {
BufferPtr buf = Alloc<Buffer>::create();
BufferPtr buf = BufferAlloc::create();
if (mBuf) {
mBuf->concat(buf);
}
Expand Down
6 changes: 1 addition & 5 deletions src/EpollMultiplexor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,9 @@ bool EpollMultiplexor::addSocket(Socket* s, int evts)
event.events |= (evts & ReadEvent) ? EPOLLIN : 0;
event.events |= (evts & WriteEvent) ? EPOLLOUT : 0;
event.events |= EPOLLET;
//event.events |= EPOLLONESHOT;
event.data.ptr = s;
s->setEvent(evts);
int ret = epoll_ctl(mFd, EPOLL_CTL_ADD, s->fd(), &event);
if (ret == 0) {
s->setEvent(evts);
}
return ret == 0;
}

Expand All @@ -61,7 +58,6 @@ bool EpollMultiplexor::addEvent(Socket* s, int evts)
}
if ((s->getEvent() | evts) != s->getEvent()) {
event.events |= EPOLLET;
//event.events |= EPOLLONESHOT;
int ret = epoll_ctl(mFd, EPOLL_CTL_MOD, s->fd(), &event);
if (ret == 0) {
s->setEvent(s->getEvent() | evts);
Expand Down
3 changes: 2 additions & 1 deletion src/Handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,6 @@ void Handler::addAcceptSocket(int fd, sockaddr* addr, socklen_t len)
AcceptConnection* c = nullptr;
try {
c = AcceptConnectionAlloc::create(fd, addr, len);
logNotice("h %d accept c %s %d", id(), c->peer(), fd);
} catch (ExceptionBase& e) {
logWarn("h %d create connection for client %d fail %s",
id(), fd, e.what());
Expand Down Expand Up @@ -368,6 +367,8 @@ void Handler::addAcceptSocket(int fd, sockaddr* addr, socklen_t len)
logWarn("h %d destroy c %s %d with add to event loop fail:%s",
id(), c->peer(), c->fd(), StrError());
AcceptConnectionAlloc::destroy(c);
} else {
logNotice("h %d accept c %s %d assign to h %d", id(), c->peer(), fd, dst->id());
}
}

Expand Down
1 change: 0 additions & 1 deletion src/KqueueMultiplexor.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,5 @@ int KqueueMultiplexor::wait(long usec, T* handler)


typedef KqueueMultiplexor Multiplexor;
#define _MULTIPLEXOR_ASYNC_ASSIGN_

#endif
38 changes: 20 additions & 18 deletions src/Request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ Request::Request(GenericCode code):

Request::~Request()
{
clear();
}

void Request::clear()
Expand Down Expand Up @@ -155,13 +156,14 @@ void Request::set(const RequestParser& p, Request* leader)
}
mHead = r->mReq;
mReq = p.request();
mLeader = leader;
if (leader == this) {
if (mType == Command::Mset || mType == Command::Msetnx) {
mFollowers = (p.argNum() - 1) >> 1;
} else {
mFollowers = p.argNum() - 1;
}
} else {
mLeader = leader;
}
} else {
mReq = p.request();
Expand Down Expand Up @@ -287,7 +289,7 @@ void Request::adjustScanCursor(long cursor)

void Request::follow(Request* leader)
{
++mFollowers;
leader->mFollowers += 1;
if (leader == this) {
return;
}
Expand Down Expand Up @@ -338,49 +340,49 @@ int Request::fill(IOVec* vecs, int len)
void Request::setResponse(Response* res)
{
mDone = true;
if (mLeader) {
mLeader->mFollowersDone += 1;
if (Request* ld = leader()) {
ld->mFollowersDone += 1;
switch (mType) {
case Command::Mget:
mRes = res;
break;
case Command::Mset:
if (Response* leaderRes = mLeader->getResponse()) {
if (Response* leaderRes = ld->getResponse()) {
if (res->isError() && !leaderRes->isError()) {
mLeader->mRes = res;
ld->mRes = res;
}
} else {
mLeader->mRes = res;
ld->mRes = res;
}
break;
case Command::Msetnx:
if (Response* leaderRes = mLeader->getResponse()) {
if (Response* leaderRes = ld->getResponse()) {
if (!leaderRes->isError() &&
(res->isError() || res->integer() == 0)) {
mLeader->mRes = res;
ld->mRes = res;
}
} else {
mLeader->mRes = res;
ld->mRes = res;
}
break;
case Command::Touch:
case Command::Exists:
case Command::Del:
case Command::Unlink:
if (!mLeader->mRes) {
mLeader->mRes = res;
if (!ld->mRes) {
ld->mRes = res;
}
if (mLeader->isDone()) {
mLeader->mRes->set(mLeader->mRes->integer());
if (ld->isDone()) {
ld->mRes->set(ld->mRes->integer());
}
break;
case Command::ScriptLoad:
if (Response* leaderRes = mLeader->getResponse()) {
if (Response* leaderRes = ld->getResponse()) {
if (leaderRes->isString() && !res->isString()) {
mLeader->mRes = res;
ld->mRes = res;
}
} else {
mLeader->mRes = res;
ld->mRes = res;
}
break;
default:
Expand All @@ -395,7 +397,7 @@ void Request::setResponse(Response* res)

bool Request::isDone() const
{
if (mLeader == this) {
if (isLeader()) {
switch (mType) {
case Command::Mget:
case Command::Psubscribe:
Expand Down
9 changes: 5 additions & 4 deletions src/Request.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Request :
public:
typedef Request Value;
typedef ListNode<Request, SharePtr<Request>, RequestListIndex::Size> ListNodeType;
typedef Alloc<Request, Const::RequestAllocCacheSize> Allocator;
static const int MaxRedirectLimit = 3;
enum GenericCode
{
Expand Down Expand Up @@ -70,7 +71,7 @@ class Request :
bool isDone() const;
AcceptConnection* connection() const
{
return mConn;
return mLeader ? mLeader->mConn : mConn;
}
void detach()
{
Expand Down Expand Up @@ -119,11 +120,11 @@ class Request :
}
Request* leader() const
{
return mLeader;
return isLeader() ? const_cast<Request*>(this) : (Request*)mLeader;
}
bool isLeader() const
{
return mLeader == this;
return mFollowers > 0;
}
bool isDelivered() const
{
Expand Down Expand Up @@ -181,6 +182,6 @@ class Request :

typedef List<Request, RequestListIndex::Recv> RecvRequestList;
typedef List<Request, RequestListIndex::Send> SendRequestList;
typedef Alloc<Request, Const::RequestAllocCacheSize> RequestAlloc;
typedef Request::Allocator RequestAlloc;

#endif
3 changes: 2 additions & 1 deletion src/Response.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class Response :
public:
typedef Response Value;
typedef ListNode<Response, SharePtr<Response>> ListNodeType;
typedef Alloc<Response, Const::ResponseAllocCacheSize> Allocator;
enum GenericCode
{
Pong,
Expand Down Expand Up @@ -137,6 +138,6 @@ class Response :
};

typedef List<Response> ResponseList;
typedef Alloc<Response, Const::ResponseAllocCacheSize> ResponseAlloc;
typedef Response::Allocator ResponseAlloc;

#endif
5 changes: 4 additions & 1 deletion src/ServerGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ Server* ServerGroup::getReadServer(Handler* h, DC* localDC) const
continue;
}
DC* dc = s->dc();
if (!dc) {
continue;
}
int dcrp = localDC->getReadPriority(dc);
if (dcrp <= 0) {
continue;
Expand Down Expand Up @@ -221,7 +224,7 @@ Server* ServerGroup::getReadServer(Handler* h, DC* localDC) const
dc = sdc[0];
found = true;
}
if (!found) {//dc maybe nullptr even we found
if (!found) {
return nullptr;
}
Server* deadServs[Const::MaxServInGroup];
Expand Down

0 comments on commit 36152f5

Please sign in to comment.