Skip to content

Commit

Permalink
Support hyperloglog (OpenAtomFoundation#56)
Browse files Browse the repository at this point in the history
* Support hyperloglog

* add header file

* format code

* change the default number of parameters in pfcount

* redis protocol compatible

* redis protocol compatible and format code
  • Loading branch information
Leviathan authored and KernelMaker committed Feb 7, 2017
1 parent 040dc47 commit 6042fd0
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 20 deletions.
12 changes: 9 additions & 3 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,14 @@ const std::string kCmdNameSInter = "sinter";
const std::string kCmdNameSInterstore = "sinterstore";
const std::string kCmdNameSIsmember = "sismember";
const std::string kCmdNameSDiff = "sdiff";
const std::string kCmdNameSDiffstore= "sdiffstore";
const std::string kCmdNameSMove= "smove";
const std::string kCmdNameSRandmember= "srandmember";
const std::string kCmdNameSDiffstore = "sdiffstore";
const std::string kCmdNameSMove = "smove";
const std::string kCmdNameSRandmember = "srandmember";

//HyperLogLog
const std::string kCmdNamePfAdd = "pfadd";
const std::string kCmdNamePfCount = "pfcount";
const std::string kCmdNamePfMerge = "pfmerge";

typedef pink::RedisCmdArgsType PikaCmdArgsType;

Expand All @@ -177,6 +182,7 @@ enum CmdFlags {
kCmdFlagsSet = 8,
kCmdFlagsZset = 10,
kCmdFlagsBit = 12,
kCmdFlagsHyperLogLog = 14,
kCmdFlagsNoLocal = 0, //default nolocal
kCmdFlagsLocal = 16,
kCmdFlagsNoSuspend = 0, //default nosuspend
Expand Down
52 changes: 52 additions & 0 deletions include/pika_hyperloglog.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_HYPERLOGLOG_H_
#define PIKA_HYPERLOGLOG_H_

#include "pika_command.h"
#include "nemo.h"

/*
* hyperloglog
*/
class PfAddCmd : public Cmd {
public:
PfAddCmd() {};
virtual void Do();
private:
std::string key_;
std::vector<std::string> values_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
virtual void Clear() {
values_.clear();
}
};

class PfCountCmd : public Cmd {
public:
PfCountCmd() {};
virtual void Do();
private:
std::vector<std::string> keys_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
virtual void Clear() {
keys_.clear();
}
};

class PfMergeCmd : public Cmd {
public:
PfMergeCmd() {};
virtual void Do();
private:
std::vector<std::string> keys_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
virtual void Clear() {
keys_.clear();
}
};

#endif
59 changes: 42 additions & 17 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "pika_set.h"
#include "pika_zset.h"
#include "pika_bit.h"
#include "pika_hyperloglog.h"

static std::unordered_map<std::string, CmdInfo*> cmd_infos(300); /* Table for CmdInfo */

Expand Down Expand Up @@ -54,6 +55,7 @@ void InitCmdInfoTable() {
CmdInfo* dbsizeptr = new CmdInfo(kCmdNameDbsize, 1, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameDbsize, dbsizeptr));


//migrate slot
CmdInfo* slotmgrtslotptr = new CmdInfo(kCmdNameSlotsMgrtSlot, 5, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameSlotsMgrtSlot, slotmgrtslotptr));
Expand All @@ -76,6 +78,7 @@ void InitCmdInfoTable() {
CmdInfo* slotsscanptr = new CmdInfo(kCmdNameSlotsScan, -3, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameSlotsScan, slotsscanptr));


//Kv
////SetCmd
CmdInfo* setptr = new CmdInfo(kCmdNameSet, -3, kCmdFlagsWrite | kCmdFlagsKv);
Expand Down Expand Up @@ -371,6 +374,17 @@ void InitCmdInfoTable() {
////BitCount
CmdInfo* bitcountptr = new CmdInfo(kCmdNameBitCount, -2, kCmdFlagsRead | kCmdFlagsBit);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameBitCount, bitcountptr)).second;

//HyperLogLog
////PfAdd
CmdInfo* pfaddptr = new CmdInfo(kCmdNamePfAdd, -3, kCmdFlagsWrite | kCmdFlagsHyperLogLog);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNamePfAdd, pfaddptr));
////PfCount
CmdInfo* pfcountptr = new CmdInfo(kCmdNamePfCount, -2, kCmdFlagsRead | kCmdFlagsHyperLogLog);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNamePfCount, pfcountptr));
////PfMerge
CmdInfo* pfmergeptr = new CmdInfo(kCmdNamePfMerge, -3, kCmdFlagsWrite | kCmdFlagsHyperLogLog);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNamePfMerge, pfmergeptr));
}

