diff --git a/reactor/Makefile b/reactor/Makefile index ee9a1b2..9129f80 100644 --- a/reactor/Makefile +++ b/reactor/Makefile @@ -1,5 +1,5 @@ -SUBDIRS = s00 s01 s02 s03 s04 +SUBDIRS = s00 s01 s02 s03 s04 s05 all: $(SUBDIRS) diff --git a/reactor/README b/reactor/README index bd94eef..305d1ac 100644 --- a/reactor/README +++ b/reactor/README @@ -22,15 +22,16 @@ Step 3: wakeup(), runInLoop() test6: runInLoop(), runAfter() in a different thread Step 4: Acceptor - test7: listen on port 9981, accept, send, and close. + test7: listen on port 9981, accept, send, and close Step 5: TcpServer - -Step 6: TcpConnection up and down + test8: use of TcpServer Step 7: TcpConnection read -Step 8: TcpConnection write +Step 6: TcpConnection up and down + +Step 8: TcpConnection write, write complete callback Step 9: multithreaded TcpServer diff --git a/reactor/s05/Callbacks.h b/reactor/s05/Callbacks.h index 3bb319e..1885525 100644 --- a/reactor/s05/Callbacks.h +++ b/reactor/s05/Callbacks.h @@ -18,7 +18,16 @@ namespace muduo // All client visible callbacks go here. +class TcpConnection; +typedef boost::shared_ptr TcpConnectionPtr; + typedef boost::function TimerCallback; +typedef boost::function ConnectionCallback; +typedef boost::function MessageCallback; +typedef boost::function CloseCallback; + } diff --git a/reactor/s05/Makefile b/reactor/s05/Makefile index 3cd0304..d656387 100644 --- a/reactor/s05/Makefile +++ b/reactor/s05/Makefile @@ -7,10 +7,12 @@ LIB_SRC = \ Poller.cc \ Socket.cc \ SocketsOps.cc \ + TcpConnection.cc \ + TcpServer.cc \ Timer.cc \ TimerQueue.cc -BINARIES = test1 test2 test3 test4 test5 test6 test7 +BINARIES = test1 test2 test3 test4 test5 test6 test7 test8 all: $(BINARIES) @@ -23,3 +25,4 @@ test4: test4.cc test5: test5.cc test6: test6.cc test7: test7.cc +test8: test8.cc diff --git a/reactor/s05/SocketsOps.cc b/reactor/s05/SocketsOps.cc index f75afc6..dfcb18e 100644 --- a/reactor/s05/SocketsOps.cc +++ b/reactor/s05/SocketsOps.cc @@ -161,3 +161,15 @@ void sockets::fromHostPort(const char* ip, uint16_t port, } } +struct sockaddr_in sockets::getLocalAddr(int sockfd) +{ + struct sockaddr_in localaddr; + bzero(&localaddr, sizeof localaddr); + socklen_t addrlen = sizeof(localaddr); + if (::getsockname(sockfd, sockaddr_cast(&localaddr), &addrlen) < 0) + { + LOG_SYSERR << "sockets::getLocalAddr"; + } + return localaddr; +} + diff --git a/reactor/s05/SocketsOps.h b/reactor/s05/SocketsOps.h index 1ff538c..623e205 100644 --- a/reactor/s05/SocketsOps.h +++ b/reactor/s05/SocketsOps.h @@ -67,6 +67,9 @@ void toHostPort(char* buf, size_t size, const struct sockaddr_in& addr); void fromHostPort(const char* ip, uint16_t port, struct sockaddr_in* addr); + +struct sockaddr_in getLocalAddr(int sockfd); + } } diff --git a/reactor/s05/TcpConnection.cc b/reactor/s05/TcpConnection.cc new file mode 100644 index 0000000..7fa99c4 --- /dev/null +++ b/reactor/s05/TcpConnection.cc @@ -0,0 +1,64 @@ +// excerpts from http://code.google.com/p/muduo/ +// +// Use of this source code is governed by a BSD-style license +// that can be found in the License file. +// +// Author: Shuo Chen (chenshuo at chenshuo dot com) + +#include "TcpConnection.h" + +#include "logging/Logging.h" +#include "Channel.h" +#include "EventLoop.h" +#include "Socket.h" + +#include + +#include +#include + +using namespace muduo; + +TcpConnection::TcpConnection(EventLoop* loop, + const std::string& nameArg, + int sockfd, + const InetAddress& localAddr, + const InetAddress& peerAddr) + : loop_(CHECK_NOTNULL(loop)), + name_(nameArg), + state_(kConnecting), + socket_(new Socket(sockfd)), + channel_(new Channel(loop, sockfd)), + localAddr_(localAddr), + peerAddr_(peerAddr) +{ + LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this + << " fd=" << sockfd; + channel_->setReadCallback( + boost::bind(&TcpConnection::handleRead, this)); +} + +TcpConnection::~TcpConnection() +{ + LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this + << " fd=" << channel_->fd(); +} + +void TcpConnection::connectEstablished() +{ + loop_->assertInLoopThread(); + assert(state_ == kConnecting); + setState(kConnected); + channel_->enableReading(); + + connectionCallback_(shared_from_this()); +} + +void TcpConnection::handleRead() +{ + char buf[65536]; + ssize_t n = read(channel_->fd(), buf, sizeof buf); + messageCallback_(shared_from_this(), buf, n); + // FIXME: close connection if n == 0 +} + diff --git a/reactor/s05/TcpConnection.h b/reactor/s05/TcpConnection.h new file mode 100644 index 0000000..08bd475 --- /dev/null +++ b/reactor/s05/TcpConnection.h @@ -0,0 +1,86 @@ +// excerpts from http://code.google.com/p/muduo/ +// +// Use of this source code is governed by a BSD-style license +// that can be found in the License file. +// +// Author: Shuo Chen (chenshuo at chenshuo dot com) + +#ifndef MUDUO_NET_TCPCONNECTION_H +#define MUDUO_NET_TCPCONNECTION_H + +#include "Callbacks.h" +#include "InetAddress.h" + +#include +#include +#include +#include +#include + +namespace muduo +{ + +class Channel; +class EventLoop; +class Socket; + +/// +/// TCP connection, for both client and server usage. +/// +class TcpConnection : boost::noncopyable, + public boost::enable_shared_from_this +{ + public: + /// Constructs a TcpConnection with a connected sockfd + /// + /// User should not create this object. + TcpConnection(EventLoop* loop, + const std::string& name, + int sockfd, + const InetAddress& localAddr, + const InetAddress& peerAddr); + ~TcpConnection(); + + EventLoop* getLoop() const { return loop_; } + const std::string& name() const { return name_; } + const InetAddress& localAddress() { return localAddr_; } + const InetAddress& peerAddress() { return peerAddr_; } + bool connected() const { return state_ == kConnected; } + + void setConnectionCallback(const ConnectionCallback& cb) + { connectionCallback_ = cb; } + + void setMessageCallback(const MessageCallback& cb) + { messageCallback_ = cb; } + + /// Internal use only. + void setCloseCallback(const CloseCallback& cb) + { closeCallback_ = cb; } + + // called when TcpServer accepts a new connection + void connectEstablished(); // should be called only once + + private: + enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting }; + + void setState(StateE s) { state_ = s; } + void handleRead(); + + EventLoop* loop_; + std::string name_; + StateE state_; // FIXME: use atomic variable + // we don't expose those classes to client. + boost::scoped_ptr socket_; + boost::scoped_ptr channel_; + InetAddress localAddr_; + InetAddress peerAddr_; + ConnectionCallback connectionCallback_; + MessageCallback messageCallback_; + CloseCallback closeCallback_; +}; + +typedef boost::shared_ptr TcpConnectionPtr; + +} + +#endif // MUDUO_NET_TCPCONNECTION_H diff --git a/reactor/s05/TcpServer.cc b/reactor/s05/TcpServer.cc new file mode 100644 index 0000000..21c26fb --- /dev/null +++ b/reactor/s05/TcpServer.cc @@ -0,0 +1,77 @@ +// excerpts from http://code.google.com/p/muduo/ +// +// Use of this source code is governed by a BSD-style license +// that can be found in the License file. +// +// Author: Shuo Chen (chenshuo at chenshuo dot com) + +#include "TcpServer.h" + +#include "logging/Logging.h" +#include "Acceptor.h" +#include "EventLoop.h" +#include "SocketsOps.h" + +#include + +#include // snprintf + +using namespace muduo; + +TcpServer::TcpServer(EventLoop* loop, const InetAddress& listenAddr) + : loop_(CHECK_NOTNULL(loop)), + name_(listenAddr.toHostPort()), + acceptor_(new Acceptor(loop, listenAddr)), + started_(false), + nextConnId_(1) +{ + acceptor_->setNewConnectionCallback( + boost::bind(&TcpServer::newConnection, this, _1, _2)); +} + +TcpServer::~TcpServer() +{ +} + +void TcpServer::start() +{ + if (!started_) + { + started_ = true; + } + + if (!acceptor_->listenning()) + { + loop_->runInLoop( + boost::bind(&Acceptor::listen, get_pointer(acceptor_))); + } +} + +void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) +{ + loop_->assertInLoopThread(); + char buf[32]; + snprintf(buf, sizeof buf, "#%d", nextConnId_); + ++nextConnId_; + std::string connName = name_ + buf; + + LOG_INFO << "TcpServer::newConnection [" << name_ + << "] - new connection [" << connName + << "] from " << peerAddr.toHostPort(); + InetAddress localAddr(sockets::getLocalAddr(sockfd)); + // FIXME poll with zero timeout to double confirm the new connection + TcpConnectionPtr conn( + new TcpConnection(loop_, connName, sockfd, localAddr, peerAddr)); + connections_[connName] = conn; + conn->setConnectionCallback(connectionCallback_); + conn->setMessageCallback(messageCallback_); + conn->setCloseCallback( + boost::bind(&TcpServer::removeConnection, this, _1)); + conn->connectEstablished(); +} + +void TcpServer::removeConnection(const TcpConnectionPtr& conn) +{ + // TODO +} + diff --git a/reactor/s05/TcpServer.h b/reactor/s05/TcpServer.h new file mode 100644 index 0000000..4291d4c --- /dev/null +++ b/reactor/s05/TcpServer.h @@ -0,0 +1,66 @@ +// excerpts from http://code.google.com/p/muduo/ +// +// Use of this source code is governed by a BSD-style license +// that can be found in the License file. +// +// Author: Shuo Chen (chenshuo at chenshuo dot com) + +#ifndef MUDUO_NET_TCPSERVER_H +#define MUDUO_NET_TCPSERVER_H + +#include "Callbacks.h" +#include "TcpConnection.h" + +#include +#include +#include + +namespace muduo +{ + +class Acceptor; +class EventLoop; + +class TcpServer : boost::noncopyable +{ + public: + + TcpServer(EventLoop* loop, const InetAddress& listenAddr); + ~TcpServer(); // force out-line dtor, for scoped_ptr members. + + /// Starts the server if it's not listenning. + /// + /// It's harmless to call it multiple times. + /// Thread safe. + void start(); + + /// Set connection callback. + /// Not thread safe. + void setConnectionCallback(const ConnectionCallback& cb) + { connectionCallback_ = cb; } + + /// Set message callback. + /// Not thread safe. + void setMessageCallback(const MessageCallback& cb) + { messageCallback_ = cb; } + + private: + /// Not thread safe, but in loop + void newConnection(int sockfd, const InetAddress& peerAddr); + void removeConnection(const TcpConnectionPtr& conn); + + typedef std::map ConnectionMap; + + EventLoop* loop_; // the acceptor loop + const std::string name_; + boost::scoped_ptr acceptor_; // avoid revealing Acceptor + ConnectionCallback connectionCallback_; + MessageCallback messageCallback_; + bool started_; + int nextConnId_; // always in loop thread + ConnectionMap connections_; +}; + +} + +#endif // MUDUO_NET_TCPSERVER_H diff --git a/reactor/s05/test8.cc b/reactor/s05/test8.cc new file mode 100644 index 0000000..d83cff6 --- /dev/null +++ b/reactor/s05/test8.cc @@ -0,0 +1,42 @@ +#include "TcpServer.h" +#include "EventLoop.h" +#include "InetAddress.h" +#include + +void onConnection(const muduo::TcpConnectionPtr& conn) +{ + if (conn->connected()) + { + printf("onConnection(): new connection [%s] from %s\n", + conn->name().c_str(), + conn->peerAddress().toHostPort().c_str()); + } + else + { + printf("onConnection(): connection [%s] is down\n", + conn->name().c_str()); + } +} + +void onMessage(const muduo::TcpConnectionPtr& conn, + const char* data, + ssize_t len) +{ + printf("onMessage(): received %zd bytes from connection [%s]\n", + len, conn->name().c_str()); +} + +int main() +{ + printf("main(): pid = %d\n", getpid()); + + muduo::InetAddress listenAddr(9981); + muduo::EventLoop loop; + + muduo::TcpServer server(&loop, listenAddr); + server.setConnectionCallback(onConnection); + server.setMessageCallback(onMessage); + server.start(); + + loop.loop(); +}