From 0afc46208c2c9ddc73b61d9d0dbc0d5467193fd5 Mon Sep 17 00:00:00 2001 From: Tobias Oberstein Date: Sat, 29 Mar 2014 20:08:07 +0100 Subject: [PATCH] cleanup --- include/autobahn.hpp | 76 +++++++++++++++------------ include/autobahn.ipp | 120 +++++++++++++++++-------------------------- test/test16.cpp | 2 + 3 files changed, 92 insertions(+), 106 deletions(-) diff --git a/include/autobahn.hpp b/include/autobahn.hpp index d83dfbab..59873988 100644 --- a/include/autobahn.hpp +++ b/include/autobahn.hpp @@ -29,11 +29,8 @@ #include #include - #include -#include - // http://stackoverflow.com/questions/22597948/using-boostfuture-with-then-continuations/ #define BOOST_THREAD_PROVIDES_FUTURE #define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION @@ -41,7 +38,7 @@ #include //#include - +#include #include @@ -60,44 +57,44 @@ namespace autobahn { /// A pair of ::anyvec and ::anymap. typedef std::pair anyvecmap; + /// Handler type for use with session::subscribe(const std::string&, handler_t) typedef std::function handler_t; -// typedef boost::any (*endpoint_t) (const anyvec&, const anymap&); - /// Endpoint type for use with session::provide(const std::string&, endpoint_t) typedef std::function endpoint_t; - /// Endpoint type for use with session::provide(const std::string&, endpoint_v_t) + /// Endpoint type for use with session::provide_v(const std::string&, endpoint_v_t) typedef std::function endpoint_v_t; - /// Endpoint type for use with session::provide(const std::string&, endpoint_m_t) + /// Endpoint type for use with session::provide_m(const std::string&, endpoint_m_t) typedef std::function endpoint_m_t; - /// Endpoint type for use with session::provide(const std::string&, endpoint_vm_t) + /// Endpoint type for use with session::provide_vm(const std::string&, endpoint_vm_t) typedef std::function endpoint_vm_t; - /// Endpoint type for use with session::provide(const std::string&, endpointf_t) - typedef std::function(const anyvec&, const anymap&)> endpointf_t; + /// Endpoint type for use with session::provide(const std::string&, endpoint_ft) + typedef std::function(const anyvec&, const anymap&)> endpoint_f_t; - /// Endpoint type for use with session::provide(const std::string&, endpointf_v_t) - typedef std::function(const anyvec&, const anymap&)> endpointf_v_t; + /// Endpoint type for use with session::provide_fv(const std::string&, endpoint_fv_t) + typedef std::function(const anyvec&, const anymap&)> endpoint_fv_t; - /// Endpoint type for use with session::provide(const std::string&, endpointf_m_t) - typedef std::function(const anyvec&, const anymap&)> endpointf_m_t; + /// Endpoint type for use with session::provide_fm(const std::string&, endpoint_fm_t) + typedef std::function(const anyvec&, const anymap&)> endpoint_fm_t; - /// Endpoint type for use with session::provide(const std::string&, endpointf_vm_t) - typedef std::function(const anyvec&, const anymap&)> endpointf_vm_t; + /// Endpoint type for use with session::provide_fvm(const std::string&, endpoint_fvm_t) + typedef std::function(const anyvec&, const anymap&)> endpoint_fvm_t; /// Represents a procedure registration. struct registration { - uint64_t m_id; + registration() : id(0) {}; + registration(uint64_t id) : id(id) {}; + uint64_t id; }; - /// Represents a topic subscription. struct subscription { subscription() : id(0) {}; @@ -105,6 +102,13 @@ namespace autobahn { uint64_t id; }; + /// Represents an event publication (for acknowledged publications). + struct publication { + publication() : id(0) {}; + publication(uint64_t id) : id(id) {}; + uint64_t id; + }; + /*! * A WAMP session. @@ -183,6 +187,16 @@ namespace autobahn { void publish(const std::string& topic, const anyvec& args, const anymap& kwargs); + /*! + * Subscribe a handler to a topic to receive events. + * + * \param topic The URI of the topic to subscribe to. + * \param handler The handler that will receive events under the subscription. + * \return A future that resolves to a autobahn::subscription + */ + inline + boost::future subscribe(const std::string& topic, handler_t handler); + /*! * Calls a remote procedure with no arguments. @@ -215,18 +229,6 @@ namespace autobahn { boost::future call(const std::string& procedure, const anyvec& args, const anymap& kwargs); - /*! - * Subscribe a handler to a topic to receive events. - * - * \param topic The URI of the topic to subscribe to. - * \param handler The handler that will receive events under the subscription. - * \return A future that resolves to a autobahn::subscription - */ - inline - boost::future subscribe(const std::string& topic, handler_t handler); - - - /*! * Register an endpoint as a procedure that can be called remotely. * @@ -238,7 +240,17 @@ namespace autobahn { inline boost::future provide_v(const std::string& procedure, endpoint_v_t endpoint); - inline boost::future providef_vm(const std::string& procedure, endpointf_vm_t endpoint); + inline boost::future provide_m(const std::string& procedure, endpoint_m_t endpoint); + + inline boost::future provide_vm(const std::string& procedure, endpoint_vm_t endpoint); + + inline boost::future provide_f(const std::string& procedure, endpoint_f_t endpoint); + + inline boost::future provide_fv(const std::string& procedure, endpoint_fv_t endpoint); + + inline boost::future provide_fm(const std::string& procedure, endpoint_fm_t endpoint); + + inline boost::future provide_fvm(const std::string& procedure, endpoint_fvm_t endpoint); private: diff --git a/include/autobahn.ipp b/include/autobahn.ipp index 04a3d8b4..055364dd 100644 --- a/include/autobahn.ipp +++ b/include/autobahn.ipp @@ -179,8 +179,38 @@ namespace autobahn { template - boost::future session::providef_vm(const std::string& procedure, endpointf_vm_t endpoint) { - return _provide(procedure, static_cast (endpoint)); + boost::future session::provide_m(const std::string& procedure, endpoint_m_t endpoint) { + return _provide(procedure, static_cast (endpoint)); + } + + + template + boost::future session::provide_vm(const std::string& procedure, endpoint_vm_t endpoint) { + return _provide(procedure, static_cast (endpoint)); + } + + + template + boost::future session::provide_f(const std::string& procedure, endpoint_f_t endpoint) { + return _provide(procedure, static_cast (endpoint)); + } + + + template + boost::future session::provide_fv(const std::string& procedure, endpoint_fv_t endpoint) { + return _provide(procedure, static_cast (endpoint)); + } + + + template + boost::future session::provide_fm(const std::string& procedure, endpoint_fm_t endpoint) { + return _provide(procedure, static_cast (endpoint)); + } + + + template + boost::future session::provide_fvm(const std::string& procedure, endpoint_fvm_t endpoint) { + return _provide(procedure, static_cast (endpoint)); } @@ -188,14 +218,15 @@ namespace autobahn { template boost::future session::_provide(const std::string& procedure, E endpoint) { - // [REGISTER, Request|id, Options|dict, Procedure|uri] - - std::cerr << "OOOOOOOOOOO " << typeid(endpoint).name() << std::endl; - std::cerr << "OOOOOOOOOOO " << typeid(E()).name() << std::endl; + if (!m_session_id) { + throw no_session_error(); + } m_request_id += 1; m_register_requests[m_request_id] = register_request_t(endpoint); + // [REGISTER, Request|id, Options|dict, Procedure|uri] + m_packer.pack_array(4); m_packer.pack(static_cast (msg_code::REGISTER)); m_packer.pack(m_request_id); @@ -434,37 +465,10 @@ namespace autobahn { m_packer.pack(val); } else { - //std::cerr << "? "; + std::cerr << "Warning: don't know how to pack type " << value.type().name() << std::endl; } } -/* - template - boost::future session::provide(const std::string& procedure, E endpoint) { - - // [REGISTER, Request|id, Options|dict, Procedure|uri] - - m_request_id += 1; - m_register_requests[m_request_id] = register_request_t(endpoint); - - m_packer.pack_array(4); - m_packer.pack(MSG_CODE_REGISTER); - m_packer.pack(m_request_id); - m_packer.pack_map(0); - m_packer.pack(procedure); - send(); - - return m_register_requests[m_request_id].m_res.get_future(); - } -*/ -/* - template - boost::future session::provide(const std::string& procedure, endpoint_t endpoint); -*/ -/* - template - boost::future session::provide(const std::string& procedure, endpoint_v_t endpoint); -*/ template void session::process_welcome(const wamp_msg_t& msg) { @@ -592,8 +596,6 @@ namespace autobahn { // [INVOCATION, Request|id, REGISTERED.Registration|id, Details|dict, CALL.Arguments|list] // [INVOCATION, Request|id, REGISTERED.Registration|id, Details|dict, CALL.Arguments|list, CALL.ArgumentsKw|dict] - std::cerr << "==> 1" << std::endl; - if (msg.size() != 4 && msg.size() != 5 && msg.size() != 6) { throw ProtocolError("invalid INVOCATION message structure - length must be 4, 5 or 6"); } @@ -603,8 +605,6 @@ namespace autobahn { } uint64_t request_id = msg[1].as(); - std::cerr << "==> 2" << std::endl; - if (msg[2].type != msgpack::type::POSITIVE_INTEGER) { throw ProtocolError("invalid INVOCATION message structure - INVOCATION.Registration must be an integer"); } @@ -612,8 +612,6 @@ namespace autobahn { endpoints_t::iterator endpoint = m_endpoints.find(registration_id); - std::cerr << "==> 3" << std::endl; - if (endpoint != m_endpoints.end()) { if (msg[3].type != msgpack::type::MAP) { @@ -623,8 +621,6 @@ namespace autobahn { anyvec args; anymap kwargs; - std::cerr << "==> 4" << std::endl; - if (msg.size() > 4) { if (msg[4].type != msgpack::type::ARRAY) { @@ -642,7 +638,9 @@ namespace autobahn { } } - std::cerr << "==> 5" << std::endl; + // [YIELD, INVOCATION.Request|id, Options|dict] + // [YIELD, INVOCATION.Request|id, Options|dict, Arguments|list] + // [YIELD, INVOCATION.Request|id, Options|dict, Arguments|list, ArgumentsKw|dict] try { if ((endpoint->second).type() == typeid(endpoint_t)) { @@ -668,20 +666,14 @@ namespace autobahn { pack_any(res); send(); - } else if ((endpoint->second).type() == typeid(endpointf_vm_t)) { - - boost::future f_res = ( boost::any_cast(endpoint->second) )(args, kwargs); + } else if ((endpoint->second).type() == typeid(endpoint_fvm_t)) { - std::cerr << "++ 1" << std::endl; + boost::future f_res = ( boost::any_cast(endpoint->second) )(args, kwargs); auto done = f_res.then([&](decltype(f_res) f) { - std::cerr << "++ 2 " << typeid(f).name() << std::endl; - anyvecmap res = f.get(); - std::cerr << "++ 3" << std::endl; - m_packer.pack_array(5); m_packer.pack(MSG_CODE_YIELD); m_packer.pack(request_id); @@ -689,13 +681,9 @@ namespace autobahn { pack_any(res.first); pack_any(res.second); send(); - - std::cerr << "++ 4" << std::endl; }); - std::cerr << "++ 5" << std::endl; - done.get(); - std::cerr << "++ 6" << std::endl; + done.wait(); } else { // FIXME @@ -703,20 +691,13 @@ namespace autobahn { std::cerr << typeid(endpoint_t).name() << std::endl; std::cerr << ((endpoint->second).type()).name() << std::endl; } -// boost::any res = (endpoint->second)(args, kwargs); -// boost::any res = (*(endpoint->second))(args, kwargs); - - // [YIELD, INVOCATION.Request|id, Options|dict] - // [YIELD, INVOCATION.Request|id, Options|dict, Arguments|list] - // [YIELD, INVOCATION.Request|id, Options|dict, Arguments|list, ArgumentsKw|dict] } catch (...) { + // FIXME: send ERROR std::cerr << "INVOCATION failed" << std::endl; } - std::cerr << "==> 6" << std::endl; - } else { throw ProtocolError("bogus INVOCATION message for non-registered registration ID"); } @@ -918,18 +899,9 @@ namespace autobahn { uint64_t registration_id = msg[2].as(); - //std::cerr << "REGxx" << register_request->second.m_endpoint << std::endl; - - std::cerr << "XXX 2a " << (register_request->second).m_endpoint.type().name() << std::endl; - m_endpoints[registration_id] = register_request->second.m_endpoint; - std::cerr << "XXX 2b " << (m_endpoints[registration_id]).type().name() << std::endl; - - registration reg; - reg.m_id = registration_id; - - register_request->second.m_res.set_value(reg); + register_request->second.m_res.set_value(registration(registration_id)); } else { throw ProtocolError("bogus REGISTERED message for non-pending request ID"); diff --git a/test/test16.cpp b/test/test16.cpp index 427b444c..584082e4 100644 --- a/test/test16.cpp +++ b/test/test16.cpp @@ -88,6 +88,8 @@ int main () { cerr << "Subscribed with subscription ID " << sub.get().id << endl; }); + f1.wait(); + }); } else {