Skip to content

Commit

Permalink
add more examples
Browse files Browse the repository at this point in the history
  • Loading branch information
yedf committed Mar 21, 2016
1 parent 79ddeb2 commit e4594b1
Show file tree
Hide file tree
Showing 13 changed files with 145 additions and 33 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ examples/http-echo
examples/log
examples/stat
examples/write-on-empty
examples/http-hello
examples/idle-close
examples/reconnect
examples/safe-close
examples/timer
raw-examples/epoll
raw-examples/epoll-et
raw-examples/kqueue
Expand Down
20 changes: 19 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ int main(int argc, const char* argv[]) {
###目录结构
* handy--------handy库
* handy--------handy库
* 10m----------进行千万并发连接测试所使用的程序
* examples----示例
* raw-examples--原生api使用示例,包括了epoll,epoll ET模式,kqueue示例
* ssl------------openssl相关的代码与示例
Expand All @@ -71,6 +72,23 @@ int main(int argc, const char* argv[]) {
###[使用文档](https://github.com/yedf/handy/blob/master/doc.md)
###raw-examples
使用os提供的api如epoll,kqueue编写并发应用程序
* epoll.cc,演示了epoll的通常用法,使用epoll的LT模式
* epoll-et.cc,演示了epoll的ET模式,与LT模式非常像,区别主要体现在不需要手动开关EPOLLOUT事件
###examples
使用handy的示例
* chat.cc 简单的聊天应用,用户使用telnet登陆后,系统分配一个用户id,用户可以发送消息给某个用户,也可以发送消息给所有用户
* codec-cli.cc 发送消息给服务器,使用的消息格式为mBdT开始,紧接着4字节的长度,然后是消息内容
* codec-svr.cc 见上
* daemon.cc 程序已以daemon方式启动,从conf文件中获取日志相关的配置,并初始化日志参数
* echo.cc 简单的回显服务
* hsha.cc 半同步半异步示例,用户可以把IO交给handy框架进行处理,自己同步处理用户请求
* http-hello.cc 一个简单的http服务器程序
* stat.cc 一个简单的状态服务器示例,一个内嵌的http服务器,方便外部的工具查看应用程序的状态
* write-on-empty.cc 这个例子演示了需要写出大量数据,例如1G文件这种情景中的使用技巧
license
====
Use of this source code is governed by a BSD-style
Expand Down
35 changes: 22 additions & 13 deletions doc.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ int main(int argc, const char* argv[]) {
}
```
<h2 id="event-base">EventBase事件分发器</h2>
EventBase是事件分发器,内部使用epoll/kqueue来管理异步IO
EventBase是事件分发器,内部使用epoll/kqueue来管理非阻塞IO
```c
EventBase base;
```
Expand All @@ -42,28 +42,21 @@ base.loop();
```
###退出事件循环
```c
//退出事件循环,可在其他线程中调用
//退出事件循环,线程安全,可在其他线程中调用
base.exit();
```
###是否已退出
```c
bool exited();
```
###管理客户端连接
```c
TcpConnPtr con = TcpConn::createConnection(&base, host, port);
```
###管理tcp服务器的所有socket
```c
TcpServer echo(&base);
```
###在IO线程中执行任务
一些任务必须在IO线程中完成,例如往连接中写入数据。非IO线程需要往连接中写入数据时,必须把任务交由IO线程进行处理
```c
void safeCall(const Task& task);

base.safeCall([](con){con->send("OK");});
base.safeCall([con](){con->send("OK");});
```
[例子程序](examples/safe-close.cc)
###管理定时任务
EventBase通过设定epoll_wait/kevent的等待时间让自己及时返回,然后检查是否有到期的任务,因此时间精度依赖于epoll_wait/kevent的精度
```c
Expand All @@ -77,7 +70,7 @@ bool cancel(TimerId timerid);
TimerId tid = base.runAfter(1000, []{ info("a second passed"); });
base.cancel(tid);
```
[例子程序](examples/timer.cc)
<h2 id="tcp-conn">TcpConn tcp连接</h2>
连接采用引用计数的方式进行管理,因此用户无需手动释放连接
###引用计数
Expand All @@ -88,9 +81,13 @@ typedef std::shared_ptr<TcpConn> TcpConnPtr;
```c
enum State { Invalid=1, Handshaking, Connected, Closed, Failed, };
```
###创建连接
```c
TcpConnPtr con = TcpConn::createConnection(&base, host, port); #第一个参数为前面的EventBase*
```
###使用示例
```c
TcpConnPtr con = TcpConn::createConnection(base, host, port);
TcpConnPtr con = TcpConn::createConnection(&base, host, port);
con->onState([=](const TcpConnPtr& con) {
info("onState called state: %d", con->getState());
});
Expand All @@ -100,18 +97,21 @@ con->onRead([](const TcpConnPtr& con){
con->getInput().clear();
});
```
[例子程序](examples/echo.cc)
###设置重连
```c
//设置重连时间间隔,-1: 不重连,0:立即重连,其它:等待毫秒数,未设置不重连
void setReconnectInterval(int milli);
```
[例子程序](examples/reconnect.cc)
###连接空闲回调
```c
void addIdleCB(int idle, const TcpCallBack& cb);

//连接空闲30s关闭连接
con->addIdleCB(30, [](const TcpConnPtr& con)) { con->close(); });
```
[例子程序](examples/idle-close.cc)
###消息模式
可以使用onRead处理消息,也可以选用onMsg方式处理消息
```c
Expand All @@ -126,6 +126,7 @@ con->onMsg(new LineCodec, [](const TcpConnPtr& con, Slice msg) {
con->sendMsg("hello");
});
```
[例子程序](examples/codec-svr.cc)
###存放自定义数据
```c
template<class T> T& context();
Expand All @@ -134,6 +135,10 @@ con->context<std::string>() = "user defined data";
```

<h2 id="tcp-server">TcpServer tcp服务器</h2>
###创建tcp服务器
```c
TcpServer echo(&base);
```
###使用示例
```c
TcpServer echo(&base); //创建服务器
Expand All @@ -143,6 +148,7 @@ echo.onConnRead([](const TcpConnPtr& con) {
con->send(con->getInput()); // echo 读取的数据
});
```
[例子程序](examples/echo.cc)
###自定义创建的连接
当服务器accept一个连接时,调用此函数
```
Expand All @@ -158,6 +164,7 @@ chat.onConnCreate([&]{
return con;
});
```
[例子程序](examples/codec-svr.cc)

<h2 id="http-server">HttpServer http服务器</h2>
```c
Expand All @@ -171,6 +178,7 @@ sample.onGet("/hello", [](const HttpConnPtr& con) {
con.sendResponse(resp);
});
```
[例子程序](examples/http-hello.cc)
<h2 id="hsha">半同步半异步服务器</h2>
```c
//cb返回空string,表示无需返回数据。如果用户需要更灵活的控制,可以直接操作cb的con参数
Expand All @@ -183,4 +191,5 @@ hsha.onMsg(new LineCodec, [](const TcpConnPtr& con, const string& input){
return util::format("%s used %d ms", input.c_str(), ms);
});
```
[例子程序](examples/hsha.cc)
持续更新中......
File renamed without changes.
21 changes: 21 additions & 0 deletions examples/idle-close.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#include <handy/handy.h>
using namespace handy;

int main(int argc, const char* argv[]) {
setloglevel("TRACE");
EventBase base;
Signal::signal(SIGINT, [&]{ base.exit(); });
TcpServerPtr svr = TcpServer::startServer(&base, "", 99);
exitif(svr == NULL, "start tcp server failed");
svr->onConnState([](const TcpConnPtr& con) {
if (con->getState() == TcpConn::Connected) {
con->addIdleCB(2, [](const TcpConnPtr& con){
info("idle for 2 seconds, close connection");
con->close();
});
}
});
auto con = TcpConn::createConnection(&base, "localhost", 99);
base.runAfter(3000, [&](){base.exit();});
base.loop();
}
15 changes: 0 additions & 15 deletions examples/log.cc

This file was deleted.

20 changes: 20 additions & 0 deletions examples/reconnect.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include <handy/handy.h>
using namespace handy;

int main(int argc, const char* argv[]) {
setloglevel("TRACE");
EventBase base;
Signal::signal(SIGINT, [&]{ base.exit(); });
TcpServerPtr svr = TcpServer::startServer(&base, "", 99);
exitif(svr == NULL, "start tcp server failed");
svr->onConnState([&](const TcpConnPtr& con) { //200ms后关闭连接
if (con->getState() == TcpConn::Connected)
base.runAfter(200, [con](){ info("close con after 200ms"); con->close(); });
});
TcpConnPtr con1 = TcpConn::createConnection(&base, "localhost", 99);
con1->setReconnectInterval(300);
// TcpConnPtr con2 = TcpConn::createConnection(&base, "localhost", 1, 100);
// con2->setReconnectInterval(200);
base.runAfter(600, [&](){base.exit();});
base.loop();
}
18 changes: 18 additions & 0 deletions examples/safe-close.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#include <handy/handy.h>
using namespace handy;

int main(int argc, const char* argv[]) {
EventBase base;
Signal::signal(SIGINT, [&]{ base.exit(); });
TcpServerPtr svr = TcpServer::startServer(&base, "", 99);
exitif(svr == NULL, "start tcp server failed");
TcpConnPtr con = TcpConn::createConnection(&base, "localhost", 99);
std::thread th([con,&base](){
sleep(1);
info("thread want to close an connection");
base.safeCall([con](){ con->close(); }); //其他线程需要操作连接,应当通过safeCall把操作交给io线程来做
});
base.runAfter(1500, [&base](){base.exit();});
base.loop();
th.join();
}
25 changes: 25 additions & 0 deletions examples/timer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#include <handy/handy.h>
using namespace handy;

int main(int argc, const char* argv[]) {
EventBase base;
Signal::signal(SIGINT, [&]{ base.exit(); });
info("program begin");
base.runAfter(200, [](){
info("a task in runAfter 200ms");
});
base.runAfter(100, [](){
info("a task in runAfter 100ms interval 1000ms");
}, 1000);
TimerId id = base.runAt(time(NULL)*1000+300, [](){
info("a task in runAt now+300 interval 500ms");
}, 500);
base.runAfter(2000, [&](){
info("cancel task of interval 500ms");
base.cancel(id);
});
base.runAfter(3000, [&](){
base.exit();
});
base.loop();
}
2 changes: 1 addition & 1 deletion examples/write-on-empty.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ int main(int argc, const char* argv[]) {
});
return con;
});
thread th([]{
thread th([]{ //模拟了一个客户端,连接服务器后,接收服务器发送过来的数据
EventBase base2;
TcpConnPtr con = TcpConn::createConnection(&base2, "127.0.0.1", 99);
con->onRead([](const TcpConnPtr& con){
Expand Down
5 changes: 5 additions & 0 deletions handy/conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ void TcpConn::connect(EventBase* base, const string& host, short port, int timeo
destHost_ = host;
destPort_ = port;
connectTimeout_ = timeout;
connectedTime_ = util::timeMilli();
localIp_ = localip;
Ip4Addr addr(host, port);
int fd = socket(AF_INET, SOCK_STREAM, 0);
Expand Down Expand Up @@ -153,6 +154,7 @@ int TcpConn::handleHandshake(const TcpConnPtr& con) {
channel_->enableReadWrite(true, false);
state_ = State::Connected;
if (state_ == State::Connected) {
connectedTime_ = util::timeMilli();
trace("tcp connected %s - %s fd %d",
local_.toString().c_str(), peer_.toString().c_str(), channel_->fd());
if (statecb_) {
Expand Down Expand Up @@ -332,6 +334,9 @@ void TcpServer::handleAccept() {
auto addcon = [=] {
TcpConnPtr con = createcb_();
con->attach(b, cfd, local, peer);
if (statecb_) {
con->onState(statecb_);
}
if (readcb_) {
con->onRead(readcb_);
}
Expand Down
4 changes: 3 additions & 1 deletion handy/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ namespace handy {
AutoContext ctx_, internalCtx_;
std::string destHost_, localIp_;
int destPort_, connectTimeout_, reconnectInterval_;
int64_t connectedTime_;
std::unique_ptr<CodecBase> codec_;
void handleRead(const TcpConnPtr& con);
void handleWrite(const TcpConnPtr& con);
Expand All @@ -100,6 +101,7 @@ namespace handy {
Ip4Addr getAddr() { return addr_; }
EventBase* getBase() { return base_; }
void onConnCreate(const std::function<TcpConnPtr()>& cb) { createcb_ = cb; }
void onConnState(const TcpCallBack& cb) { statecb_ = cb; }
void onConnRead(const TcpCallBack& cb) { readcb_ = cb; assert(!msgcb_); }
// 消息处理与Read回调冲突,只能调用一个
void onConnMsg(CodecBase* codec, const MsgCallBack& cb) { codec_.reset(codec); msgcb_ = cb; assert(!readcb_); }
Expand All @@ -108,7 +110,7 @@ namespace handy {
EventBases* bases_;
Ip4Addr addr_;
Channel* listen_channel_;
TcpCallBack readcb_;
TcpCallBack statecb_, readcb_;
MsgCallBack msgcb_;
std::function<TcpConnPtr()> createcb_;
std::unique_ptr<CodecBase> codec_;
Expand Down
8 changes: 6 additions & 2 deletions handy/event_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ void handyUpdateIdle(EventBase* base, const IdleId& idle) {
}

TcpConn::TcpConn()
:base_(NULL), channel_(NULL), state_(State::Invalid), destPort_(-1), connectTimeout_(0), reconnectInterval_(-1)
:base_(NULL), channel_(NULL), state_(State::Invalid), destPort_(-1),
connectTimeout_(0), reconnectInterval_(-1),connectedTime_(util::timeMilli())
{
}

Expand All @@ -354,7 +355,10 @@ void TcpConn::addIdleCB(int idle, const TcpCallBack& cb) {
void TcpConn::reconnect() {
auto con = shared_from_this();
getBase()->imp_->reconnectConns_.insert(con);
getBase()->runAfter(reconnectInterval_, [this, con]() {
int64_t interval = reconnectInterval_-(util::timeMilli()-connectedTime_);
interval = interval>0?interval:0;
info("reconnect interval: %d will reconnect after %lld ms", reconnectInterval_, interval);
getBase()->runAfter(interval, [this, con]() {
getBase()->imp_->reconnectConns_.erase(con);
connect(getBase(), destHost_, (short)destPort_, connectTimeout_, localIp_);
});
Expand Down

0 comments on commit e4594b1

Please sign in to comment.