Skip to content

Commit

Permalink
psubscribe support
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyu committed Sep 15, 2023
1 parent 10deaf6 commit a91663c
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 8 deletions.
5 changes: 3 additions & 2 deletions src/ConnectConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ void ConnectConnection::handleResponse(Handler* h)
FuncCallTimer();

int prefixLen = 0;
bool pattern = false;
if (mAcceptConnection) {
const Auth* a = mAcceptConnection->auth();
if ( a && !a->namePrefix().empty()) {
Expand All @@ -173,7 +174,7 @@ void ConnectConnection::handleResponse(Handler* h)

if (mAcceptConnection->inSub(true)) {
int chs;
switch (SubscribeParser::parse(mParser.response(), chs, prefixLen)) {
switch (SubscribeParser::parse(mParser.response(), chs, prefixLen, pattern)) {
case SubscribeParser::Subscribe:
case SubscribeParser::Psubscribe:
mAcceptConnection->decrPendSub();
Expand Down Expand Up @@ -217,7 +218,7 @@ void ConnectConnection::handleResponse(Handler* h)
res->set(mParser);

if (prefixLen) {
res->mutate(prefixLen);
res->mutate(prefixLen, pattern);
}
mParser.reset();
logDebug("h %d s %s %d create res %ld match req %ld",
Expand Down
55 changes: 52 additions & 3 deletions src/Response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void Response::set(const ResponseParser& p)
mNamePrefix = p.namePrefix();
}

void Response::mutate(int subPrefixLen)
void Response::mutate(int subPrefixLen, bool pattern)
{
if ( !mNamePrefix.empty() && mRes.length() > subPrefixLen ){
mHead = mRes;
Expand All @@ -97,8 +97,29 @@ void Response::mutate(int subPrefixLen)
}

mRes.use(1 + mNamePrefix.length() + 1); // skip \n + key prefix + ':'
int newKeyLen = oldKeyLen - mNamePrefix.length() - 1;
mPreRes.fset(nullptr, "$%d\r\n", newKeyLen);

if (pattern){
mKeyPattern = mRes;
Segment keyPatternEnd = mRes;
keyPatternEnd.use(newKeyLen + 2);
mKeyPattern.end() = keyPatternEnd.cur();

mRes.use(newKeyLen + 3); // skip ket pattern + "$"

int keylen = 0;
ch = '0';
while ( ch != '\r' ){
keylen = (keylen*10) + ch - '0';
ch = mRes.cur().buf->data()[mRes.cur().pos];
mRes.use(1);
}

mRes.use(1 + mNamePrefix.length() + 1); // skip \n + key prefix + ':'
mKeyLen.fset(nullptr, "$%d\r\n", keylen - mNamePrefix.length() - 1);
}

mPreRes.fset(nullptr, "$%d\r\n", oldKeyLen - mNamePrefix.length() - 1);

}
}
Expand Down Expand Up @@ -186,6 +207,22 @@ bool Response::send(Socket* s)
return false;
}
}
while (mKeyPattern.get(dat, len)) {
int n = s->write(dat, len);
if (n > 0) {
mKeyPattern.use(n);
} else {
return false;
}
}
while (mKeyLen.get(dat, len)) {
int n = s->write(dat, len);
if (n > 0) {
mKeyLen.use(n);
} else {
return false;
}
}
while (mRes.get(dat, len)) {
int n = s->write(dat, len);
if (n > 0) {
Expand All @@ -205,13 +242,25 @@ int Response::fill(IOVec* vecs, int len, Request* req) const
return n;
}

if ( !mPreRes.empty()){
if ( !mPreRes.empty() ){
n += mPreRes.fill(vecs + n, len - n, all);
if (!all) {
return n;
}
}

if ( !mKeyPattern.empty() ){
n += mKeyPattern.fill(vecs + n, len - n, all);
if (!all) {
return n;
}

n += mKeyLen.fill(vecs + n, len - n, all);
if (!all) {
return n;
}
}

n += mRes.fill(vecs + n, len - n, all);
if (n > 0 && all) {
vecs[n - 1].req = req;
Expand Down
4 changes: 3 additions & 1 deletion src/Response.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class Response :
void adjustForLeader(Request* req);
bool send(Socket* s);
int fill(IOVec* vecs, int len, Request* req) const;
void mutate(int subPrefixLen);
void mutate(int subPrefixLen, bool pattern);
void setType(Reply::Type t)
{
mType = t;
Expand Down Expand Up @@ -138,6 +138,8 @@ class Response :
Segment mRes;
String mNamePrefix;
Segment mPreRes;
Segment mKeyPattern;
Segment mKeyLen;
};

typedef List<Response> ResponseList;
Expand Down
3 changes: 2 additions & 1 deletion src/Subscribe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Subscribe::Subscribe():



SubscribeParser::Status SubscribeParser::parse(const Segment& body, int& chs, int& prefixLen)
SubscribeParser::Status SubscribeParser::parse(const Segment& body, int& chs, int& prefixLen, bool& pattern)
{
SegmentStr<128> str(body);
Status st = Unknown;
Expand All @@ -25,6 +25,7 @@ SubscribeParser::Status SubscribeParser::parse(const Segment& body, int& chs, in
st = Message;
} else if (str.hasPrefix("*4\r\n$8\r\npmessage\r\n")) {
prefixLen = strlen("*4\r\n$8\r\npmessage\r\n");
pattern = true;
st = Pmessage;
} else if (str.hasPrefix("*3\r\n$9\r\nsubscribe\r\n")) {
prefixLen = strlen("*3\r\n$9\r\nsubscribe\r\n");
Expand Down
2 changes: 1 addition & 1 deletion src/Subscribe.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class SubscribeParser
String
};
public:
static Status parse(const Segment& body, int& chs, int& prefixLen);
static Status parse(const Segment& body, int& chs, int& prefixLen, bool& pattern);
};


Expand Down

0 comments on commit a91663c

Please sign in to comment.