Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobias Oberstein committed Mar 29, 2014
1 parent 03dc45a commit 0afc462
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 106 deletions.
76 changes: 44 additions & 32 deletions include/autobahn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,16 @@
#include <map>
#include <functional>


#include <msgpack.hpp>

#include <boost/any.hpp>

// http://stackoverflow.com/questions/22597948/using-boostfuture-with-then-continuations/
#define BOOST_THREAD_PROVIDES_FUTURE
#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
#define BOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY
#include <boost/thread/future.hpp>
//#include <future>


#include <boost/any.hpp>
#include <boost/asio.hpp>


Expand All @@ -60,51 +57,58 @@ namespace autobahn {
/// A pair of ::anyvec and ::anymap.
typedef std::pair<anyvec, anymap> anyvecmap;


/// Handler type for use with session::subscribe(const std::string&, handler_t)
typedef std::function<void(const anyvec&, const anymap&)> handler_t;


// typedef boost::any (*endpoint_t) (const anyvec&, const anymap&);

/// Endpoint type for use with session::provide(const std::string&, endpoint_t)
typedef std::function<boost::any(const anyvec&, const anymap&)> endpoint_t;

/// Endpoint type for use with session::provide(const std::string&, endpoint_v_t)
/// Endpoint type for use with session::provide_v(const std::string&, endpoint_v_t)
typedef std::function<anyvec(const anyvec&, const anymap&)> endpoint_v_t;

/// Endpoint type for use with session::provide(const std::string&, endpoint_m_t)
/// Endpoint type for use with session::provide_m(const std::string&, endpoint_m_t)
typedef std::function<anymap(const anyvec&, const anymap&)> endpoint_m_t;

/// Endpoint type for use with session::provide(const std::string&, endpoint_vm_t)
/// Endpoint type for use with session::provide_vm(const std::string&, endpoint_vm_t)
typedef std::function<anyvecmap(const anyvec&, const anymap&)> endpoint_vm_t;


/// Endpoint type for use with session::provide(const std::string&, endpointf_t)
typedef std::function<boost::future<boost::any>(const anyvec&, const anymap&)> endpointf_t;
/// Endpoint type for use with session::provide(const std::string&, endpoint_ft)
typedef std::function<boost::future<boost::any>(const anyvec&, const anymap&)> endpoint_f_t;

/// Endpoint type for use with session::provide(const std::string&, endpointf_v_t)
typedef std::function<boost::future<anyvec>(const anyvec&, const anymap&)> endpointf_v_t;
/// Endpoint type for use with session::provide_fv(const std::string&, endpoint_fv_t)
typedef std::function<boost::future<anyvec>(const anyvec&, const anymap&)> endpoint_fv_t;

/// Endpoint type for use with session::provide(const std::string&, endpointf_m_t)
typedef std::function<boost::future<anymap>(const anyvec&, const anymap&)> endpointf_m_t;
/// Endpoint type for use with session::provide_fm(const std::string&, endpoint_fm_t)
typedef std::function<boost::future<anymap>(const anyvec&, const anymap&)> endpoint_fm_t;

/// Endpoint type for use with session::provide(const std::string&, endpointf_vm_t)
typedef std::function<boost::future<anyvecmap>(const anyvec&, const anymap&)> endpointf_vm_t;
/// Endpoint type for use with session::provide_fvm(const std::string&, endpoint_fvm_t)
typedef std::function<boost::future<anyvecmap>(const anyvec&, const anymap&)> endpoint_fvm_t;


/// Represents a procedure registration.
struct registration {
uint64_t m_id;
registration() : id(0) {};
registration(uint64_t id) : id(id) {};
uint64_t id;
};


/// Represents a topic subscription.
struct subscription {
subscription() : id(0) {};
subscription(uint64_t id) : id(id) {};
uint64_t id;
};

/// Represents an event publication (for acknowledged publications).
struct publication {
publication() : id(0) {};
publication(uint64_t id) : id(id) {};
uint64_t id;
};


/*!
* A WAMP session.
Expand Down Expand Up @@ -183,6 +187,16 @@ namespace autobahn {
void publish(const std::string& topic, const anyvec& args, const anymap& kwargs);


/*!
* Subscribe a handler to a topic to receive events.
*
* \param topic The URI of the topic to subscribe to.
* \param handler The handler that will receive events under the subscription.
* \return A future that resolves to a autobahn::subscription
*/
inline
boost::future<subscription> subscribe(const std::string& topic, handler_t handler);


/*!
* Calls a remote procedure with no arguments.
Expand Down Expand Up @@ -215,18 +229,6 @@ namespace autobahn {
boost::future<boost::any> call(const std::string& procedure, const anyvec& args, const anymap& kwargs);


/*!
* Subscribe a handler to a topic to receive events.
*
* \param topic The URI of the topic to subscribe to.
* \param handler The handler that will receive events under the subscription.
* \return A future that resolves to a autobahn::subscription
*/
inline
boost::future<subscription> subscribe(const std::string& topic, handler_t handler);



/*!
* Register an endpoint as a procedure that can be called remotely.
*
Expand All @@ -238,7 +240,17 @@ namespace autobahn {

inline boost::future<registration> provide_v(const std::string& procedure, endpoint_v_t endpoint);

inline boost::future<registration> providef_vm(const std::string& procedure, endpointf_vm_t endpoint);
inline boost::future<registration> provide_m(const std::string& procedure, endpoint_m_t endpoint);

inline boost::future<registration> provide_vm(const std::string& procedure, endpoint_vm_t endpoint);

inline boost::future<registration> provide_f(const std::string& procedure, endpoint_f_t endpoint);

inline boost::future<registration> provide_fv(const std::string& procedure, endpoint_fv_t endpoint);

inline boost::future<registration> provide_fm(const std::string& procedure, endpoint_fm_t endpoint);

inline boost::future<registration> provide_fvm(const std::string& procedure, endpoint_fvm_t endpoint);

private:

Expand Down
120 changes: 46 additions & 74 deletions include/autobahn.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -179,23 +179,54 @@ namespace autobahn {


template<typename IStream, typename OStream>
boost::future<registration> session<IStream, OStream>::providef_vm(const std::string& procedure, endpointf_vm_t endpoint) {
return _provide(procedure, static_cast<endpointf_vm_t> (endpoint));
boost::future<registration> session<IStream, OStream>::provide_m(const std::string& procedure, endpoint_m_t endpoint) {
return _provide(procedure, static_cast<endpoint_m_t> (endpoint));
}


template<typename IStream, typename OStream>
boost::future<registration> session<IStream, OStream>::provide_vm(const std::string& procedure, endpoint_vm_t endpoint) {
return _provide(procedure, static_cast<endpoint_vm_t> (endpoint));
}


template<typename IStream, typename OStream>
boost::future<registration> session<IStream, OStream>::provide_f(const std::string& procedure, endpoint_f_t endpoint) {
return _provide(procedure, static_cast<endpoint_f_t> (endpoint));
}


template<typename IStream, typename OStream>
boost::future<registration> session<IStream, OStream>::provide_fv(const std::string& procedure, endpoint_fv_t endpoint) {
return _provide(procedure, static_cast<endpoint_fv_t> (endpoint));
}


template<typename IStream, typename OStream>
boost::future<registration> session<IStream, OStream>::provide_fm(const std::string& procedure, endpoint_fm_t endpoint) {
return _provide(procedure, static_cast<endpoint_fm_t> (endpoint));
}


template<typename IStream, typename OStream>
boost::future<registration> session<IStream, OStream>::provide_fvm(const std::string& procedure, endpoint_fvm_t endpoint) {
return _provide(procedure, static_cast<endpoint_fvm_t> (endpoint));
}


template<typename IStream, typename OStream>
template<typename E>
boost::future<registration> session<IStream, OStream>::_provide(const std::string& procedure, E endpoint) {

// [REGISTER, Request|id, Options|dict, Procedure|uri]

std::cerr << "OOOOOOOOOOO " << typeid(endpoint).name() << std::endl;
std::cerr << "OOOOOOOOOOO " << typeid(E()).name() << std::endl;
if (!m_session_id) {
throw no_session_error();
}

m_request_id += 1;
m_register_requests[m_request_id] = register_request_t(endpoint);

// [REGISTER, Request|id, Options|dict, Procedure|uri]

m_packer.pack_array(4);
m_packer.pack(static_cast<int> (msg_code::REGISTER));
m_packer.pack(m_request_id);
Expand Down Expand Up @@ -434,37 +465,10 @@ namespace autobahn {
m_packer.pack(val);

} else {
//std::cerr << "? ";
std::cerr << "Warning: don't know how to pack type " << value.type().name() << std::endl;
}
}

/*
template<typename E>
boost::future<registration> session<IStream, OStream>::provide(const std::string& procedure, E endpoint) {
// [REGISTER, Request|id, Options|dict, Procedure|uri]
m_request_id += 1;
m_register_requests[m_request_id] = register_request_t(endpoint);
m_packer.pack_array(4);
m_packer.pack(MSG_CODE_REGISTER);
m_packer.pack(m_request_id);
m_packer.pack_map(0);
m_packer.pack(procedure);
send();
return m_register_requests[m_request_id].m_res.get_future();
}
*/
/*
template
boost::future<registration> session<IStream, OStream>::provide<endpoint_t>(const std::string& procedure, endpoint_t endpoint);
*/
/*
template
boost::future<registration> session<IStream, OStream>::provide<endpoint_v_t>(const std::string& procedure, endpoint_v_t endpoint);
*/

template<typename IStream, typename OStream>
void session<IStream, OStream>::process_welcome(const wamp_msg_t& msg) {
Expand Down Expand Up @@ -592,8 +596,6 @@ namespace autobahn {
// [INVOCATION, Request|id, REGISTERED.Registration|id, Details|dict, CALL.Arguments|list]
// [INVOCATION, Request|id, REGISTERED.Registration|id, Details|dict, CALL.Arguments|list, CALL.ArgumentsKw|dict]

std::cerr << "==> 1" << std::endl;

if (msg.size() != 4 && msg.size() != 5 && msg.size() != 6) {
throw ProtocolError("invalid INVOCATION message structure - length must be 4, 5 or 6");
}
Expand All @@ -603,17 +605,13 @@ namespace autobahn {
}
uint64_t request_id = msg[1].as<uint64_t>();

std::cerr << "==> 2" << std::endl;

if (msg[2].type != msgpack::type::POSITIVE_INTEGER) {
throw ProtocolError("invalid INVOCATION message structure - INVOCATION.Registration must be an integer");
}
uint64_t registration_id = msg[2].as<uint64_t>();

endpoints_t::iterator endpoint = m_endpoints.find(registration_id);

std::cerr << "==> 3" << std::endl;

if (endpoint != m_endpoints.end()) {

if (msg[3].type != msgpack::type::MAP) {
Expand All @@ -623,8 +621,6 @@ namespace autobahn {
anyvec args;
anymap kwargs;

std::cerr << "==> 4" << std::endl;

if (msg.size() > 4) {

if (msg[4].type != msgpack::type::ARRAY) {
Expand All @@ -642,7 +638,9 @@ namespace autobahn {
}
}

std::cerr << "==> 5" << std::endl;
// [YIELD, INVOCATION.Request|id, Options|dict]
// [YIELD, INVOCATION.Request|id, Options|dict, Arguments|list]
// [YIELD, INVOCATION.Request|id, Options|dict, Arguments|list, ArgumentsKw|dict]
try {

if ((endpoint->second).type() == typeid(endpoint_t)) {
Expand All @@ -668,55 +666,38 @@ namespace autobahn {
pack_any(res);
send();

} else if ((endpoint->second).type() == typeid(endpointf_vm_t)) {

boost::future<anyvecmap> f_res = ( boost::any_cast<endpointf_vm_t>(endpoint->second) )(args, kwargs);
} else if ((endpoint->second).type() == typeid(endpoint_fvm_t)) {

std::cerr << "++ 1" << std::endl;
boost::future<anyvecmap> f_res = ( boost::any_cast<endpoint_fvm_t>(endpoint->second) )(args, kwargs);

auto done = f_res.then([&](decltype(f_res) f) {

std::cerr << "++ 2 " << typeid(f).name() << std::endl;

anyvecmap res = f.get();

std::cerr << "++ 3" << std::endl;

m_packer.pack_array(5);
m_packer.pack(MSG_CODE_YIELD);
m_packer.pack(request_id);
m_packer.pack_map(0);
pack_any(res.first);
pack_any(res.second);
send();

std::cerr << "++ 4" << std::endl;
});

std::cerr << "++ 5" << std::endl;
done.get();
std::cerr << "++ 6" << std::endl;
done.wait();

} else {
// FIXME
std::cerr << "FIX ME INVOCATION " << std::endl;
std::cerr << typeid(endpoint_t).name() << std::endl;
std::cerr << ((endpoint->second).type()).name() << std::endl;
}
// boost::any res = (endpoint->second)(args, kwargs);
// boost::any res = (*(endpoint->second))(args, kwargs);

// [YIELD, INVOCATION.Request|id, Options|dict]
// [YIELD, INVOCATION.Request|id, Options|dict, Arguments|list]
// [YIELD, INVOCATION.Request|id, Options|dict, Arguments|list, ArgumentsKw|dict]

}
catch (...) {
// FIXME: send ERROR
std::cerr << "INVOCATION failed" << std::endl;
}

std::cerr << "==> 6" << std::endl;

} else {
throw ProtocolError("bogus INVOCATION message for non-registered registration ID");
}
Expand Down Expand Up @@ -918,18 +899,9 @@ namespace autobahn {

uint64_t registration_id = msg[2].as<uint64_t>();

//std::cerr << "REGxx" << register_request->second.m_endpoint << std::endl;

std::cerr << "XXX 2a " << (register_request->second).m_endpoint.type().name() << std::endl;

m_endpoints[registration_id] = register_request->second.m_endpoint;

std::cerr << "XXX 2b " << (m_endpoints[registration_id]).type().name() << std::endl;

registration reg;
reg.m_id = registration_id;

register_request->second.m_res.set_value(reg);
register_request->second.m_res.set_value(registration(registration_id));

} else {
throw ProtocolError("bogus REGISTERED message for non-pending request ID");
Expand Down
2 changes: 2 additions & 0 deletions test/test16.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ int main () {
cerr << "Subscribed with subscription ID " << sub.get().id << endl;
});

f1.wait();

});

} else {
Expand Down

0 comments on commit 0afc462

Please sign in to comment.