Skip to content
This repository has been archived by the owner on Sep 14, 2019. It is now read-only.

Commit

Permalink
Merge pull request #8 from crisidev/master
Browse files Browse the repository at this point in the history
#4 Merge PR awslabs#28 from upstream
crisidev authored Jul 16, 2019

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents 864574c + 1ecef74 commit 232e31d
Showing 5 changed files with 100 additions and 13 deletions.
27 changes: 15 additions & 12 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -11,25 +11,28 @@ rust: nightly
env:
global:
- CRATE_NAME=flowgger
- DISABLE_TESTS=1
- TARGET=x86_64-unknown-linux-gnu

matrix:
include:
- env: TARGET=i686-unknown-linux-gnu
- env: TARGET=x86_64-unknown-linux-gnu
- env: TARGET=x86_64-unknown-linux-musl
- env: TARGET=i686-unknown-linux-gnu DISABLE_TESTS=1
- env: TARGET=i686-unknown-freebsd DISABLE_TESTS=1
- env: TARGET=x86_64-unknown-freebsd DISABLE_TESTS=1
- env: TARGET=aarch64-unknown-linux-gnu
- env: TARGET=armv7-unknown-linux-gnueabihf
- env: TARGET=mips-unknown-linux-gnu
- env: TARGET=mips64-unknown-linux-gnuabi64
- env: TARGET=mips64el-unknown-linux-gnuabi64
- env: TARGET=mipsel-unknown-linux-gnu
- env: TARGET=powerpc-unknown-linux-gnu
- env: TARGET=powerpc64-unknown-linux-gnu
- env: TARGET=powerpc64le-unknown-linux-gnu
- env: TARGET=aarch64-unknown-linux-gnu DISABLE_TESTS=1
- env: TARGET=armv7-unknown-linux-gnueabihf DISABLE_TESTS=1
- env: TARGET=mips-unknown-linux-gnu DISABLE_TESTS=1
- env: TARGET=mips64-unknown-linux-gnuabi64 DISABLE_TESTS=1
- env: TARGET=mips64el-unknown-linux-gnuabi64 DISABLE_TESTS=1
- env: TARGET=mipsel-unknown-linux-gnu DISABLE_TESTS=1
- env: TARGET=powerpc-unknown-linux-gnu DISABLE_TESTS=1
- env: TARGET=powerpc64-unknown-linux-gnu DISABLE_TESTS=1
- env: TARGET=powerpc64le-unknown-linux-gnu DISABLE_TESTS=1
- env: TARGET=s390x-unknown-linux-gnu DISABLE_TESTS=1
- env: TARGET=x86_64-unknown-linux-gnu
rust: nightly
allow_failures:
- env: TARGET=aarch64-unknown-linux-musl DISABLE_TESTS=1
rust: nightly

before_install: set -e
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -8,13 +8,14 @@ build = "build.rs"
[features]
capnp-recompile = ["capnpc", "capnp"]
coroutines = ["coio", "tls"]
default = ["syslog", "kafka-output", "redis", "capnp-recompile", "tls", "gelf", "ltsv"]
default = ["syslog", "kafka-output", "file", "redis", "capnp-recompile", "tls", "gelf", "ltsv"]
redis-input = ["redis"]
kafka-output = ["kafka"]
tls = ["openssl"]
gelf = ["serde", "serde_json"]
ltsv = []
syslog = []
file = []

[build-dependencies.capnpc]
version = "*"
13 changes: 13 additions & 0 deletions src/flowgger/mod.rs
Original file line number Diff line number Diff line change
@@ -36,6 +36,8 @@ use self::input::{TcpCoInput, TlsCoInput};
#[cfg(feature = "syslog")]
use self::input::{TcpInput, UdpInput};
use self::merger::{LineMerger, Merger, NulMerger, SyslenMerger};
#[cfg(feature = "file")]
use self::output::FileOutput;
#[cfg(feature = "kafka-output")]
use self::output::KafkaOutput;
#[cfg(feature = "tls")]
@@ -138,6 +140,16 @@ fn get_output_kafka(_config: &Config) -> ! {
panic!("Support for Kafka hasn't been compiled in")
}

#[cfg(feature = "file")]
fn get_output_file(config: &Config) -> Box<dyn Output> {
Box::new(FileOutput::new(config)) as Box<dyn Output>
}

#[cfg(not(feature = "file"))]
fn get_output_file(_config: &Config) -> ! {
panic!("Support for file hasn't been compiled in")
}

#[cfg(feature = "tls")]
fn get_output_tls(config: &Config) -> Box<dyn Output> {
Box::new(TlsOutput::new(config)) as Box<dyn Output>
@@ -153,6 +165,7 @@ fn get_output(output_type: &str, config: &Config) -> Box<dyn Output> {
"stdout" | "debug" => Box::new(DebugOutput::new(config)) as Box<dyn Output>,
"kafka" => get_output_kafka(config),
"tls" | "syslog-tls" => get_output_tls(config),
"file" => get_output_file(config),
_ => panic!("Invalid output type: {}", output_type),
}
}
66 changes: 66 additions & 0 deletions src/flowgger/output/file_output.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use super::Output;
use crate::flowgger::config::Config;
use crate::flowgger::merger::Merger;
use std::fs::OpenOptions;
use std::io::{BufWriter, Write};
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::thread;

const FILE_DEFAULT_BUFFER_SIZE: usize = 1024;

pub struct FileOutput {
path: String,
buffer_size: usize,
}

impl FileOutput {
pub fn new(config: &Config) -> FileOutput {
let path = config
.lookup("output.file_path")
.expect("output.file_path is missing")
.as_str()
.expect("output.file_path must be a string")
.to_string();
let buffer_size =
config
.lookup("output.file_buffer_size")
.map_or(FILE_DEFAULT_BUFFER_SIZE, |bs| {
bs.as_integer()
.expect("output.file_buffer_size should be an integer")
as usize
});
FileOutput { path, buffer_size }
}
}

// TODO: Add periodic rotation handling
impl Output for FileOutput {
fn start(&self, arx: Arc<Mutex<Receiver<Vec<u8>>>>, merger: Option<Box<dyn Merger>>) {
let merger = match merger {
Some(merger) => Some(merger.clone_boxed()),
None => None,
};

let fd = OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)
.unwrap_or_else(|_| panic!("Cannot open file descriptor to {}", &self.path));
let mut writer = BufWriter::with_capacity(self.buffer_size, fd);

thread::spawn(move || loop {
let mut bytes = match { arx.lock().unwrap().recv() } {
Ok(line) => line,
Err(_) => return,
};
if let Some(ref merger) = merger {
merger.frame(&mut bytes);
}

writer
.write_all(&bytes)
.expect("Cannot write bytes to output file");
});
}
}
4 changes: 4 additions & 0 deletions src/flowgger/output/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
mod debug_output;
#[cfg(feature = "file")]
mod file_output;
#[cfg(feature = "kafka-output")]
mod kafka_output;
#[cfg(feature = "tls")]
mod tls_output;

pub use self::debug_output::DebugOutput;
#[cfg(feature = "file")]
pub use self::file_output::FileOutput;
#[cfg(feature = "kafka-output")]
pub use self::kafka_output::KafkaOutput;
#[cfg(feature = "tls")]

0 comments on commit 232e31d

Please sign in to comment.