forked from OpenAtomFoundation/pika
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpika_client_conn.h
160 lines (128 loc) · 4.74 KB
/
pika_client_conn.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef PIKA_CLIENT_CONN_H_
#define PIKA_CLIENT_CONN_H_
#include <bitset>
#include <utility>
#include "include/pika_command.h"
// TODO: stat time costing in write out data to connfd
struct TimeStat {
TimeStat() = default;
void Reset() {
enqueue_ts_ = dequeue_ts_ = 0;
process_done_ts_ = 0;
}
uint64_t start_ts() const {
return enqueue_ts_;
}
uint64_t total_time() const {
return process_done_ts_ > enqueue_ts_ ? process_done_ts_ - enqueue_ts_ : 0;
}
uint64_t queue_time() const {
return dequeue_ts_ > enqueue_ts_ ? dequeue_ts_ - enqueue_ts_ : 0;
}
uint64_t process_time() const {
return process_done_ts_ > dequeue_ts_ ? process_done_ts_ - dequeue_ts_ : 0;
}
uint64_t enqueue_ts_;
uint64_t dequeue_ts_;
uint64_t process_done_ts_;
};
class PikaClientConn : public net::RedisConn {
public:
using WriteCompleteCallback = std::function<void()>;
struct BgTaskArg {
std::shared_ptr<Cmd> cmd_ptr;
std::shared_ptr<PikaClientConn> conn_ptr;
std::vector<net::RedisCmdArgsType> redis_cmds;
std::shared_ptr<std::string> resp_ptr;
LogOffset offset;
std::string db_name;
uint32_t slot_id;
};
struct TxnStateBitMask {
public:
static constexpr uint8_t Start = 0;
static constexpr uint8_t InitCmdFailed = 1;
static constexpr uint8_t WatchFailed = 2;
static constexpr uint8_t Execing = 3;
};
// Auth related
class AuthStat {
public:
void Init();
bool IsAuthed(const std::shared_ptr<Cmd>& cmd_ptr);
bool ChecknUpdate(const std::string& message);
private:
enum StatType {
kNoAuthed = 0,
kAdminAuthed,
kLimitAuthed,
};
StatType stat_;
};
PikaClientConn(int fd, const std::string& ip_port, net::Thread* server_thread, net::NetMultiplexer* mpx,
const net::HandleType& handle_type, int max_conn_rbuf_size);
~PikaClientConn() = default;
void ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>& argvs, bool async,
std::string* response) override;
void BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs);
int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) override { return 0; }
static void DoBackgroundTask(void* arg);
static void DoExecTask(void* arg);
bool IsPubSub() { return is_pubsub_; }
void SetIsPubSub(bool is_pubsub) { is_pubsub_ = is_pubsub; }
void SetCurrentDb(const std::string& db_name) { current_db_ = db_name; }
const std::string& GetCurrentTable() override { return current_db_; }
void SetWriteCompleteCallback(WriteCompleteCallback cb) { write_completed_cb_ = std::move(cb); }
// Txn
void PushCmdToQue(std::shared_ptr<Cmd> cmd);
std::queue<std::shared_ptr<Cmd>> GetTxnCmdQue();
void ClearTxnCmdQue();
bool IsInTxn();
bool IsTxnFailed();
bool IsTxnInitFailed();
bool IsTxnWatchFailed();
bool IsTxnExecing(void);
void SetTxnWatchFailState(bool is_failed);
void SetTxnInitFailState(bool is_failed);
void SetTxnStartState(bool is_start);
void AddKeysToWatch(const std::vector<std::string> &db_keys);
void RemoveWatchedKeys();
void SetTxnFailedFromKeys(const std::vector<std::string> &db_keys);
void SetAllTxnFailed();
void SetTxnFailedFromDBs(std::string db_name);
void ExitTxn();
net::ServerThread* server_thread() { return server_thread_; }
AuthStat& auth_stat() { return auth_stat_; }
std::atomic<int> resp_num;
std::vector<std::shared_ptr<std::string>> resp_array;
std::shared_ptr<TimeStat> time_stat_;
private:
net::ServerThread* const server_thread_;
std::string current_db_;
WriteCompleteCallback write_completed_cb_;
bool is_pubsub_ = false;
std::queue<std::shared_ptr<Cmd>> txn_cmd_que_;
std::bitset<16> txn_state_;
std::unordered_set<std::string> watched_db_keys_;
std::mutex txn_state_mu_;
std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
const std::shared_ptr<std::string>& resp_ptr);
void ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_duration);
void ProcessMonitor(const PikaCmdArgsType& argv);
void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr);
void TryWriteResp();
AuthStat auth_stat_;
};
struct ClientInfo {
int fd;
std::string ip_port;
int64_t last_interaction = 0;
std::shared_ptr<PikaClientConn> conn;
};
extern bool AddrCompare(const ClientInfo& lhs, const ClientInfo& rhs);
extern bool IdleCompare(const ClientInfo& lhs, const ClientInfo& rhs);
#endif