Skip to content

Commit

Permalink
Add futures via tangle
Browse files Browse the repository at this point in the history
  • Loading branch information
thehydroimpulse committed Mar 13, 2016
1 parent 4ed2f8e commit e10a77c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 66 deletions.
8 changes: 8 additions & 0 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,14 @@ impl Deserialize for String {
}
}

impl Serialize for () {
fn serialize<S>(&self, s: &mut S) -> Result<(), Error>
where S: Serializer + ThriftSerializer
{
Ok(())
}
}

impl Serialize for bool {
fn serialize<S>(&self, s: &mut S) -> Result<(), Error>
where S: Serializer + ThriftSerializer
Expand Down
96 changes: 30 additions & 66 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,67 +2,35 @@ use protocol::{Serializer, ThriftSerializer, ThriftMessage, ThriftDeserializer,
Deserialize, Serialize, ThriftType, ThriftMessageType, Error};
use binary_protocol::{BinarySerializer, BinaryDeserializer};
use std::io::Cursor;
use tangle::{Future, Async};

pub trait Service {
fn query(&mut self, val: bool);
}

/// TransformCall trait is the transformation from a thrift object to
/// a Rust method call.
pub struct TransformIncomingCall<'a, D: 'a> {
de: &'a mut D,
service: &'a mut Service
}

pub trait TransformCall {
fn call_query(&mut self) -> Result<(), Error>;
}

/// XXX: Add support for RPC return/reply.
/// XXX: Add Future support for RPC calls. This will allow us to support return types as we can
/// essentially have:
///
/// ```notrust
/// TransformIncomingCall::new(&mut de).call_query()
/// ```
impl<'a, D> TransformIncomingCall<'a, D>
where D: 'a + Deserializer + ThriftDeserializer
{
pub fn new(de: &'a mut D, service: &'a mut Service) -> TransformIncomingCall<'a, D> {
TransformIncomingCall {
de: de,
service: service
}
}
}

impl<'a, D> TransformCall for TransformIncomingCall<'a, D>
where D: 'a + Deserializer + ThriftDeserializer
{
fn call_query(&mut self) -> Result<(), Error> {
// Deserialize into QueryArgs
let args: QueryArgs = try!(Deserialize::deserialize(self.de));

self.service.query(args.val);
Ok(())
}
fn query(&mut self, val: bool) -> Future<()>;
}

// Generated
fn transform_msg<D>(msg: ThriftMessage, de: &mut D, service: &mut Service) -> Result<(), Error>
fn dispatch_method_call<D>(msg: ThriftMessage, de: &mut D, service: &mut Service) -> Result<Future<Vec<u8>>, Error>
where D: Deserializer + ThriftDeserializer
{
match &*msg.name {
"query" => {
let args: QueryArgs = try!(Deserialize::deserialize(de));
service.query(args.val);
Ok(service.query(args.val).map(|val| {
// XXX Write the whole message reply here.
let mut v = Vec::new();
{
let mut s = BinarySerializer::new(&mut v);
val.serialize(&mut s);
}

v
}))
},
_ => {
unimplemented!()
// Return Err.
}
}

Ok(())
}

pub struct QueryArgs {
Expand Down Expand Up @@ -97,15 +65,6 @@ impl Serialize for QueryArgs {
}
}


pub struct RpcServer;

impl Service for RpcServer {
fn query(&mut self, val: bool) {

}
}

fn dispatch_query(service: &mut Service, args: QueryArgs) {
service.query(args.val);
}
Expand All @@ -123,7 +82,7 @@ impl RpcClient {
}

impl Service for RpcClient {
fn query(&mut self, val: bool) {
fn query(&mut self, val: bool) -> Future<()> {
{
let mut proto = BinarySerializer::new(&mut self.buf);
let args = QueryArgs {
Expand All @@ -134,6 +93,8 @@ impl Service for RpcClient {
args.serialize(&mut proto);
proto.write_message_end();
}

Future::unit(())
}
}

Expand All @@ -153,25 +114,24 @@ impl<'a, D> MessagePipeline<'a, D>
}

/// Dispatch the incoming RPC call to the respective service method.
pub fn dispatch(&mut self, msg: ThriftMessage) -> Result<(), Error> {
try!(transform_msg(msg, &mut self.de, self.service));
Ok(())
pub fn dispatch(&mut self, msg: ThriftMessage) -> Result<Future<Vec<u8>>, Error> {
dispatch_method_call(msg, &mut self.de, self.service)
}

/// XXX: The fn signature should be `Result<Future<Vec<u8>>, Error>` where the serialized
/// response is returned into the future.
pub fn run(&mut self) -> Result<(), Error> {
pub fn run(&mut self) -> Result<Future<Vec<u8>>, Error> {
let msg = try!(self.de.read_message_begin());

match msg.ty {
// Dispatch on an RPC method call.
ThriftMessageType::Call => {
try!(self.dispatch(msg));
Ok(try!(self.dispatch(msg)))
},
_ => {}
_ => {
panic!("unexpected");
}
}

Ok(())
}
}

Expand All @@ -186,8 +146,9 @@ fn call_query() {
struct Server;

impl Service for Server {
fn query(&mut self, val: bool) {
fn query(&mut self, val: bool) -> Future<()> {
assert_eq!(val, true);
Future::unit(())
}
}

Expand All @@ -203,5 +164,8 @@ fn call_query() {
// ```
//
// Where `res` is the serialized response.
pipe.run().unwrap();
pipe.run().unwrap().and_then(|v| {
println!("{:?}", v);
Async::Ok(())
});
}

0 comments on commit e10a77c

Please sign in to comment.