Skip to content

Commit

Permalink
TcpServer creates TcpConnection.
Browse files Browse the repository at this point in the history
  • Loading branch information
chenshuo committed May 12, 2011
1 parent a20e676 commit e54e538
Show file tree
Hide file tree
Showing 11 changed files with 369 additions and 6 deletions.
2 changes: 1 addition & 1 deletion reactor/Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

SUBDIRS = s00 s01 s02 s03 s04
SUBDIRS = s00 s01 s02 s03 s04 s05

all: $(SUBDIRS)

Expand Down
9 changes: 5 additions & 4 deletions reactor/README
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@ Step 3: wakeup(), runInLoop()
test6: runInLoop(), runAfter() in a different thread

Step 4: Acceptor
test7: listen on port 9981, accept, send, and close.
test7: listen on port 9981, accept, send, and close

Step 5: TcpServer

Step 6: TcpConnection up and down
test8: use of TcpServer

Step 7: TcpConnection read

Step 8: TcpConnection write
Step 6: TcpConnection up and down

Step 8: TcpConnection write, write complete callback

Step 9: multithreaded TcpServer

Expand Down
9 changes: 9 additions & 0 deletions reactor/s05/Callbacks.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,16 @@ namespace muduo

// All client visible callbacks go here.

class TcpConnection;
typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;

typedef boost::function<void()> TimerCallback;
typedef boost::function<void (const TcpConnectionPtr&)> ConnectionCallback;
typedef boost::function<void (const TcpConnectionPtr&,
const char* data,
ssize_t len)> MessageCallback;
typedef boost::function<void (const TcpConnectionPtr&)> CloseCallback;


}

Expand Down
5 changes: 4 additions & 1 deletion reactor/s05/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ LIB_SRC = \
Poller.cc \
Socket.cc \
SocketsOps.cc \
TcpConnection.cc \
TcpServer.cc \
Timer.cc \
TimerQueue.cc

BINARIES = test1 test2 test3 test4 test5 test6 test7
BINARIES = test1 test2 test3 test4 test5 test6 test7 test8

all: $(BINARIES)

Expand All @@ -23,3 +25,4 @@ test4: test4.cc
test5: test5.cc
test6: test6.cc
test7: test7.cc
test8: test8.cc
12 changes: 12 additions & 0 deletions reactor/s05/SocketsOps.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,15 @@ void sockets::fromHostPort(const char* ip, uint16_t port,
}
}

struct sockaddr_in sockets::getLocalAddr(int sockfd)
{
struct sockaddr_in localaddr;
bzero(&localaddr, sizeof localaddr);
socklen_t addrlen = sizeof(localaddr);
if (::getsockname(sockfd, sockaddr_cast(&localaddr), &addrlen) < 0)
{
LOG_SYSERR << "sockets::getLocalAddr";
}
return localaddr;
}

3 changes: 3 additions & 0 deletions reactor/s05/SocketsOps.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ void toHostPort(char* buf, size_t size,
const struct sockaddr_in& addr);
void fromHostPort(const char* ip, uint16_t port,
struct sockaddr_in* addr);

struct sockaddr_in getLocalAddr(int sockfd);

}
}

Expand Down
64 changes: 64 additions & 0 deletions reactor/s05/TcpConnection.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include "TcpConnection.h"

#include "logging/Logging.h"
#include "Channel.h"
#include "EventLoop.h"
#include "Socket.h"

#include <boost/bind.hpp>

#include <errno.h>
#include <stdio.h>

using namespace muduo;

TcpConnection::TcpConnection(EventLoop* loop,
const std::string& nameArg,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr)
: loop_(CHECK_NOTNULL(loop)),
name_(nameArg),
state_(kConnecting),
socket_(new Socket(sockfd)),
channel_(new Channel(loop, sockfd)),
localAddr_(localAddr),
peerAddr_(peerAddr)
{
LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this
<< " fd=" << sockfd;
channel_->setReadCallback(
boost::bind(&TcpConnection::handleRead, this));
}

TcpConnection::~TcpConnection()
{
LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this
<< " fd=" << channel_->fd();
}

void TcpConnection::connectEstablished()
{
loop_->assertInLoopThread();
assert(state_ == kConnecting);
setState(kConnected);
channel_->enableReading();

connectionCallback_(shared_from_this());
}

void TcpConnection::handleRead()
{
char buf[65536];
ssize_t n = read(channel_->fd(), buf, sizeof buf);
messageCallback_(shared_from_this(), buf, n);
// FIXME: close connection if n == 0
}

86 changes: 86 additions & 0 deletions reactor/s05/TcpConnection.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)

