Skip to content

Commit

Permalink
initial TwitterStream impl
Browse files Browse the repository at this point in the history
  • Loading branch information
QuietMisdreavus committed Dec 10, 2017
1 parent 8f94a42 commit 7bc8bbf
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 9 deletions.
12 changes: 8 additions & 4 deletions src/common/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,13 @@ impl<T> FromIterator<Response<T>> for Response<Vec<T>> {
}
}

pub fn get_response(handle: &Handle, request: Request) -> Result<FutureResponse, error::Error> {
// TODO: num-cpus?
let connector = try!(HttpsConnector::new(1, handle));
let client = hyper::Client::configure().connector(connector).build(handle);
Ok(client.request(request))
}

/// A `Future` that resolves a web request and loads the complete response into a String.
///
/// This also does some header inspection, and attempts to parse the response as a `TwitterErrors`
Expand Down Expand Up @@ -376,10 +383,7 @@ impl Future for RawFuture {
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(req) = self.request.take() {
// needed to pull this section into the future so i could try!() on the connector
// TODO: num-cpus?
let connector = try!(HttpsConnector::new(1, &self.handle));
let client = hyper::Client::configure().connector(connector).build(&self.handle);
self.response = Some(client.request(req));
self.response = Some(try!(get_response(&self.handle, req)));
}

if let Some(mut resp) = self.response.take() {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ pub mod place;
pub mod direct;
pub mod service;
pub mod list;
pub mod stream;
mod links;

pub use auth::{KeyPair, Token, AuthFuture, request_token, authorize_url, authenticate_url,
Expand Down
4 changes: 4 additions & 0 deletions src/links.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,7 @@ pub mod service {
pub const CONFIG: &'static str = "https://api.twitter.com/1.1/help/configuration.json";
pub const RATE_LIMIT_STATUS: &'static str = "https://api.twitter.com/1.1/application/rate_limit_status.json";
}

pub mod stream {
pub const USER: &'static str = "https://userstream.twitter.com/1.1/user.json";
}
154 changes: 154 additions & 0 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

//! Access to the Streaming API.
use std::{self, io};

use futures::{Future, Stream, Poll, Async};
use hyper::Body;
use hyper::client::{Request, FutureResponse};
use rustc_serialize::json;

use auth::{self, Token};
use error;
use links;
use tweet::Tweet;

use common::*;

/// Represents the kinds of messages that can be sent over Twitter's Streaming API.
#[derive(Debug)]
pub enum StreamMessage {
/// A blank line, sent periodically to keep the connection alive.
Ping,
/// A new tweet.
///
/// Note that the `entities` inside the `user` field will be empty for tweets received via the
/// Streaming API.
Tweet(Tweet),
/// An unhandled message payload.
///
/// Twitter can add new streaming messages to the API, and egg-mode includes them here so that
/// they can be used before egg-mode has a chance to handle them.
Unknown(json::Json),
}

impl FromJson for StreamMessage {
fn from_json(input: &json::Json) -> Result<Self, error::Error> {
if let Some(_event) = input.find("event") {
} else {
if let Ok(tweet) = Tweet::from_json(input) {
return Ok(StreamMessage::Tweet(tweet));
}
}
Ok(StreamMessage::Unknown(input.clone()))
}

fn from_str(input: &str) -> Result<Self, error::Error> {
if input.trim().is_empty() {
Ok(StreamMessage::Ping)
} else {
let json = try!(json::Json::from_str(input.trim()));

StreamMessage::from_json(&json)
}
}
}

/// A `Stream` that represents a connection to the Twitter Streaming API.
pub struct TwitterStream {
buf: Vec<u8>,
handle: Handle,
request: Option<Request>,
response: Option<FutureResponse>,
body: Option<Body>,
}

impl TwitterStream {
fn new(handle: &Handle, request: Request) -> TwitterStream {
TwitterStream {
buf: vec![],
handle: handle.clone(),
request: Some(request),
response: None,
body: None,
}
}
}

impl Stream for TwitterStream {
type Item = StreamMessage;
type Error = error::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if let Some(req) = self.request.take() {
self.response = Some(try!(get_response(&self.handle, req)));
}

if let Some(mut resp) = self.response.take() {
match resp.poll() {
Err(e) => return Err(e.into()),
Ok(Async::NotReady) => {
self.response = Some(resp);
return Ok(Async::NotReady);
},
Ok(Async::Ready(resp)) => {
let status = resp.status();
if !status.is_success() {
//TODO: should i try to pull the response regardless?
return Err(error::Error::BadStatus(status));
}

self.body = Some(resp.body());
},
}
}

if let Some(mut body) = self.body.take() {
loop {
match body.poll() {
Err(e) => {
self.body = Some(body);
return Err(e.into());
},
Ok(Async::NotReady) => {
self.body = Some(body);
return Ok(Async::NotReady);
},
Ok(Async::Ready(None)) => {
//TODO: introduce a new error for this?
return Err(error::Error::FutureAlreadyCompleted);
},
Ok(Async::Ready(Some(chunk))) => {
self.buf.extend(&*chunk);

if let Some(pos) = self.buf.windows(2).position(|w| w == b"\r\n") {
self.body = Some(body);
let pos = pos + 2;
let resp = if let Ok(msg_str) = std::str::from_utf8(&self.buf[..pos]) {
StreamMessage::from_str(msg_str)
} else {
Err(io::Error::new(io::ErrorKind::InvalidData,
"stream did not contain valid UTF-8").into())
};

self.buf.drain(..pos);
return Ok(Async::Ready(Some(try!(resp))));
}
},
}
}
} else {
Err(error::Error::FutureAlreadyCompleted)
}
}
}

/// Opens a `TwitterStream` to the authenticated user's home stream.
pub fn user(handle: &Handle, token: &Token) -> TwitterStream {
let req = auth::get(links::stream::USER, token, None);

TwitterStream::new(handle, req)
}
3 changes: 2 additions & 1 deletion src/tweet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ impl FromJson for Tweet {
};

field_present!(input, created_at);
field_present!(input, entities);
field_present!(input, id);
field_present!(input, lang);
field_present!(input, retweet_count);
Expand All @@ -269,6 +268,8 @@ impl FromJson for Tweet {
entities = try!(field(ext, "entities"));
extended_entities = try!(field(ext, "extended_entities"));
} else {
field_present!(input, entities);

text = try!(field(input, "full_text").or(field(input, "text")));
display_text_range = try!(field(input, "display_text_range"));
entities = try!(field(input, "entities"));
Expand Down
8 changes: 4 additions & 4 deletions src/user/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ pub struct TwitterUser {
}

/// Container for URL entity information that may be paired with a user's profile.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct UserEntities {
/// URL information that has been parsed out of the user's `description`. If no URLs were
/// detected, then the contained Vec will be empty.
Expand All @@ -342,7 +342,7 @@ pub struct UserEntities {
}

/// Represents a collection of URL entity information paired with a specific user profile field.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct UserEntityDetail {
/// Collection of URL entity information.
///
Expand All @@ -361,7 +361,6 @@ impl FromJson for TwitterUser {
field_present!(input, created_at);
field_present!(input, default_profile);
field_present!(input, default_profile_image);
field_present!(input, entities);
field_present!(input, favourites_count);
field_present!(input, followers_count);
field_present!(input, friends_count);
Expand All @@ -386,7 +385,8 @@ impl FromJson for TwitterUser {

let description: Option<String> = try!(field(input, "description"));
let url: Option<String> = try!(field(input, "url"));
let mut entities: UserEntities = try!(field(input, "entities"));
let entities: Option<UserEntities> = try!(field(input, "entities"));
let mut entities = entities.unwrap_or_default();

if let Some(ref text) = description {
for entity in entities.description.urls.iter_mut() {
Expand Down

0 comments on commit 7bc8bbf

Please sign in to comment.