Skip to content

Commit

Permalink
Test version
Browse files Browse the repository at this point in the history
  • Loading branch information
Zzzz-dot committed May 30, 2022
1 parent bf6ae66 commit 50a66ff
Show file tree
Hide file tree
Showing 55 changed files with 5,212 additions and 2,311 deletions.
12 changes: 7 additions & 5 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,26 @@ cmake_minimum_required(VERSION 3.16)
# project: cpp-gossip, version: 1.0.0
project(cpp-gossip VERSION 1.0.0)

configure_file(cmakeConfig.h.in ${CMAKE_CURRENT_SOURCE_DIR}/include/cmakeConfig.h)

#execute_process(COMMAND protoc -I=${CMAKE_SOURCE_DIR}/include/type/ --cpp_out=${CMAKE_SOURCE_DIR}/src/type/ ${CMAKE_SOURCE_DIR}/include/type/msgtype.proto)
#execute_process(COMMAND mv ${CMAKE_SOURCE_DIR}/src/type/msgtype.pb.h ${CMAKE_SOURCE_DIR}/include/type/msgtype.pb.h)

set(CMAKE_CXX_FLAGS "-lpthread")

set(CMAKE_CXX_STANDARD_REQUIRED ON)

set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD 17)

include_directories(include)

# test
enable_testing()

# add directory ./memberlist/type to the cmake build list
add_subdirectory(src)
# add directory ./test to the cmake build list
add_subdirectory(test)
add_subdirectory(temp)

add_executable(main main.cpp)

