markschl/thread_io

thread_io

This crate makes it easy to wrap readers and writers in a background thread. This can be useful, for example, with readers and writers for compression formats, where it moves the (de)compression work off the main thread.

Documentation

Examples

Reading

The following code counts the number of lines containing a search term in a gzip-compressed file. Decompression is done in a background thread using the flate2 library, and the decompressed data is sent to a reader that is passed to a closure running in the main thread. The speed gain should be highest if decompression and text searching use about the same amount of CPU time, since the two steps then overlap almost completely.

use std::io::prelude::*;
use std::io;
use std::fs::File;
use thread_io::read::reader;
use flate2::read::GzDecoder;

// size of buffers sent across threads
const BUF_SIZE: usize = 256 * 1024;
// length of queue with buffers pre-filled in background thread
const QUEUE_LEN: usize = 5;

let f = File::open("file.txt.gz").unwrap();
let gz = GzDecoder::new(f);
let search_term = "spam";

let found = reader(BUF_SIZE, QUEUE_LEN, gz, |reader| {
    let mut buf_reader = io::BufReader::new(reader);
    let mut found = 0;
    let mut line = String::new();
    while buf_reader.read_line(&mut line)? > 0 {
        if line.contains(search_term) {
            found += 1;
        }
        line.clear();
    }
    Ok::<_, io::Error>(found)
})
.expect("decoding error");

println!("Found '{}' in {} lines.", search_term, found);
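
To make the roles of BUF_SIZE and QUEUE_LEN more concrete, here is a minimal sketch of the general technique using only the standard library. It is not the crate's implementation: read_in_background is a hypothetical helper, and the real reader() hands the closure a Read object rather than raw chunks. A background thread fills buffers of buf_size bytes and sends them over a bounded channel that holds at most queue_len buffers.

```rust
use std::io::{self, Read};
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical helper (not part of thread_io): reads `source` in a
/// background thread and passes each filled buffer to `consume` on the
/// calling thread.
fn read_in_background<R, F>(
    buf_size: usize,
    queue_len: usize,
    mut source: R,
    mut consume: F,
) -> io::Result<()>
where
    R: Read + Send + 'static,
    F: FnMut(&[u8]),
{
    // Bounded channel: the producer blocks once `queue_len` buffers are waiting.
    let (tx, rx) = sync_channel::<io::Result<Vec<u8>>>(queue_len);

    let producer = thread::spawn(move || loop {
        let mut buf = vec![0u8; buf_size];
        match source.read(&mut buf) {
            // End of input; leaving the loop drops `tx` and closes the channel.
            Ok(0) => break,
            Ok(n) => {
                buf.truncate(n);
                if tx.send(Ok(buf)).is_err() {
                    break; // the consumer went away
                }
            }
            Err(e) => {
                // Forward the error to the consumer, then stop reading.
                let _ = tx.send(Err(e));
                break;
            }
        }
    });

    let mut result = Ok(());
    for msg in rx {
        match msg {
            Ok(buf) => consume(&buf),
            Err(e) => {
                result = Err(e);
                break;
            }
        }
    }
    producer.join().expect("reader thread panicked");
    result
}
```

Calling read_in_background(BUF_SIZE, QUEUE_LEN, gz, |chunk| ...) would let the decoder run ahead by up to QUEUE_LEN buffers while the main thread processes the current one. A larger queue smooths out uneven read times at the cost of more memory.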

Note that the closure in reader() allows returning any error type implementing From<io::Error>, not just io::Error itself. On the downside, the compiler sometimes needs a hint about the error type, as provided with Ok::<_, io::Error>().
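
The error-type flexibility can be illustrated without the crate. In the sketch below, with_reader is a hypothetical stand-in that only mimics the E: From<io::Error> bound described above; SearchError and its EmptyLine variant are invented for the example. The ? operator converts io::Error into the custom type, and the closure can also return its own error variants.

```rust
use std::fmt;
use std::io::{self, BufRead, Read};

/// Hypothetical application error. The `From<io::Error>` impl is what
/// lets `?` convert I/O errors inside the closure.
#[derive(Debug)]
enum SearchError {
    Io(io::Error),
    EmptyLine(usize),
}

impl From<io::Error> for SearchError {
    fn from(e: io::Error) -> Self {
        SearchError::Io(e)
    }
}

impl fmt::Display for SearchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SearchError::Io(e) => write!(f, "I/O error: {}", e),
            SearchError::EmptyLine(n) => write!(f, "line {} is empty", n),
        }
    }
}

/// Hypothetical stand-in (not the thread_io API): it accepts any error
/// type `E` that can be built from an `io::Error`.
fn with_reader<R, F, O, E>(mut source: R, func: F) -> Result<O, E>
where
    R: Read,
    F: FnOnce(&mut dyn BufRead) -> Result<O, E>,
    E: From<io::Error>,
{
    let mut data = Vec::new();
    source.read_to_end(&mut data)?; // io::Error becomes E through From
    let mut cursor = io::Cursor::new(data);
    let reader: &mut dyn BufRead = &mut cursor;
    func(reader)
}

/// Counts lines containing `term`, rejecting empty lines with a custom error.
fn count_matches(input: &[u8], term: &str) -> Result<usize, SearchError> {
    with_reader(input, |reader| {
        let mut found = 0;
        let mut line_no = 0;
        let mut line = String::new();
        while reader.read_line(&mut line)? > 0 {
            line_no += 1;
            if line.trim().is_empty() {
                return Err(SearchError::EmptyLine(line_no));
            }
            if line.contains(term) {
                found += 1;
            }
            line.clear();
        }
        // Same kind of hint as in the example above, with the custom type.
        Ok::<_, SearchError>(found)
    })
}
```

With thread_io itself, the same closure body should work inside reader(), as long as the error type implements From<io::Error>.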

The thread_io::read module also provides a function for working with reader types that do not implement Send; see the module documentation.

Writing

Writing to a gzip-compressed file in a background thread works similarly. The following code writes all lines containing a specific term to a compressed file:

use std::fs::File;
use std::io::prelude::*;
use std::io;
use thread_io::write::writer_finish;
use flate2::write::GzEncoder;
use flate2::Compression;

// constants need an explicit type
const BUF_SIZE: usize = 256 * 1024;
const QUEUE_LEN: usize = 5;

let infile = File::open("file.txt").unwrap();
let outfile = File::create("file.txt.gz").unwrap();
let gz_out = GzEncoder::new(outfile, Compression::default());
let search_term = "spam";

writer_finish(BUF_SIZE, QUEUE_LEN, gz_out,
    // runs in main thread
    |writer| {
        let mut buf_infile = io::BufReader::new(infile);
        let mut line = String::new();
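        while buf_infile.read_line(&mut line)? > 0 {
            if line.contains(search_term) {
                // write_all retries until the whole line has been written
                writer.write_all(line.as_bytes())?;
            }
            // clear after every line, because read_line appends to the buffer
            line.clear();
        }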
        Ok::<_, io::Error>(())
    },
    // runs when main closure finished
    |writer| writer.finish()
)
.expect("encoding error");

In this example, writer.finish() is called in a separate 'finalizing' closure, which receives the GzEncoder object by value after all writing is done. There is also a plain writer function, which takes only one closure. Note, however, that even a call to flush() should go in a finalizing closure, because the last write is often performed only after the main closure has returned.
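
Why the last write can happen after the main closure returns is easier to see in a small standard-library sketch of the same pattern. This is an illustration under assumptions, not the crate's code: ChannelWriter and write_in_background are hypothetical names. The front-end writer only ships full buffers, so a partially filled buffer is sent after the main closure ends, and the finalizing closure runs in the background thread once every buffer has arrived.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread;

/// Hypothetical front end handed to the main closure: it collects writes
/// and ships full buffers to the background thread.
struct ChannelWriter {
    tx: SyncSender<Vec<u8>>,
    buf: Vec<u8>,
    buf_size: usize,
}

impl ChannelWriter {
    fn send_buf(&mut self) -> io::Result<()> {
        if self.buf.is_empty() {
            return Ok(());
        }
        let full = std::mem::replace(&mut self.buf, Vec::with_capacity(self.buf_size));
        self.tx
            .send(full)
            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "writer thread stopped"))
    }
}

impl Write for ChannelWriter {
    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
        self.buf.extend_from_slice(data);
        if self.buf.len() >= self.buf_size {
            self.send_buf()?;
        }
        Ok(data.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        // Only hands pending bytes to the background thread;
        // the wrapped writer itself is not flushed here.
        self.send_buf()
    }
}

/// Hypothetical helper: `func` runs on the calling thread, `finish` runs in
/// the background thread after all buffers have been written to `inner`.
fn write_in_background<W, F, F2, O>(
    buf_size: usize,
    queue_len: usize,
    mut inner: W,
    func: F,
    finish: F2,
) -> io::Result<O>
where
    W: Write + Send + 'static,
    F: FnOnce(&mut ChannelWriter) -> io::Result<()>,
    F2: FnOnce(W) -> io::Result<O> + Send + 'static,
    O: Send + 'static,
{
    let (tx, rx) = sync_channel::<Vec<u8>>(queue_len);
    let worker = thread::spawn(move || -> io::Result<O> {
        for buf in rx {
            inner.write_all(&buf)?;
        }
        // The channel is closed and drained: only now is finalizing safe.
        finish(inner)
    });

    let mut front = ChannelWriter { tx, buf: Vec::with_capacity(buf_size), buf_size };
    // The remaining partial buffer is sent after the main closure returns.
    let main_result = func(&mut front).and_then(|_| front.send_buf());
    drop(front); // closes the channel so the worker leaves its loop

    let out = worker.join().expect("writer thread panicked")?;
    main_result.map(|_| out)
}
```

In this sketch, passing |inner| Ok(inner) as the finalizing closure returns the wrapped writer, while a GzEncoder would typically be finalized with |gz| gz.finish() as in the example above.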

Module documentation

Crossbeam channels

It is possible to use the channel implementation from the crossbeam crate by specifying the crossbeam_channel feature. However, on my system, I haven't found any performance gain over using the channels from the standard library.

Similar projects

fastq-rs provides functionality very similar to thread_io::read::reader in its thread_reader module.
