diff --git a/CMakeLists.txt b/CMakeLists.txt index 5a42c601f..fc1568d65 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -56,6 +56,9 @@ find_path(HIREDIS_INCLUDE_DIR hiredis/hiredis.h) find_library(HIREDIS_LIBRARY NAMES hiredis) find_path(GD_INCLUDE_DIR gd.h) find_library(GD_LIBRARY NAMES gd) +find_program(THRIFT_COMPILER thrift) +find_path(THRIFT_INCLUDE_DIR thrift) +find_library(THRIFT_LIBRARY NAMES thrift) if(CARES_INCLUDE_DIR AND CARES_LIBRARY) message(STATUS "found cares") @@ -78,6 +81,9 @@ endif() if(GD_INCLUDE_DIR AND GD_LIBRARY) message(STATUS "found gd") endif() +if(THRIFT_COMPILER AND THRIFT_INCLUDE_DIR AND THRIFT_LIBRARY) + message(STATUS "found thrift") +endif() include_directories(${Boost_INCLUDE_DIRS}) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 2a1d6ff41..de0c83e59 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -53,3 +53,9 @@ if(GD_INCLUDE_DIR AND GD_LIBRARY) else() add_subdirectory(procmon EXCLUDE_FROM_ALL) endif() + +if(THRIFT_COMPILER AND THRIFT_INCLUDE_DIR AND THRIFT_LIBRARY) + add_subdirectory(thrift) +else() + add_subdirectory(thrift EXCLUDE_FROM_ALL) +endif() diff --git a/examples/thrift/CMakeLists.txt b/examples/thrift/CMakeLists.txt new file mode 100644 index 000000000..6193d7590 --- /dev/null +++ b/examples/thrift/CMakeLists.txt @@ -0,0 +1,9 @@ +set(MUDUO_THRIFT_SRCS + ThriftConnection.cc + ThriftServer.cc + ) +add_library(muduo_thrift ${MUDUO_THRIFT_SRCS}) +target_link_libraries(muduo_thrift muduo_net thrift) + +include_directories(${CMAKE_CURRENT_SOURCE_DIR}) +add_subdirectory(tests) diff --git a/examples/thrift/ThriftConnection.cc b/examples/thrift/ThriftConnection.cc new file mode 100644 index 000000000..dbe7f5a4a --- /dev/null +++ b/examples/thrift/ThriftConnection.cc @@ -0,0 +1,121 @@ +#include "ThriftConnection.h" + +#include + +#include + +#include + +#include "ThriftServer.h" + +using namespace muduo; +using namespace muduo::net; + +ThriftConnection::ThriftConnection(ThriftServer* server, + const TcpConnectionPtr& conn) + : server_(server), + conn_(conn), + state_(kExpectFrameSize), + frameSize_(0) +{ + conn_->setMessageCallback(boost::bind(&ThriftConnection::onMessage, + this, _1, _2, _3)); + nullTransport_.reset(new TNullTransport()); + inputTransport_.reset(new TMemoryBuffer(NULL, 0)); + outputTransport_.reset(new TMemoryBuffer()); + + factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(inputTransport_); + factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_); + + inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_); + outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_); + + processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, nullTransport_); +} + +void ThriftConnection::onMessage(const TcpConnectionPtr& conn, + Buffer* buffer, + Timestamp receiveTime) +{ + bool more = true; + while (more) + { + if (state_ == kExpectFrameSize) + { + if (buffer->readableBytes() >= 4) + { + frameSize_ = static_cast(buffer->readInt32()); + state_ = kExpectFrame; + } + else + { + more = false; + } + } + else if (state_ == kExpectFrame) + { + if (buffer->readableBytes() >= frameSize_) + { + uint8_t* buf = reinterpret_cast((const_cast(buffer->peek()))); + + inputTransport_->resetBuffer(buf, frameSize_, TMemoryBuffer::COPY); + outputTransport_->resetBuffer(); + outputTransport_->getWritePtr(4); + outputTransport_->wroteBytes(4); + + if (server_->isWorkerThreadPoolProcessing()) + { + server_->workerThreadPool().run(boost::bind(&ThriftConnection::process, this)); + } + else + { + process(); + } + + buffer->retrieve(frameSize_); + state_ = kExpectFrameSize; + } + else + { + more = false; + } + } + } +} + +void ThriftConnection::process() +{ + try + { + processor_->process(inputProtocol_, outputProtocol_, NULL); + + uint8_t* buf; + uint32_t size; + outputTransport_->getBuffer(&buf, &size); + + assert(size >= 4); + uint32_t frameSize = static_cast(htonl(size - 4)); + memcpy(buf, &frameSize, 4); + + conn_->send(buf, size); + } catch (const TTransportException& ex) + { + LOG_ERROR << "ThriftServer TTransportException: " << ex.what(); + close(); + } catch (const std::exception& ex) + { + LOG_ERROR << "ThriftServer std::exception: " << ex.what(); + close(); + } catch (...) + { + LOG_ERROR << "ThriftServer unknown exception"; + close(); + } +} + +void ThriftConnection::close() +{ + nullTransport_->close(); + factoryInputTransport_->close(); + factoryOutputTransport_->close(); +} diff --git a/examples/thrift/ThriftConnection.h b/examples/thrift/ThriftConnection.h new file mode 100644 index 000000000..6a3b0b23b --- /dev/null +++ b/examples/thrift/ThriftConnection.h @@ -0,0 +1,66 @@ +#ifndef MUDUO_EXAMPLES_THRIFT_THRIFTCONNECTION_H +#define MUDUO_EXAMPLES_THRIFT_THRIFTCONNECTION_H + +#include +#include + +#include + +#include +#include +#include + +using apache::thrift::TProcessor; +using apache::thrift::protocol::TProtocol; +using apache::thrift::transport::TMemoryBuffer; +using apache::thrift::transport::TNullTransport; +using apache::thrift::transport::TTransport; +using apache::thrift::transport::TTransportException; + +class ThriftServer; + +class ThriftConnection : boost::noncopyable, + public boost::enable_shared_from_this +{ + public: + enum State + { + kExpectFrameSize, + kExpectFrame + }; + + ThriftConnection(ThriftServer* server, const muduo::net::TcpConnectionPtr& conn); + + private: + void onMessage(const muduo::net::TcpConnectionPtr& conn, + muduo::net::Buffer* buffer, + muduo::Timestamp receiveTime); + + void process(); + + void close(); + + private: + ThriftServer* server_; + muduo::net::TcpConnectionPtr conn_; + + boost::shared_ptr nullTransport_; + + boost::shared_ptr inputTransport_; + boost::shared_ptr outputTransport_; + + boost::shared_ptr factoryInputTransport_; + boost::shared_ptr factoryOutputTransport_; + + boost::shared_ptr inputProtocol_; + boost::shared_ptr outputProtocol_; + + boost::shared_ptr processor_; + + enum State state_; + uint32_t frameSize_; +}; // ThriftConnection + +typedef boost::shared_ptr ThriftConnectionPtr; + +#endif // MUDUO_EXAMPLES_THRIFT_THRIFTCONNECTION_H diff --git a/examples/thrift/ThriftServer.cc b/examples/thrift/ThriftServer.cc new file mode 100644 index 000000000..9d8a085db --- /dev/null +++ b/examples/thrift/ThriftServer.cc @@ -0,0 +1,53 @@ +#include "ThriftServer.h" + +#include + +#include + +using namespace muduo; +using namespace muduo::net; + +ThriftServer::~ThriftServer() +{ +} + +void ThriftServer::serve() +{ + start(); +} + +void ThriftServer::start() +{ + if (numWorkerThreads_ > 0) + { + workerThreadPool_.start(numWorkerThreads_); + } + server_.start(); +} + +void ThriftServer::stop() +{ + if (numWorkerThreads_ > 0) + { + workerThreadPool_.stop(); + } + server_.getLoop()->runAfter(3.0, boost::bind(&EventLoop::quit, + server_.getLoop())); +} + +void ThriftServer::onConnection(const TcpConnectionPtr& conn) +{ + if (conn->connected()) + { + ThriftConnectionPtr ptr(new ThriftConnection(this, conn)); + MutexLockGuard lock(mutex_); + assert(conns_.find(conn->name()) == conns_.end()); + conns_[conn->name()] = ptr; + } + else + { + MutexLockGuard lock(mutex_); + assert(conns_.find(conn->name()) != conns_.end()); + conns_.erase(conn->name()); + } +} diff --git a/examples/thrift/ThriftServer.h b/examples/thrift/ThriftServer.h new file mode 100644 index 000000000..6146163ab --- /dev/null +++ b/examples/thrift/ThriftServer.h @@ -0,0 +1,222 @@ +#ifndef MUDUO_EXAMPLES_THRIFT_THRIFTSERVER_H +#define MUDUO_EXAMPLES_THRIFT_THRIFTSERVER_H + +#include + +#include +#include + +#include +#include + +#include + +#include "ThriftConnection.h" + +using apache::thrift::TProcessor; +using apache::thrift::TProcessorFactory; +using apache::thrift::protocol::TProtocolFactory; +using apache::thrift::server::TServer; +using apache::thrift::transport::TTransportFactory; + +class ThriftServer : boost::noncopyable, + public TServer +{ + public: + template + ThriftServer(const boost::shared_ptr& processorFactory, + muduo::net::EventLoop* eventloop, + const muduo::net::InetAddress& addr, + const muduo::string& name, + THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) + : TServer(processorFactory), + server_(eventloop, addr, name), + numWorkerThreads_(0), + workerThreadPool_(name + muduo::string("WorkerThreadPool")) + { + server_.setConnectionCallback(boost::bind(&ThriftServer::onConnection, + this, _1)); + } + + template + ThriftServer(const boost::shared_ptr& processor, + muduo::net::EventLoop* eventloop, + const muduo::net::InetAddress& addr, + const muduo::string& name, + THRIFT_OVERLOAD_IF(Processor, TProcessor)) + : TServer(processor), + server_(eventloop, addr, name), + numWorkerThreads_(0), + workerThreadPool_(name + muduo::string("WorkerThreadPool")) + { + server_.setConnectionCallback(boost::bind(&ThriftServer::onConnection, + this, _1)); + } + + template + ThriftServer(const boost::shared_ptr& processorFactory, + const boost::shared_ptr& protocolFactory, + muduo::net::EventLoop* eventloop, + const muduo::net::InetAddress& addr, + const muduo::string& name, + THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) + : TServer(processorFactory), + server_(eventloop, addr, name), + numWorkerThreads_(0), + workerThreadPool_(name + muduo::string("WorkerThreadPool")) + { + server_.setConnectionCallback(boost::bind(&ThriftServer::onConnection, + this, _1)); + setInputProtocolFactory(protocolFactory); + setOutputProtocolFactory(protocolFactory); + } + + template + ThriftServer(const boost::shared_ptr& processor, + const boost::shared_ptr& protocolFactory, + muduo::net::EventLoop* eventloop, + const muduo::net::InetAddress& addr, + const muduo::string& name, + THRIFT_OVERLOAD_IF(Processor, TProcessor)) + : TServer(processor), + server_(eventloop, addr, name), + numWorkerThreads_(0), + workerThreadPool_(name + muduo::string("WorkerThreadPool")) + { + server_.setConnectionCallback(boost::bind(&ThriftServer::onConnection, + this, _1)); + setInputProtocolFactory(protocolFactory); + setOutputProtocolFactory(protocolFactory); + } + + template + ThriftServer(const boost::shared_ptr& processorFactory, + const boost::shared_ptr& transportFactory, + const boost::shared_ptr& protocolFactory, + muduo::net::EventLoop* eventloop, + const muduo::net::InetAddress& addr, + const muduo::string& name, + THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) + : TServer(processorFactory), + server_(eventloop, addr, name), + numWorkerThreads_(0), + workerThreadPool_(name + muduo::string("WorkerThreadPool")) + { + server_.setConnectionCallback(boost::bind(&ThriftServer::onConnection, + this, _1)); + setInputTransportFactory(transportFactory); + setOutputTransportFactory(transportFactory); + setInputProtocolFactory(protocolFactory); + setOutputProtocolFactory(protocolFactory); + } + + template + ThriftServer(const boost::shared_ptr& processor, + const boost::shared_ptr& transportFactory, + const boost::shared_ptr& protocolFactory, + muduo::net::EventLoop* eventloop, + const muduo::net::InetAddress& addr, + const muduo::string& name, + THRIFT_OVERLOAD_IF(Processor, TProcessor)) + : TServer(processor), + server_(eventloop, addr, name), + numWorkerThreads_(0), + workerThreadPool_(name + muduo::string("WorkerThreadPool")) + { + server_.setConnectionCallback(boost::bind(&ThriftServer::onConnection, + this, _1)); + setInputTransportFactory(transportFactory); + setOutputTransportFactory(transportFactory); + setInputProtocolFactory(protocolFactory); + setOutputProtocolFactory(protocolFactory); + } + + template + ThriftServer(const boost::shared_ptr& processorFactory, + const boost::shared_ptr& inputTransportFactory, + const boost::shared_ptr& outputTransportFactory, + const boost::shared_ptr& inputProtocolFactory, + const boost::shared_ptr& outputProtocolFactory, + muduo::net::EventLoop* eventloop, + const muduo::net::InetAddress& addr, + const muduo::string& name, + THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) + : TServer(processorFactory), + server_(eventloop, addr, name), + numWorkerThreads_(0), + workerThreadPool_(name + muduo::string("WorkerThreadPool")) + { + server_.setConnectionCallback(boost::bind(&ThriftServer::onConnection, + this, _1)); + setInputTransportFactory(inputTransportFactory); + setOutputTransportFactory(outputTransportFactory); + setInputProtocolFactory(inputProtocolFactory); + setOutputProtocolFactory(outputProtocolFactory); + } + + template + ThriftServer(const boost::shared_ptr& processor, + const boost::shared_ptr& inputTransportFactory, + const boost::shared_ptr& outputTransportFactory, + const boost::shared_ptr& inputProtocolFactory, + const boost::shared_ptr& outputProtocolFactory, + muduo::net::EventLoop* eventloop, + const muduo::net::InetAddress& addr, + const muduo::string& name, + THRIFT_OVERLOAD_IF(Processor, TProcessor)) + : TServer(processor), + server_(eventloop, addr, name), + numWorkerThreads_(0), + workerThreadPool_(name + muduo::string("WorkerThreadPool")) + { + server_.setConnectionCallback(boost::bind(&ThriftServer::onConnection, + this, _1)); + setInputTransportFactory(inputTransportFactory); + setOutputTransportFactory(outputTransportFactory); + setInputProtocolFactory(inputProtocolFactory); + setOutputProtocolFactory(outputProtocolFactory); + } + + virtual ~ThriftServer(); + + void serve(); + + void start(); + + void stop(); + + muduo::ThreadPool& workerThreadPool() + { + return workerThreadPool_; + } + + bool isWorkerThreadPoolProcessing() const + { + return numWorkerThreads_ != 0; + } + + void setThreadNum(int numThreads) + { + server_.setThreadNum(numThreads); + } + + void setWorkerThreadNum(int numWorkerThreads) + { + assert(numWorkerThreads > 0); + numWorkerThreads_ = numWorkerThreads; + } + + private: + friend class ThriftConnection; + + void onConnection(const muduo::net::TcpConnectionPtr& conn); + + private: + muduo::net::TcpServer server_; + int numWorkerThreads_; + muduo::ThreadPool workerThreadPool_; + muduo::MutexLock mutex_; + std::map conns_; +}; // ThriftServer + +#endif // MUDUO_EXAMPLES_THRIFT_THRIFTSERVER_H diff --git a/examples/thrift/tests/CMakeLists.txt b/examples/thrift/tests/CMakeLists.txt new file mode 100644 index 000000000..693c8b4c2 --- /dev/null +++ b/examples/thrift/tests/CMakeLists.txt @@ -0,0 +1,2 @@ +add_subdirectory(echo) +add_subdirectory(ping) diff --git a/examples/thrift/tests/echo/CMakeLists.txt b/examples/thrift/tests/echo/CMakeLists.txt new file mode 100644 index 000000000..1bcddd293 --- /dev/null +++ b/examples/thrift/tests/echo/CMakeLists.txt @@ -0,0 +1,16 @@ +include_directories(gen-cpp) +set(ECHO_THRIFT echo.thrift) +execute_process(COMMAND ${THRIFT_COMPILER} --gen cpp ${ECHO_THRIFT} + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) +execute_process(COMMAND ${THRIFT_COMPILER} --gen py ${ECHO_THRIFT} + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) +set(ECHO_THRIFT_SRCS + gen-cpp/echo_constants.cpp + gen-cpp/echo_types.cpp + gen-cpp/Echo.cpp + ) +set(ECHO_SRCS + EchoServer.cc + ) +add_executable(muduo_thrift_echo ${ECHO_SRCS} ${ECHO_THRIFT_SRCS}) +target_link_libraries(muduo_thrift_echo muduo_thrift) diff --git a/examples/thrift/tests/echo/EchoServer.cc b/examples/thrift/tests/echo/EchoServer.cc new file mode 100644 index 000000000..9d7755002 --- /dev/null +++ b/examples/thrift/tests/echo/EchoServer.cc @@ -0,0 +1,51 @@ +#include + +#include +#include + +#include "ThriftServer.h" + +#include "Echo.h" + +using namespace muduo; +using namespace muduo::net; + +using namespace echo; + +class EchoHandler : virtual public EchoIf +{ + public: + EchoHandler() + { + } + + void echo(std::string& str, const std::string& s) + { + LOG_INFO << "EchoHandler::echo:" << s; + str = s; + } + +}; + +int NumCPU() +{ + return static_cast(sysconf(_SC_NPROCESSORS_ONLN)); +} + +int main(int argc, char **argv) +{ + EventLoop eventloop; + InetAddress addr("127.0.0.1", 9090); + string name("EchoServer"); + + boost::shared_ptr handler(new EchoHandler()); + boost::shared_ptr processor(new EchoProcessor(handler)); + + ThriftServer server(processor, &eventloop, addr, name); + server.setWorkerThreadNum(NumCPU() * 2); + server.start(); + eventloop.loop(); + + return 0; +} + diff --git a/examples/thrift/tests/echo/echo.thrift b/examples/thrift/tests/echo/echo.thrift new file mode 100644 index 000000000..af714d5b7 --- /dev/null +++ b/examples/thrift/tests/echo/echo.thrift @@ -0,0 +1,8 @@ +namespace cpp echo +namespace py echo + +service Echo +{ + string echo(1: string arg); +} + diff --git a/examples/thrift/tests/echo/echoclient.py b/examples/thrift/tests/echo/echoclient.py new file mode 100644 index 000000000..e7b3b008c --- /dev/null +++ b/examples/thrift/tests/echo/echoclient.py @@ -0,0 +1,28 @@ +import sys +sys.path.append('gen-py') + +from thrift.transport import TSocket +from thrift.transport import TTransport +from thrift.protocol import TBinaryProtocol + +from echo import Echo + + +def echo(s): + transport = TSocket.TSocket('127.0.0.1', 9090) + tranport = TTransport.TFramedTransport(transport) + protocol = TBinaryProtocol.TBinaryProtocol(tranport) + client = Echo.Client(protocol) + tranport.open() + s = client.echo(s) + tranport.close() + + return s + + +def main(): + print(echo('42')) + + +if __name__ == '__main__': + main() diff --git a/examples/thrift/tests/ping/CMakeLists.txt b/examples/thrift/tests/ping/CMakeLists.txt new file mode 100644 index 000000000..ba1764f46 --- /dev/null +++ b/examples/thrift/tests/ping/CMakeLists.txt @@ -0,0 +1,16 @@ +include_directories(gen-cpp) +set(PING_THRIFT ping.thrift) +execute_process(COMMAND ${THRIFT_COMPILER} --gen cpp ${PING_THRIFT} + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) +execute_process(COMMAND ${THRIFT_COMPILER} --gen py ${PING_THRIFT} + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) +set(PING_THRIFT_SRCS + gen-cpp/ping_constants.cpp + gen-cpp/ping_types.cpp + gen-cpp/Ping.cpp + ) +set(PING_SRCS + PingServer.cc + ) +add_executable(muduo_thrift_ping ${PING_SRCS} ${PING_THRIFT_SRCS}) +target_link_libraries(muduo_thrift_ping muduo_thrift) diff --git a/examples/thrift/tests/ping/PingServer.cc b/examples/thrift/tests/ping/PingServer.cc new file mode 100644 index 000000000..abff35e24 --- /dev/null +++ b/examples/thrift/tests/ping/PingServer.cc @@ -0,0 +1,47 @@ +#include +#include + +#include + +#include "ThriftServer.h" + +#include "Ping.h" + +using namespace muduo; +using namespace muduo::net; + +using apache::thrift::protocol::TCompactProtocolFactory; + +using namespace ping; + +class PingHandler : virtual public PingIf +{ + public: + PingHandler() + { + } + + void ping() + { + LOG_INFO << "ping"; + } + +}; + +int main(int argc, char **argv) +{ + EventLoop eventloop; + InetAddress addr("127.0.0.1", 9090); + string name("PingServer"); + + boost::shared_ptr handler(new PingHandler()); + boost::shared_ptr processor(new PingProcessor(handler)); + boost::shared_ptr protcolFactory(new TCompactProtocolFactory()); + + ThriftServer server(processor, protcolFactory, &eventloop, addr, name); + server.start(); + eventloop.loop(); + + return 0; +} + diff --git a/examples/thrift/tests/ping/ping.thrift b/examples/thrift/tests/ping/ping.thrift new file mode 100644 index 000000000..e38a0e821 --- /dev/null +++ b/examples/thrift/tests/ping/ping.thrift @@ -0,0 +1,8 @@ +namespace cpp ping +namespace py ping + +service Ping +{ + void ping(); +} + diff --git a/examples/thrift/tests/ping/pingclient.py b/examples/thrift/tests/ping/pingclient.py new file mode 100644 index 000000000..166bc481d --- /dev/null +++ b/examples/thrift/tests/ping/pingclient.py @@ -0,0 +1,26 @@ +import sys +sys.path.append('gen-py') + +from thrift.transport import TSocket +from thrift.transport import TTransport +from thrift.protocol import TCompactProtocol + +from ping import Ping + + +def ping(): + transport = TSocket.TSocket('127.0.0.1', 9090) + tranport = TTransport.TFramedTransport(transport) + protocol = TCompactProtocol.TCompactProtocol(tranport) + client = Ping.Client(protocol) + tranport.open() + client.ping() + tranport.close() + + +def main(): + ping() + + +if __name__ == '__main__': + main()