Skip to content

Commit

Permalink
add retry()
Browse files Browse the repository at this point in the history
  • Loading branch information
kanade2010 committed Nov 11, 2018
1 parent 3384d93 commit 7903159
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 10 deletions.
2 changes: 2 additions & 0 deletions AsyncLogging/Logger.hh
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,6 @@ private:

};

const char* strerror_tl(int savedErrno);

#endif
61 changes: 56 additions & 5 deletions Connector/Connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

#include "SocketHelp.hh"
#include "Connector.hh"
#include "EventLoop.hh"
#include "Logger.hh"

Connector::Connector(EventLoop* loop, const InetAddress& serverAddr)
:p_loop(loop),
m_serverAddr(serverAddr),
m_state(kDisconnected),
m_retryDelayMs(kInitRetryDelayMs)
{

Expand All @@ -29,12 +31,14 @@ void Connector::connect()
int ret = sockets::connect(sockfd, m_serverAddr.getSockAddr());
int savedErrno = (ret == 0) ? 0 : errno;

if(ret != 0) LOG_TRACE << "connect error ("<< savedErrno << ") : " << strerror_tl(savedErrno);

switch(savedErrno)
{
case 0:
case EINPROGRESS: //Operation now in progress
case EINTR: //Interrupted system call
case EISCONN: //Transport endpoint is already connected
case EINPROGRESS: //Operation now in progress
case EINTR: //Interrupted system call
case EISCONN: //Transport endpoint is already connected
connecting(sockfd);
break;

Expand Down Expand Up @@ -69,12 +73,13 @@ void Connector::connect()

void Connector::connecting(int sockfd)
{
LOG_TRACE << "Connector::connecting] sockfd : " << sockfd;
assert(!p_channel);
p_channel.reset(new Channel(p_loop, sockfd));
p_channel->setWriteCallBack(std::bind(&Connector::handleWrite, this));
//p_channel->setErrorCallback()

//enableWriting if Channel Writeable ,Connect Success.
//enableWriting if Channel Writeable ,Connect Success.
p_channel->enableWriting();
}

Expand All @@ -87,10 +92,56 @@ void Connector::retry(int sockfd)

}

int Connector::removeAndResetChannel()
{
p_channel->disableAll();
p_channel->remove();

int sockfd = p_channel->fd();

//p_loop->queueInLoop(std::bind(&Connector::resetChannel, this));

return sockfd;
}

void Connector::resetChannel()
{
LOG_TRACE << "Connector::resetChannel()";
p_channel.reset();
}

void Connector::handleWrite()
{
LOG_TRACE << "Connector::handleWrite ";
m_newConnectionCallBack(p_channel->fd());

if(m_state == kDisconnected)
{
int sockfd = removeAndResetChannel();
int err = sockets::getSocketError(sockfd);

if(err)
{
LOG_WARN << "Connector::handleWrite - SO_ERROR = "
<< err << " " << strerror_tl(err);
retry(sockfd);
}
/*else if (sockets::isSelfConnect(sockfd))
{
}*/
else
{
setState(kConnected);
m_newConnectionCallBack(sockfd);
}

}
else
{
//怎么回事 , 小老弟?
assert(m_state == kDisconnected);
}

}


Expand Down
12 changes: 10 additions & 2 deletions Connector/Connector.hh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class EventLoop;


class Connector
class Connector : public std::enable_shared_from_this<Connector>
{
public:
typedef std::function<void (int sockfd)> NewConnectionCallback;
Expand All @@ -29,18 +29,26 @@ public:
private:
const Connector operator=(const Connector&);
Connector(const Connector&);

enum States { kDisconnected, kConnecting, kConnected };
static const int kMaxRetryDelayMs = 30*1000;
static const int kInitRetryDelayMs = 500;

void connect();
void connecting(int sockfd);

void retry(int sockfd);
int removeAndResetChannel();
void resetChannel();

void setState(States s) { m_state = s; }

EventLoop* p_loop;
int m_retryDelayMs;
InetAddress m_serverAddr;

States m_state;

std::unique_ptr<Channel> p_channel;
NewConnectionCallback m_newConnectionCallBack;

Expand Down
20 changes: 18 additions & 2 deletions Connector/SocketHelp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
#include <assert.h>
#include <sys/uio.h> // readv

int sockets::createSocket(sa_family_t family){
int sockets::createSocket(sa_family_t family){
// Call "socket()" to create a (family) socket of the specified type.
// But also set it to have the 'close on exec' property (if we can)

int sock;

//CLOEXEC,即当调用exec()函数成功后,文件描述符会自动关闭。
Expand Down Expand Up @@ -186,6 +186,22 @@ void sockets::toIp(char* buf, size_t size,
}
}

int sockets::getSocketError(int sockfd)
{
int optval;

socklen_t optlen = static_cast<socklen_t>(sizeof optval);

if(::getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0)
{
return errno;
}
else
{
return optval;
}
}

/*
const struct sockaddr* sockets::sockaddr_cast(const struct sockaddr_in* addr)
{
Expand Down
2 changes: 1 addition & 1 deletion Connector/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ int main()
EventLoop loop;
g_loop = &loop;

InetAddress serverAddr("127.0.0.1", 8888);
InetAddress serverAddr("127.0.0.1", 123456);
Connector client(&loop, serverAddr);
client.setNewConnectionCallback(newConnetion);
client.start();
Expand Down
2 changes: 2 additions & 0 deletions SyncLogging/Logger.hh
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,6 @@ private:

};

const char* strerror_tl(int savedErrno);

#endif

0 comments on commit 7903159

Please sign in to comment.