void DestoryCmdInfoTable() {
Expand Down Expand Up @@ -540,49 +554,49 @@ void InitCmdTable(std::unordered_map<std::string, Cmd*> *cmd_table) {
//Hash
////HDelCmd
Cmd* hdelptr = new HDelCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHDel, hdelptr));
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHDel, hdelptr));
////HSetCmd
Cmd* hsetptr = new HSetCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHSet, hsetptr));
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHSet, hsetptr));
////HGetCmd
Cmd* hgetptr = new HGetCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHGet, hgetptr));
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHGet, hgetptr));
////HGetallCmd
Cmd* hgetallptr = new HGetallCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHGetall, hgetallptr));
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHGetall, hgetallptr));
////HExistsCmd
Cmd* hexistsptr = new HExistsCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHExists, hexistsptr));
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHExists, hexistsptr));
////HIncrbyCmd
Cmd* hincrbyptr = new HIncrbyCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHIncrby, hincrbyptr));
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHIncrby, hincrbyptr));
////HIncrbyfloatCmd
Cmd* hincrbyfloatptr = new HIncrbyfloatCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHIncrbyfloat, hincrbyfloatptr));
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHIncrbyfloat, hincrbyfloatptr));
////HKeysCmd
Cmd* hkeysptr = new HKeysCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHKeys, hkeysptr));
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHKeys, hkeysptr));
////HLenCmd
Cmd* hlenptr = new HLenCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHLen, hlenptr));
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHLen, hlenptr));
////HMgetCmd
Cmd* hmgetptr = new HMgetCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHMget, hmgetptr));
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHMget, hmgetptr));
////HMsetCmd
Cmd* hmsetptr = new HMsetCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHMset, hmsetptr));
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHMset, hmsetptr));
////HSetnxCmd
Cmd* hsetnxptr = new HSetnxCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHSetnx, hsetnxptr));
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHSetnx, hsetnxptr));
////HStrlenCmd
Cmd* hstrlenptr = new HStrlenCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHStrlen, hstrlenptr));
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHStrlen, hstrlenptr));
////HValsCmd
Cmd* hvalsptr = new HValsCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHVals, hvalsptr));
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHVals, hvalsptr));
////HScanCmd
Cmd* hscanptr = new HScanCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHScan, hscanptr));
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameHScan, hscanptr));
//List
Cmd* lindexptr = new LIndexCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameLIndex, lindexptr));
Expand All @@ -591,7 +605,7 @@ void InitCmdTable(std::unordered_map<std::string, Cmd*> *cmd_table) {
Cmd* llenptr = new LLenCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameLLen, llenptr));
Cmd* lpopptr = new LPopCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameLPop, lpopptr));
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameLPop, lpopptr));
Cmd* lpushptr = new LPushCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameLPush, lpushptr));
Cmd* lpushxptr = new LPushxCmd();
Expand Down Expand Up @@ -742,9 +756,20 @@ void InitCmdTable(std::unordered_map<std::string, Cmd*> *cmd_table) {
Cmd* bitopptr = new BitOpCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameBitOp, bitopptr));

//HyperLogLog
////pfaddCmd
Cmd * pfaddptr = new PfAddCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNamePfAdd, pfaddptr));
////pfcountCmd
Cmd * pfcountptr = new PfCountCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNamePfCount, pfcountptr));
////pfmergeCmd
Cmd * pfmergeptr = new PfMergeCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNamePfMerge, pfmergeptr));

}

Cmd* GetCmdFromTable(const std::string& opt,
Cmd* GetCmdFromTable(const std::string& opt,
const std::unordered_map<std::string, Cmd*> &cmd_table) {
std::unordered_map<std::string, Cmd*>::const_iterator it = cmd_table.find(opt);
if (it != cmd_table.end()) {
Expand Down
80 changes: 80 additions & 0 deletions src/pika_hyperloglog.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include <vector>
#include "slash_string.h"
#include "nemo.h"
#include "pika_server.h"
#include "pika_hyperloglog.h"

extern PikaServer *g_pika_server;

void PfAddCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info) {
if (!ptr_info->CheckArg(argv.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNamePfAdd);
return;
}
key_ = argv[1];
size_t pos = 2;
while (pos < argv.size()) {
values_.push_back(argv[pos++]);
}
}

void PfAddCmd::Do() {
nemo::Status s;
bool update = false;
s = g_pika_server->db()->PfAdd(key_, values_, update);
if (s.ok() && update) {
res_.AppendInteger(1);
} else if (s.ok() && !update) {
res_.AppendInteger(0);
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
}

void PfCountCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info) {
if (!ptr_info->CheckArg(argv.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNamePfCount);
return;
}
size_t pos = 1;
while (pos < argv.size()) {
keys_.push_back(argv[pos++]);
}
}

void PfCountCmd::Do() {
nemo::Status s;
int value_;
s = g_pika_server->db()->PfCount(keys_, value_);
if (s.ok()) {
res_.AppendInteger(value_);
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
}

void PfMergeCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info) {
if (!ptr_info->CheckArg(argv.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNamePfMerge);
return;
}
size_t pos = 1;
while (pos < argv.size()) {
keys_.push_back(argv[pos++]);
}
}

void PfMergeCmd::Do() {
nemo::Status s;
s = g_pika_server->db()->PfMerge(keys_);
if (s.ok()) {
res_.SetRes(CmdRes::kOk);
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
}

0 comments on commit 6042fd0

Please sign in to comment.