Skip to content

Commit

Permalink
sync
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobias Oberstein committed Mar 28, 2014
1 parent 10c65a6 commit bafd776
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 11 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ test_asio:

test_asio2:
scons
./build/test/test13
./build/test/test14

test_worker:
scons
Expand Down
5 changes: 4 additions & 1 deletion include/autobahn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ namespace autobahn {
* \param in The input stream to run this session on.
* \param out THe output stream to run this session on.
*/
session(IStream& in, OStream& out);
session(boost::asio::io_service& io, IStream& in, OStream& out);

/*!
* Join a realm with this session.
Expand Down Expand Up @@ -312,6 +312,8 @@ namespace autobahn {

bool m_stopped;

boost::asio::io_service& m_io;

/// Input stream this session runs on.
IStream& m_in;

Expand All @@ -322,6 +324,7 @@ namespace autobahn {
char m_buffer_msg_len[4];
uint32_t m_msg_len;

boost::promise<boost::any> m_test_promise;


/// MsgPack serialization buffer.
Expand Down
10 changes: 7 additions & 3 deletions include/autobahn.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@
namespace autobahn {

template<typename IStream, typename OStream>
session<IStream, OStream>::session(IStream& in, OStream& out)
session<IStream, OStream>::session(boost::asio::io_service& io, IStream& in, OStream& out)
: m_debug(true),
m_stopped(false),
m_io(io),
m_in(in),
m_out(out),
m_packer(&m_buffer),
Expand Down Expand Up @@ -119,7 +120,7 @@ namespace autobahn {

send();

return m_session_join.get_future();
return std::move(m_session_join.get_future());
}


Expand Down Expand Up @@ -294,7 +295,10 @@ namespace autobahn {
pack_any(args);
send();

return m_calls[m_request_id].m_res.get_future();
//return m_calls[m_request_id].m_res.get_future();
//return std::move(m_calls[m_request_id].m_res.get_future());
//return m_calls[m_request_id].m_res.get_future();
return m_test_promise.get_future();

} else {
return call(procedure);
Expand Down
3 changes: 2 additions & 1 deletion test/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ else:
# tests.append(env.Program('test11.cpp', LIBS = ['boost_thread', 'boost_system', 'msgpack', autobahn]))
# tests.append(env.Program('test_asio.cpp', LIBS = ['boost_thread', 'boost_system', 'msgpack']))
# tests.append(env.Program('publish.cpp', LIBS = ['boost_thread', 'boost_system', 'msgpack', autobahn]))
tests.append(env.Program('test13.cpp', LIBS = ['boost_thread', 'boost_system', 'msgpack']))
# tests.append(env.Program('test13.cpp', LIBS = ['boost_thread', 'boost_system', 'msgpack']))
tests.append(env.Program('test14.cpp', LIBS = ['boost_thread', 'boost_system']))
# tests.append(env.Program('test12.cpp', LIBS = ['boost_thread', 'boost_system', 'msgpack']))
# tests.append(env.Program(['test7.cpp', '#/src/autobahn.cpp'], LIBS = ['boost_thread', 'boost_system', 'msgpack']))
# tests.append(env.Program(['test7.cpp'], LIBS = ['boost_thread', 'boost_system', 'msgpack']))
Expand Down
62 changes: 57 additions & 5 deletions test/test13.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,47 @@
//
///////////////////////////////////////////////////////////////////////////////

#if 1

// http://stackoverflow.com/questions/22597948/using-boostfuture-with-then-continuations/
#define BOOST_THREAD_PROVIDES_FUTURE
#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
#define BOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY
#include <boost/thread/future.hpp>

struct Foo {

boost::future<int> start() {
return p.get_future();
}

void finish() {
p.set_value(23);
}

boost::promise<int> p;
};


int main () {
Foo foo;

auto f1 = foo.start();
auto f2 = f1.then([](boost::future<int> f) {
std::cout << "done:" << std::endl;
std::cout << f.get() << std::endl;
});

foo.finish();
//f2.get();
}

#else
#include <iostream>
#include <string>
#include <thread>
#include <chrono>
#include "autobahn.hpp"
Expand All @@ -29,6 +67,14 @@ using namespace boost;
using boost::asio::ip::tcp;
template<typename R>
bool is_ready(future<R>& f)
{ return f.wait_for(std::chrono::seconds(0)) == future_status::ready; }
// http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/examples/cpp11_examples.html
int main () {
Expand All @@ -50,7 +96,7 @@ int main () {
tcp::socket socket(io);
autobahn::session<tcp::socket,
tcp::socket> session(socket, socket);
tcp::socket> session(io, socket, socket);
tcp::resolver resolver(io);
auto endpoint_iterator = resolver.resolve({"127.0.0.1", "8080"});
Expand All @@ -64,16 +110,17 @@ int main () {
cerr << "connected" << endl;
session.start();
auto s = session.join(string("realm1")).then([&session](future<int> s) {
auto s = session.join(string("realm1")).then([&](future<int> s) {
cerr << "session joined" << endl;
session.publish("com.myapp.topic1");
#if 1
auto cf = session.call("com.arguments.add2", {2, 3});

auto c = cf.then([&session](future<any> f) {
auto c = session.call("com.arguments.add2", {2, 3})
.then([&](future<any> f) {
cerr << "call returned" << endl;
//cerr << "future valid: " << f.valid() << endl;
//cerr << "future ready: " << is_ready(f) << endl;
any r = f.get();
Expand All @@ -84,6 +131,10 @@ int main () {
cerr << "result: " << res << endl;
});
cerr << "HERE2" << endl;
c.get();
//c.wait();
#endif
});
Expand Down Expand Up @@ -111,3 +162,4 @@ int main () {
}
return 0;
}
#endif
107 changes: 107 additions & 0 deletions test/test14.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
///////////////////////////////////////////////////////////////////////////////
//
// Copyright (C) 2014 Tavendo GmbH
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
///////////////////////////////////////////////////////////////////////////////

#include <map>
#include <memory>
#include <iostream>

#define BOOST_THREAD_PROVIDES_FUTURE
#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
#define BOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY
#include <boost/thread/future.hpp>
#include <boost/asio.hpp>

using namespace std;
using namespace boost;


struct rpcsvc {

rpcsvc (asio::io_service& io) : m_io(io), m_call_id(0) {
}

struct Call {
std::shared_ptr<asio::deadline_timer> m_timer;
promise<float> m_promise;
};

future<float> slowsquare(float x, int delay) {

m_call_id += 1;
m_calls[m_call_id] = Call();

std::shared_ptr<asio::deadline_timer>
timer(new asio::deadline_timer(m_io, posix_time::seconds(delay)));

m_calls[m_call_id].m_timer = timer;

int call_id = m_call_id;

timer->async_wait(
[=](system::error_code ec) {
if (!ec) {
cout << m_call_id << " - " << call_id << endl;
this->m_calls[call_id].m_promise.set_value(x * x);
this->m_calls.erase(call_id);
} else {
cout << "Error in timer: " << ec << endl;
}
}
);

return m_calls[m_call_id].m_promise.get_future();
}

asio::io_service& m_io;
int m_call_id;
map<int, Call> m_calls;
};


int main () {

try {
asio::io_service io;

rpcsvc rpc(io);

auto f1 = rpc.slowsquare(2, 2).then([](future<float> f) {
cout << "call 1 returned" << endl;
cout << "result 1: " << f.get() << endl;
});

auto f2 = rpc.slowsquare(3, 1).then([](future<float> f) {
cout << "call 2 returned" << endl;
cout << "result 2: " << f.get() << endl;
});

auto f3 = when_all(std::move(f1), std::move(f2));
auto f4 = f3.then([](decltype(f3)) {
cout << "done." << endl;
});

io.run();

f4.get();
}
catch (std::exception& e) {
cerr << e.what() << endl;
return 1;
}
return 0;
}

0 comments on commit bafd776

Please sign in to comment.