# temp directory
add_subdirectory(temp)
target_link_libraries(main memberlist)
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
C++ implementation of gossip protocol
This project is based on [memberlist](https://github.com/hashicorp/memberlist) implemented by golang.


## Serialization and Deserialization
This project uses [google protobuf v3.19.4](https://github.com/protocolbuffers/protobuf) to serialize and deserialize the structural data.

[glog v0.6.0](https://github.com/google/glog)
## Time Accuracy
The time accuracy is microsecond level.
5 changes: 5 additions & 0 deletions cmakeConfig.h.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#define PROJECT_VERSION "@PROJECT_VERSION@"
#define VERSION_MAJOR "@PROJECT_VERSION_MAJOR@"
#define VERSION_MINOR "@PROJECT_VERSION_MINOR@"
#define VERSION_PATCH "@PROJECT_VERSION_PATCH@"
#define DEBUG
5 changes: 5 additions & 0 deletions include/cmakeConfig.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#define PROJECT_VERSION "1.0.0"
#define VERSION_MAJOR "1"
#define VERSION_MINOR "0"
#define VERSION_PATCH "0"
#define DEBUG
4 changes: 0 additions & 4 deletions include/memberlist/handlemsg.h

This file was deleted.

226 changes: 126 additions & 100 deletions include/memberlist/memberlist.h
Original file line number Diff line number Diff line change
@@ -1,136 +1,162 @@
#ifndef _MEMBERLIST_H
#define _MEMBERLIST_H
//CURRENT DIR
#include "node.h"
#include "config.h"
//MY INCLUDE DIR
#include <misc/broadcastQueue.hpp>
#include <misc/timer.hpp>
#include <misc/util.hpp>

// MY INCLUDE DIR
#include <cmakeConfig.h>

#include <misc/node.h>
#include <misc/config.h>
#include <misc/suspicion.h>
#include <misc/broadcastQueue.h>
#include <misc/timer.h>
#include <misc/util.h>

#include <type/genmsg.h>
#include <type/msgtype.pb.h>

#include <mynet/broadcast.h>
#include <mynet/net.h>
#include <mynet/wrapped.h>
//SYS INCLUDE PATH

// SYS INCLUDE PATH
#include <glog/logging.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
//#include <string.h>
#include <functional>
#include <atomic>
#include <mutex>
#include <map>
#include <stack>
#include <thread>

using namespace std;

// TCP listen 最大监听的连接数
#define LISTENNUM 256

// epoll 最大监听的事件
#define EPOLLSIZE 1024

//是否打印出收到连接对端的地址信息
#define PRINT_ADDRINFO
#define LISTENNUM 1024

class memberlist
{
friend class timer;

private:
ostream logger;

atomic<uint32_t> sequenceNum; // Local sequence number
uint32_t nextSeqNum(){ sequenceNum.fetch_add(1); return sequenceNum.load(); };
atomic<uint32_t> incarnation; // Local incarnation number
uint32_t nextIncarnation(){ incarnation.fetch_add(1); return incarnation.load(); };
uint32_t skipIncarnation(uint32_t offset){ incarnation.fetch_add(offset); return incarnation.load(); };
atomic<uint32_t> numNodes; // Number of known nodes (estimate)
uint32_t pushPullReq; // Number of push/pull requests

config config; // Config of this member
atomic<bool> leave;
atomic<bool> shutdown;

int tcpfd; // FD of the TCP Socket
int udpfd; // FD of the UDP Socker
int epollfd; // FD of the epoll I/O MUX
struct epoll_event events[EPOLLSIZE]; // Struct for holding epoll_event when epoll_wait returns

mutex nodeMutex;
vector<NodeState *> nodes; // Known nodes
map<string, NodeState *> nodeMap; // Maps Node.Name -> NodeState. It may be deleted in a later sequel
map<string, NodeState *> nodeTimers; // Maps Node.Name -> suspicion timer
vector<NodeState> kRandomNodes(uint8_t k,function<bool(NodeState *n)> exclude);

void pushPullNode(struct sockaddr_in& remote_node,bool join);
MessageData sendAndReceiveState(struct sockaddr_in& remote_node,bool join);
void sendLocalState(int fd,bool join);
void mergeRemoteState(MessageData& pushpull);

void aliveNode(Alive &a,bool bootstrap,int notifyfd);
void refute(NodeState *me, uint32_t accusedInc);
void deadNode(Dead &d);
void suspectNode(Suspect &s);

broadcastQueue TransmitLimitedQueue;
void encodeAndBroadcast(string node,const Compound &cd);
void encodeBroadcastNotify(string node, const Compound &cd,int notifyfd);
string getBroadcasts(size_t overhead, size_t limit)

//when an epoll event happens
void handleevent();
//onReceive a new connection
void handleconn(int connfd);
//onReceive a tcp message
void handletcp(int sockfd);
//onReceive a udp message
void handleudp(int sockfd);

//onReceive a udp ping message
void handlePing(MessageData &ping,sockaddr_in &remote_addr);

//Begin schedule
//do probe, state synchronization and gossip periodically
std::atomic<uint32_t> sequenceNum; // Local sequence number
uint32_t nextSeqNum();
std::atomic<uint32_t> incarnation; // Local incarnation number
uint32_t nextIncarnation();
uint32_t skipIncarnation(uint32_t offset);

std::atomic<uint32_t> numNodes; // Number of known nodes (estimate)
std::atomic<uint32_t> pushPullReq; // Number of push/pull requests

std::shared_ptr<Config> config; // Config of this member
std::atomic<bool> leave;
int leaveFd[2];
std::atomic<bool> shutdown;
int shutdownFd[2];

int tcpFd; // FD of the TCP Socket
int udpFd; // FD of the UDP Socker

int handoffFd[2];
std::mutex msgQueueMutex;
struct msgHandoff
{
Broadcast b;
struct sockaddr_in from;
msgHandoff() = default;
msgHandoff(const Broadcast &b_, const struct sockaddr_in &from_);
};
std::stack<msgHandoff> highPriorityMsgQueue;
std::stack<msgHandoff> lowPriorityMsgQueue;
bool getNextMessage(msgHandoff *msg);

std::mutex nodeMutex;
std::vector<std::shared_ptr<nodeState>> nodes; // Known nodes
std::map<std::string, std::shared_ptr<nodeState>> nodeMap; // Maps Node.Name -> nodeState. It may be deleted in a later sequel
std::map<std::string, std::shared_ptr<suspicion>> nodeTimers; // Maps Node.Name -> suspicion timer

void refute(std::shared_ptr<nodeState> me, uint32_t accusedInc);
void aliveNode(const Broadcast &b, bool bootstrap, int notifyfd);
void suspectNode(const Broadcast &b);
void deadNode(const Broadcast &b);

std::shared_ptr<broadcastQueue> TransmitLimitedQueue;
void onlyBroadcast(std::string node, const Broadcast &bc);
void BroadcastNotify(std::string node, const Broadcast &bc, int notifyFd);
bool getBroadcasts(size_t overhead, size_t limit, ComBroadcast &cbc);

void streamListen();
void packetListen();
void packetHandler();

void handleConn(int connfd);
void handleCommand(MessageData &md, struct sockaddr_in &remote_addr, int64_t ts);

void handlePushPull(const PushPull &pushpull, int connfd);
void handlePing(const Ping &ping, int connfd);
void handleUser(const User &u, int connfd);

void handlePing(const Ping &ping, sockaddr_in &remote_addr);
void handleIndirectPing(const IndirectPing &indirectPing, sockaddr_in &remote_addr);
void handleAck(const AckResp &ack, sockaddr_in &remote_addr, int64_t ts);
void handleNack(const NackResp &nack, sockaddr_in &remote_addr);
void handleUser(const User &b, sockaddr_in &remote_addr);
void handleComBroadcast(const ComBroadcast &comBroadcast, sockaddr_in &remote_addr);

inline void handleAlive(const Broadcast &b, sockaddr_in &remote_addr);
inline void handleDead(const Broadcast &b, sockaddr_in &remote_addr);
inline void handleSuspect(const Broadcast &b, sockaddr_in &remote_addr);

// Begin schedule
// do probe, state synchronization and gossip periodically
std::mutex tickerLock;
bool scheduled;
unique_ptr<timer> schedule_timer[3];
std::unique_ptr<repeatTimer> schedule_timer[3];
void schedule();
void deschedule();

size_t probeIndex;
void probe();
void probenode(NodeState &node);
void setprobepipes(uint32_t seqno,int ackpipe,int nackpipe,uint32_t probeinterval);
void probeNode(nodeState &node);
void setProbePipes(uint32_t seqNo, int ackPipe[2], int nackPipe[2], uint32_t probeInterval);

mutex AckLock;
map<uint32_t,AckHandler> AckHandlers;
std::mutex ackLock;
std::map<uint32_t, std::shared_ptr<ackHandler>> ackHandlers;
void setAckHandler(uint32_t seqNo, std::function<void(int64_t timestamp)> ackFn, int64_t timeout);

void pushPull();
void pushPullNode(const sockaddr_in &remote_addr, bool join);
MessageData sendAndReceiveState(const sockaddr_in &remote_addr, bool join);
void sendLocalState(int fd, bool join);
void mergeRemoteState(const PushPull &pushpull);

void pushpull();
void gossip();

public:
void newmemberlist();
void setAlive();
void newmemberlist(std::shared_ptr<Config> config_);
void clearmemberlist();

memberlist(){
scheduled=false;
sequenceNum.store(0);
};

bool anyAlive();

std::thread t1, t2, t3;

public:
memberlist();
memberlist(std::shared_ptr<Config> config_);
~memberlist();
void join(const string& cluster_addr);

// some helper functino
#ifdef PRINT_ADDRINFO
void printaddr(struct sockaddr_in &remote_addr)
{
char addr[INET_ADDRSTRLEN];
uint16_t port;
if (inet_ntop(remote_addr.sin_family, &remote_addr.sin_addr, addr, sizeof(addr)) <= 0)
{
exit(-1);
}
printf("Receive from %s:%u\n", addr, port);
}
#endif
void Join(const std::string &cluster_addr);
void Leave(int64_t timeout);
void ShutDown();
void UpdateNode(int64_t timeout);

Node *LocalNode();
std::vector<Node *> Members();
size_t NumMembers();
uint32_t EstNumNodes();

void SendBestEffort(std::shared_ptr<Node> to, std::string msg);
void SendReliable(std::shared_ptr<Node> to, std::string msg);
};

#endif
Loading

0 comments on commit 50a66ff

Please sign in to comment.