Skip to content

Commit

Permalink
🐛 fix write bug
Browse files Browse the repository at this point in the history
  • Loading branch information
markparticle committed Jun 29, 2020
1 parent 3faa908 commit c3dba7a
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 77 deletions.
58 changes: 27 additions & 31 deletions code/http/httpconn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,21 @@ HttpConn::~HttpConn() {
void HttpConn::init(int fd, const sockaddr_in& addr) {
assert(fd > 0);
userCount++;
isClose_ = false;
addr_ = addr;
fd_ = fd;
reset();
LOG_INFO("Client[%d](%s:%d) in, userCount:%d", fd_, GetIP(), GetPort(), (int)userCount);
}

void HttpConn::reset() {
writeBuff_.RetrieveAll();
readBuff_.RetrieveAll();
LOG_INFO("Client[%d](%s:%d) in, userCount:%d", fd_, GetIP(), GetPort(), (int)userCount);
isClose_ = false;
request_.Init();
}

void HttpConn::Close() {
response_.UnmapFile();
if(isClose_ == false){
isClose_ = true;
userCount--;
Expand All @@ -56,8 +62,8 @@ int HttpConn::GetPort() const {
return addr_.sin_port;
}

ssize_t HttpConn::read(int* saveErrno) {
ssize_t len = -1;
size_t HttpConn::read(int* saveErrno) {
size_t len = -1;
do {
len = readBuff_.ReadFd(fd_, saveErrno);
if (len <= 0) {
Expand All @@ -67,59 +73,49 @@ ssize_t HttpConn::read(int* saveErrno) {
return len;
}

ssize_t HttpConn::write(int* saveErrno) {
size_t len = -1;
size_t HttpConn::write(int* saveErrno) {
int len = -1;
do {
len = writev(fd_, iov_, iovCnt_);
LOG_DEBUG("write len: %d", len);
if(len <= 0) {
if(errno == EAGAIN) {
*saveErrno = errno;
return len;
}
response_.UnmapFile();
return len;
*saveErrno = errno;
break;
}

if(len > iov_[0].iov_len && iovCnt_ == 2) {
iov_[1].iov_base = (uint8_t*)iov_[1].iov_base + (len - iov_[0].iov_len);
if(iov_[0].iov_len + iov_[1].iov_len == 0) { break; } /* 传输结束 */
else if(len > (int)iov_[0].iov_len) {
iov_[1].iov_base = (uint8_t*) iov_[1].iov_base + (len - iov_[0].iov_len);
iov_[1].iov_len -= (len - iov_[0].iov_len);

writeBuff_.RetrieveAll();
iov_[0].iov_len = 0;
if(iov_[0].iov_len) {
writeBuff_.RetrieveAll();
iov_[0].iov_len = 0;
}
}
else {
iov_[0].iov_base = (uint8_t*)iov_[0].iov_base + len;
iov_[0].iov_len -= len;
writeBuff_.Retrieve(len);
}
/* 发送完毕 */
if(iov_[0].iov_len + iov_[1].iov_len == 0) {
response_.UnmapFile();
break;
}
} while(isET || ToWriteBytes() > 10240); /* LT模式的大文件传输用while */
} while(isET || ToWriteBytes() > 10240);
return len;
}

void HttpConn::process() {
request_.Init();
if(request_.parse(readBuff_)) {
response_.Init(srcDir, request_.path(), request_.IsKeepAlive());
response_.Init(srcDir, request_.path(), request_.IsKeepAlive(), 200);
} else {
response_.Init(srcDir, request_.path(), false, 400);
}
LOG_DEBUG("Parse code: %d, %s", response_.Code(), request_.path().c_str());
response_.MakeResponse(writeBuff_);

/* 响应头 */
iov_[0].iov_base = const_cast<char*>(writeBuff_.Peek());
iov_[0].iov_len = writeBuff_.ReadableBytes();
iovCnt_ = 1;


/* 响应文件 */
if(response_.FileLen() > 0 && response_.File()) {
iov_[1].iov_base = response_.File();
iov_[1].iov_len = response_.FileLen();
iovCnt_ = 2;
}
LOG_DEBUG("filesize:%d, %d", response_.FileLen() , iovCnt_);
LOG_DEBUG("filesize:%d, %d to %d", response_.FileLen() , iovCnt_, ToWriteBytes());
}
9 changes: 5 additions & 4 deletions code/http/httpconn.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,23 @@
#include "httprequest.h"
#include "httpresponse.h"


class HttpConn {
public:
HttpConn();
~HttpConn();

void init(int sockFd, const sockaddr_in& addr);
ssize_t read(int* saveErrno);
ssize_t write(int* saveErrno);
void reset();
size_t read(int* saveErrno);
size_t write(int* saveErrno);

void Close();

int GetFd() const;
int GetPort() const;
const char* GetIP() const;
sockaddr_in GetAddr() const;

void process();
int ToWriteBytes() {
return iov_[0].iov_len + iov_[1].iov_len;
Expand All @@ -49,6 +49,7 @@ class HttpConn {
static std::atomic<int> userCount;

private:

int fd_;
struct sockaddr_in addr_;

Expand Down
1 change: 1 addition & 0 deletions code/log/log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ void Log::write(int level, const char *format, ...) {
{
unique_lock<mutex> locker(mtx_);
locker.unlock();

char newFile[LOG_NAME_LEN];
char tail[36] = {0};
snprintf(tail, 36, "%04d_%02d_%02d", t.tm_year + 1900, t.tm_mon + 1, t.tm_mday);
Expand Down
4 changes: 2 additions & 2 deletions code/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ int main() {
/* 守护进程 后台运行 */
//daemon(1, 0);
WebServer server(
1315, 3, true, false, /* 端口 ET模式 Proactor/Reactor(使用异步线程池) 优雅退出 */
1315, 3, true, false, 5000, /* 端口 ET模式 Proactor/Reactor(使用异步线程池) 优雅退出 timeoutMs */
3306, "root", "root", "webserver", /* Mysql配置 */
1, 6, true, 2, 5000); /* 连接池数量 线程池数量 日志开关 日志等级 日志异步队列容量 */
1000, 8, true, 2, 0); /* 连接池数量 线程池数量 日志开关 日志等级 日志异步队列容量 */
server.Start();
}
3 changes: 0 additions & 3 deletions code/server/epoller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ Epoller::~Epoller() {
}

bool Epoller::AddFd(int fd, uint32_t events) {
assert(fd >= 0);
if(fd < 0) return false;
epoll_event ev = {0};
ev.data.fd = fd;
Expand All @@ -24,7 +23,6 @@ bool Epoller::AddFd(int fd, uint32_t events) {
}

bool Epoller::ModFd(int fd, uint32_t events) {
assert(fd >= 0);
if(fd < 0) return false;
epoll_event ev = {0};
ev.data.fd = fd;
Expand All @@ -33,7 +31,6 @@ bool Epoller::ModFd(int fd, uint32_t events) {
}

bool Epoller::DelFd(int fd) {
assert(fd >= 0);
if(fd < 0) return false;
epoll_event ev = {0};
return 0 == epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, &ev);
Expand Down
2 changes: 1 addition & 1 deletion code/server/epoller.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

class Epoller {
public:
Epoller(int maxEvent = 1024);
explicit Epoller(int maxEvent = 1024);

~Epoller();

Expand Down
75 changes: 42 additions & 33 deletions code/server/webserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
using namespace std;

WebServer::WebServer(
int port, int trigMode, bool isReactor, bool OptLinger,
int port, int trigMode, bool isReactor, bool OptLinger, int timeoutMS,
int sqlPort, const char* sqlUser, const char* sqlPwd,
const char* dbName, int connPoolNum, int threadNum,
bool openLog, int logLevel, int logQueSize):
port_(port), openLinger_(OptLinger), isReactor_(isReactor), isClose_(false),
timer_(new HeapTimer()), threadpool_(new ThreadPool(threadNum)), epoller_(new Epoller()) {
port_(port), openLinger_(OptLinger), isReactor_(isReactor),
isClose_(false), timeoutMS_(timeoutMS), timer_(new HeapTimer()),
threadpool_(new ThreadPool(threadNum)), epoller_(new Epoller()) {

srcDir_ = getcwd(nullptr, 256);
assert(srcDir_);
Expand All @@ -24,18 +25,24 @@ WebServer::WebServer(
SqlConnPool::Instance()->Init("localhost", sqlPort, sqlUser, sqlPwd, dbName, connPoolNum);

InitEventMode_(trigMode);
if(!InitSocket_()){isClose_ = true;}
if(!InitSocket_()) { isClose_ = true;}

if(openLog) {
Log::Instance()->init(logLevel, "./log", ".log", logQueSize);
if(isClose_) { LOG_ERROR("========== Server init error!=========="); }
else {
LOG_INFO("========== Server init ==========");
LOG_INFO("LogSys level: %d", logLevel);
LOG_INFO("Listen Mode: %s, OpenConn Mode: %s",
(listenEvent_ & EPOLLET ? "ET": "LT"),
(connEvent_ & EPOLLET ? "ET": "LT"));

LOG_INFO("Port:%d, OpenLinger: %s, IO Mode: %s",
port_, OptLinger? "true":"false", isReactor_?"Reactor":"Proctor");
port_, OptLinger? "true":"false",
isReactor_?"Reactor":"Proctor");


LOG_INFO("Listen Mode: %s, OpenConn Mode: %s",
(listenEvent_ & EPOLLET ? "ET": "LT"),
(connEvent_ & EPOLLET ? "ET": "LT"));

LOG_INFO("LogSys level: %d", logLevel);
LOG_INFO("srcDir: %s", HttpConn::srcDir);
LOG_INFO("SqlConnPool num: %d, ThreadPool num: %d", connPoolNum, threadNum);
}
Expand Down Expand Up @@ -74,9 +81,10 @@ void WebServer::InitEventMode_(int trigMode) {
}

void WebServer::Start() {
int timeMS = 0;
if(!isClose_) { LOG_INFO("========== Server start =========="); }
while(!isClose_) {
int timeMS = timer_->GetNextTick();
if(timeoutMS_ > 0) { timeMS = timer_->GetNextTick(); }
int eventCnt = epoller_->Wait(timeMS);
for(int i = 0; i < eventCnt; i++) {
/* 处理事件 */
Expand All @@ -86,6 +94,7 @@ void WebServer::Start() {
DealListen_();
}
else if(events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) {
LOG_DEBUG("!!!!!!!EVENT QUIT %d %d %d", events & EPOLLRDHUP, events & EPOLLHUP, EPOLLERR);
assert(users_.count(fd) > 0);
CloseConn_(&users_[fd]);
}
Expand Down Expand Up @@ -114,15 +123,17 @@ void WebServer::SendError_(int fd, const char*info) {

void WebServer::CloseConn_(HttpConn* client) {
assert(client);
LOG_INFO("Client[%d] quit!", client->GetFd());
epoller_->DelFd(client->GetFd());
client->Close();
LOG_INFO("Client[%d] quit!", client->GetFd());
}

void WebServer::AddClient_(int fd, sockaddr_in addr) {
assert(fd > 0);
users_[fd].init(fd, addr);
timer_->add(fd, 3 * TIME_SLOT, std::bind(&WebServer::CloseConn_, this, &users_[fd]));
if(timeoutMS_ > 0) {
timer_->add(fd, timeoutMS_, std::bind(&WebServer::CloseConn_, this, &users_[fd]));
}
epoller_->AddFd(fd, EPOLLIN | connEvent_);
SetFdNonblock(fd);
LOG_INFO("Client[%d] in!", users_[fd].GetFd());
Expand All @@ -133,14 +144,7 @@ void WebServer::DealListen_() {
socklen_t len = sizeof(addr);
do {
int fd = accept(listenFd_, (struct sockaddr *)&addr, &len);
if(fd <= 0) {
if(fd == -1 && errno == EAGAIN){
/* 完成所有accept任务 */
return;
}
LOG_ERROR("Failed accept Client(%s:%d).", inet_ntoa(addr.sin_addr), addr.sin_port);
return;
}
if(fd <= 0) { break;}
else if(HttpConn::userCount >= MAX_FD) {
SendError_(fd, "Server busy!");
LOG_WARN("Clients is full!");
Expand Down Expand Up @@ -172,7 +176,9 @@ void WebServer::DealWrite_(HttpConn* client) {

void WebServer::ExtentTime_(HttpConn* client) {
assert(client);
timer_->adjust(client->GetFd(), 3 * TIME_SLOT);
if(timeoutMS_ > 0) {
timer_->adjust(client->GetFd(), timeoutMS_);
}
}

void WebServer::OnRead_(HttpConn* client) {
Expand All @@ -181,37 +187,40 @@ void WebServer::OnRead_(HttpConn* client) {
int readErrno = 0;
ret = client->read(&readErrno);

if(ret <= 0 && readErrno != EAGAIN && readErrno != EWOULDBLOCK) {
if(ret <= 0 && !(readErrno & EAGAIN) && !(readErrno & EWOULDBLOCK)) {
CloseConn_(client);
return;
}
client->process();
epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLOUT);
}

void WebServer::OnWrite_(HttpConn* client) {
assert(client);
int ret = -1;
int writeErrno = 0;

ret = client->write(&writeErrno);
LOG_DEBUG("To Write:%d", client->ToWriteBytes());
if(client->ToWriteBytes() == 0){
LOG_DEBUG("Write Finish!!");
if(client->ToWriteBytes() == 0) {
/* 传输完成 */
if(client->IsKeepAlive()) {
LOG_DEBUG("keepAlive!");
client->reset();
epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLIN);
LOG_DEBUG("KEEPLIVE!!");
return;
}
}
if(ret < 0) {
epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLOUT);
return;
else if(ret < 0) {
if(writeErrno & EAGAIN) {
/* 继续传输 */
epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLOUT);
return;
}
}
CloseConn_(client);
}

bool WebServer::InitSocket_() {
/* Create listenFd */
bool WebServer::InitSocket_() {
int ret;
struct sockaddr_in addr;
if(port_ > 65535 && port_ < 1024) {
Expand All @@ -223,7 +232,7 @@ bool WebServer::InitSocket_() {
addr.sin_port = htons(port_);
struct linger optLinger = { 0 };
if(openLinger_) {
/* 优雅关闭: 直到所剩数据发送完毕或超时*/
/* 优雅关闭: 直到所剩数据发送完毕或超时 */
optLinger.l_onoff = 1;
optLinger.l_linger = 1;
}
Expand Down
Loading

0 comments on commit c3dba7a

Please sign in to comment.