#ifndef MUDUO_NET_TCPCONNECTION_H
#define MUDUO_NET_TCPCONNECTION_H

#include "Callbacks.h"
#include "InetAddress.h"

#include <boost/any.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>

namespace muduo
{

class Channel;
class EventLoop;
class Socket;

///
/// TCP connection, for both client and server usage.
///
class TcpConnection : boost::noncopyable,
public boost::enable_shared_from_this<TcpConnection>
{
public:
/// Constructs a TcpConnection with a connected sockfd
///
/// User should not create this object.
TcpConnection(EventLoop* loop,
const std::string& name,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr);
~TcpConnection();

EventLoop* getLoop() const { return loop_; }
const std::string& name() const { return name_; }
const InetAddress& localAddress() { return localAddr_; }
const InetAddress& peerAddress() { return peerAddr_; }
bool connected() const { return state_ == kConnected; }

void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }

void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }

/// Internal use only.
void setCloseCallback(const CloseCallback& cb)
{ closeCallback_ = cb; }

// called when TcpServer accepts a new connection
void connectEstablished(); // should be called only once

private:
enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };

void setState(StateE s) { state_ = s; }
void handleRead();

EventLoop* loop_;
std::string name_;
StateE state_; // FIXME: use atomic variable
// we don't expose those classes to client.
boost::scoped_ptr<Socket> socket_;
boost::scoped_ptr<Channel> channel_;
InetAddress localAddr_;
InetAddress peerAddr_;
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
CloseCallback closeCallback_;
};

typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;

}

#endif // MUDUO_NET_TCPCONNECTION_H
77 changes: 77 additions & 0 deletions reactor/s05/TcpServer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include "TcpServer.h"

#include "logging/Logging.h"
#include "Acceptor.h"
#include "EventLoop.h"
#include "SocketsOps.h"

#include <boost/bind.hpp>

#include <stdio.h> // snprintf

using namespace muduo;

TcpServer::TcpServer(EventLoop* loop, const InetAddress& listenAddr)
: loop_(CHECK_NOTNULL(loop)),
name_(listenAddr.toHostPort()),
acceptor_(new Acceptor(loop, listenAddr)),
started_(false),
nextConnId_(1)
{
acceptor_->setNewConnectionCallback(
boost::bind(&TcpServer::newConnection, this, _1, _2));
}

TcpServer::~TcpServer()
{
}

void TcpServer::start()
{
if (!started_)
{
started_ = true;
}

if (!acceptor_->listenning())
{
loop_->runInLoop(
boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
}
}

void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();
char buf[32];
snprintf(buf, sizeof buf, "#%d", nextConnId_);
++nextConnId_;
std::string connName = name_ + buf;

LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - new connection [" << connName
<< "] from " << peerAddr.toHostPort();
InetAddress localAddr(sockets::getLocalAddr(sockfd));
// FIXME poll with zero timeout to double confirm the new connection
TcpConnectionPtr conn(
new TcpConnection(loop_, connName, sockfd, localAddr, peerAddr));
connections_[connName] = conn;
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setCloseCallback(
boost::bind(&TcpServer::removeConnection, this, _1));
conn->connectEstablished();
}

void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
// TODO
}

66 changes: 66 additions & 0 deletions reactor/s05/TcpServer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)

#ifndef MUDUO_NET_TCPSERVER_H
#define MUDUO_NET_TCPSERVER_H

#include "Callbacks.h"
#include "TcpConnection.h"

#include <map>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>

namespace muduo
{

class Acceptor;
class EventLoop;

class TcpServer : boost::noncopyable
{
public:

TcpServer(EventLoop* loop, const InetAddress& listenAddr);
~TcpServer(); // force out-line dtor, for scoped_ptr members.

/// Starts the server if it's not listenning.
///
/// It's harmless to call it multiple times.
/// Thread safe.
void start();

/// Set connection callback.
/// Not thread safe.
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }

/// Set message callback.
/// Not thread safe.
void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }

private:
/// Not thread safe, but in loop
void newConnection(int sockfd, const InetAddress& peerAddr);
void removeConnection(const TcpConnectionPtr& conn);

typedef std::map<std::string, TcpConnectionPtr> ConnectionMap;

EventLoop* loop_; // the acceptor loop
const std::string name_;
boost::scoped_ptr<Acceptor> acceptor_; // avoid revealing Acceptor
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
bool started_;
int nextConnId_; // always in loop thread
ConnectionMap connections_;
};

}

#endif // MUDUO_NET_TCPSERVER_H
Loading

0 comments on commit e54e538

Please sign in to comment.