Skip to content

Commit

Permalink
Initial placement foundation (toshi-search#91)
Browse files Browse the repository at this point in the history
* Update to new tower-consul

* Update to latest version of tower-consul

* Add a builder for the consul interace

* Rename Consul::build to Consul::builder

* WIP

* Update deps and fix consul request body type

* Cargo update

* Simplify placement background task

* Address comments

* Refactor settings and add background consul service refresh

* Fix additional unwrap_or
  • Loading branch information
LucioFranco authored and hntd187 committed Jan 19, 2019
1 parent 866b9f7 commit b9c0e8a
Show file tree
Hide file tree
Showing 12 changed files with 283 additions and 127 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
logs/
new_index/
.node_id.txt
data/*
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ build = "build.rs"
[[bin]]
name = "toshi"

[[bin]]
name = "toshi-placement"

[lib]
path = "src/lib.rs"

Expand All @@ -23,6 +20,7 @@ tower-grpc-build = { git = "https://github.com/tower-rs/tower-grpc" }
[dependencies]
tower-service = { git = "https://github.com/tower-rs/tower" }
tower-direct-service = { git = "https://github.com/tower-rs/tower" }
tower-discover = { git = "https://github.com/tower-rs/tower" }
tower-buffer = { git = "https://github.com/tower-rs/tower" }
tower-grpc = { git = "https://github.com/tower-rs/tower-grpc" }
tower-h2 = { git = "https://github.com/tower-rs/tower-h2" }
Expand All @@ -35,6 +33,7 @@ http = "^0.1"
h2 = "0.1.14"
taken = "0.1.1"
flate2 = "^1.0"
futures-watch = { git = "https://github.com/carllerche/better-future" }
chashmap = "^2.2"
bytes = "^0.4"
prost = "^0.4"
Expand Down Expand Up @@ -78,4 +77,4 @@ codegen-units = 1
#tower-util = { git = "https://github.com/tower-rs/tower" }
#tower-http = { git = "https://github.com/tower-rs/tower-http" }
#tower-consul = { git = "https://github.com/LucioFranco/tower-consul" }
#tokio-connect = { git = "https://github.com/carllerche/tokio-connect" }
#tokio-connect = { git = "https://github.com/carllerche/tokio-connect" }
4 changes: 2 additions & 2 deletions config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ log_level = "info"
json_parsing_threads = 4
bulk_buffer_size = 10000
auto_commit_duration = 10
enable_clustering = false
master = false
enable_clustering = true
master = true
nodes = [
"127.0.0.1:8081",
"127.0.0.1:8082"
Expand Down
7 changes: 7 additions & 0 deletions config/consul.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"service": {
"name": "toshi",
"tags": [],
"port": 8081
}
}
74 changes: 0 additions & 74 deletions src/bin/toshi-placement.rs

This file was deleted.

37 changes: 23 additions & 14 deletions src/bin/toshi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,11 @@ fn settings() -> Settings {
.default_value("8080"),
)
.arg(
Arg::with_name("consul-host")
Arg::with_name("consul-addr")
.short("C")
.long("consul-host")
.long("consul-addr")
.takes_value(true)
.default_value("localhost"),
)
.arg(
Arg::with_name("consul-port")
.short("P")
.long("consul-port")
.takes_value(true)
.default_value("8500"),
.default_value("127.0.0.1:8500"),
)
.arg(
Arg::with_name("cluster-name")
Expand Down Expand Up @@ -170,9 +163,25 @@ fn run(catalog: Arc<RwLock<IndexCatalog>>, settings: &Settings) -> impl Future<I

if settings.enable_clustering {
let settings = settings.clone();
let run = future::lazy(move || connect_to_consul(&settings))
.and_then(move |_| commit_watcher)
.and_then(move |_| router_with_catalog(&bind, &catalog));
let place_addr = settings.place_addr.clone();
let consul_addr = settings.consul_addr.clone();
let cluster_name = settings.cluster_name.clone();

let run = future::lazy(move || connect_to_consul(&settings)).and_then(move |_| {
tokio::spawn(commit_watcher);

let mut consul = Consul::builder()
.with_cluster_name(cluster_name)
.with_address(consul_addr)
.build()
.expect("Could not build Consul client.");

let place_addr = place_addr.parse().expect("Placement address must be a valid SocketAddr");
tokio::spawn(cluster::run(place_addr, consul).map_err(|e| error!("Error with running cluster: {}", e)));

router_with_catalog(&bind, &catalog)
});

future::Either::A(run)
} else {
let run = commit_watcher.and_then(move |_| router_with_catalog(&bind, &catalog));
Expand All @@ -181,7 +190,7 @@ fn run(catalog: Arc<RwLock<IndexCatalog>>, settings: &Settings) -> impl Future<I
}

fn connect_to_consul(settings: &Settings) -> impl Future<Item = (), Error = ()> {
let consul_address = format!("{}:{}", &settings.consul_host, settings.consul_port);
let consul_address = settings.consul_addr.clone();
let cluster_name = settings.cluster_name.clone();
let settings_path_read = settings.path.clone();
let settings_path_write = settings.path.clone();
Expand Down
25 changes: 18 additions & 7 deletions src/cluster/consul.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
//! Provides an interface to a Consul cluster
use bytes::Bytes;
use futures::{stream::Stream, Async, Future, Poll};
use hyper::body::Body;
use hyper::client::HttpConnector;
use hyper::http::uri::Scheme;
use hyper::{Client, Request, Response, Uri};
use hyper_tls::HttpsConnector;
use serde::{Deserialize, Serialize};
use tower_consul::{Consul as TowerConsul, KVValue};
use tower_consul::{Consul as TowerConsul, ConsulService, KVValue};
use tower_service::Service;

use crate::cluster::shard::PrimaryShard;
use crate::cluster::shard::ReplicaShard;
use crate::cluster::shard::Shard;
use crate::cluster::ClusterError;
use crate::{Error, Result};
use bytes::Bytes;

#[derive(Serialize, Deserialize)]
pub struct NodeData {
Expand Down Expand Up @@ -68,7 +70,10 @@ impl Consul {
}

/// Registers a shard with the Consul cluster
pub fn register_shard<T: Shard + Serialize>(&mut self, shard: &T) -> impl Future<Item = (), Error = ClusterError> {
pub fn register_shard<T>(&mut self, shard: &T) -> impl Future<Item = (), Error = ClusterError>
where
T: Shard + Serialize,
{
let key = format!("toshi/{}/{}", self.cluster_name(), shard.shard_id().to_hyphenated_ref());
let shard = serde_json::to_vec(&shard).unwrap();

Expand All @@ -78,6 +83,12 @@ impl Consul {
.map_err(|err| ClusterError::FailedCreatingPrimaryShard(format!("{:?}", err)))
}

pub fn nodes(&mut self) -> impl Future<Item = Vec<ConsulService>, Error = ClusterError> {
self.client
.service_nodes("toshi")
.map_err(|err| ClusterError::FailedFetchingNodes(format!("{:?}", err)))
}

/// Gets the specified index
pub fn get_index(&mut self, index: String, recurse: bool) -> impl Future<Item = Vec<KVValue>, Error = ClusterError> {
let key = format!("toshi/{}/{}?recurse={}", &self.cluster_name(), &index, recurse);
Expand Down Expand Up @@ -137,10 +148,10 @@ impl Builder {
}

pub fn build(self) -> Result<Consul> {
let address = self.address.unwrap_or_else(|| "127.0.0.1:8500".into());
let address = self.address.unwrap_or_else(|| "127.0.0.1:8500".parse().unwrap());
let scheme = self.scheme.unwrap_or(Scheme::HTTP);

let client = TowerConsul::new(HttpsService::default(), 100, scheme.to_string(), address.clone()).map_err(|_| Error::SpawnError)?;
let client = TowerConsul::new(HttpsService::new(), 100, scheme.to_string(), address.to_string()).map_err(|_| Error::SpawnError)?;

Ok(Consul {
address,
Expand All @@ -157,8 +168,8 @@ pub struct HttpsService {
client: Client<HttpsConnector<HttpConnector>>,
}

impl Default for HttpsService {
fn default() -> Self {
impl HttpsService {
fn new() -> Self {
let https = HttpsConnector::new(4).expect("Could not create TLS for Hyper");
let client = Client::builder().build::<_, hyper::Body>(https);

Expand Down
Loading

0 comments on commit b9c0e8a

Please sign in to comment.