diff --git a/Cargo.toml b/Cargo.toml index 32b0248..0d922a6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "frag_gene_scan_rs" -version = "0.3.2" +version = "0.4.0" authors = ["Felix Van der Jeugt "] edition = "2018" diff --git a/README.md b/README.md index d382ec6..281d368 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,10 @@ where: * `-r train_file_dir` can change the directory containing the training files, so you can put it anywhere on your system. +* `-u` can be used for some additional speed when using multithreading. The + output will no longer be in the same order as the input (as in FGS and + FGS+). + The complete list of options will be printed when running `FragGeneScanRs --help`. diff --git a/src/bin/FragGeneScanRs.rs b/src/bin/FragGeneScanRs.rs index b513443..fa90be5 100644 --- a/src/bin/FragGeneScanRs.rs +++ b/src/bin/FragGeneScanRs.rs @@ -1,6 +1,7 @@ //! FragGeneScanRs executable #![allow(non_snake_case)] +use std::collections::VecDeque; use std::fs::File; use std::io; use std::io::{Read, Write}; @@ -101,6 +102,10 @@ fn main() -> Result<()> { .value_name("nucleotide_file") .takes_value(true) .help("Output predicted genes to this file (supersedes -o).")) + .arg(Arg::with_name("unordered") + .short("u") + .long("unordered") + .help("Do not preserve record order in output (faster).")) .get_matches(); let (global, locals) = hmm::get_train_from_file( @@ -123,23 +128,23 @@ fn main() -> Result<()> { (None, None) => None, }; - let metastream: Option = match ( + let metastream: Option> = match ( matches.value_of("meta-file"), matches.value_of("output-prefix"), ) { - (Some(filename), _) => Some(File::create(filename)?), + (Some(filename), _) => Some(Box::new(File::create(filename)?)), (None, Some("stdout")) => None, - (None, Some(filename)) => Some(File::create(filename.to_owned() + ".out")?), + (None, Some(filename)) => Some(Box::new(File::create(filename.to_owned() + ".out")?)), (None, None) => None, }; - let dnastream: Option = match ( + let dnastream: Option> = match ( matches.value_of("nucleotide-file"), matches.value_of("output-prefix"), ) { - (Some(filename), _) => Some(File::create(filename)?), + (Some(filename), _) => Some(Box::new(File::create(filename)?)), (None, Some("stdout")) => None, - (None, Some(filename)) => Some(File::create(filename.to_owned() + ".ffn")?), + (None, Some(filename)) => Some(Box::new(File::create(filename.to_owned() + ".ffn")?)), (None, None) => None, }; @@ -147,28 +152,42 @@ fn main() -> Result<()> { aastream = Some(Box::new(io::stdout())); } - run( - global, - locals, - inputseqs, - aastream, - metastream, - dnastream, - matches.value_of("complete").unwrap() == "1", - matches.is_present("formatted"), - usize::from_str_radix(matches.value_of("thread-num").unwrap(), 10)?, - )?; + if matches.is_present("unordered") { + run( + global, + locals, + inputseqs, + aastream.map(UnbufferingBuffer::new), + metastream.map(UnbufferingBuffer::new), + dnastream.map(UnbufferingBuffer::new), + matches.value_of("complete").unwrap() == "1", + matches.is_present("formatted"), + usize::from_str_radix(matches.value_of("thread-num").unwrap(), 10)?, + )?; + } else { + run( + global, + locals, + inputseqs, + aastream.map(SortingBuffer::new), + metastream.map(SortingBuffer::new), + dnastream.map(SortingBuffer::new), + matches.value_of("complete").unwrap() == "1", + matches.is_present("formatted"), + usize::from_str_radix(matches.value_of("thread-num").unwrap(), 10)?, + )?; + } Ok(()) } -fn run( +fn run( global: Box, locals: Vec, inputseqs: R, - aastream: Option, - metastream: Option, - dnastream: Option, + aa_buffer: Option, + meta_buffer: Option, + dna_buffer: Option, whole_genome: bool, formatted: bool, thread_num: usize, @@ -177,12 +196,14 @@ fn run( .num_threads(thread_num) .build_global()?; - let aastream = aastream.map(Mutex::new); - let metastream = metastream.map(Mutex::new); - let dnastream = dnastream.map(Mutex::new); + let meta_buffer = meta_buffer.map(Mutex::new); + let dna_buffer = dna_buffer.map(Mutex::new); + let aa_buffer = aa_buffer.map(Mutex::new); + Chunked::new(100, fasta::Reader::new(inputseqs).into_records()) + .enumerate() .par_bridge() - .map(|recordvec| { + .map(|(index, recordvec)| { let mut metabuf = Vec::new(); let mut dnabuf = Vec::new(); let mut aabuf = Vec::new(); @@ -197,24 +218,24 @@ fn run( nseq, whole_genome, ); - if metastream.is_some() { + if meta_buffer.is_some() { read_prediction.meta(&mut metabuf)?; } - if dnastream.is_some() { + if dna_buffer.is_some() { read_prediction.dna(&mut dnabuf, formatted)?; } - if aastream.is_some() { + if aa_buffer.is_some() { read_prediction.protein(&mut aabuf, whole_genome)?; } } - if let Some(metastream) = &metastream { - metastream.lock().unwrap().write_all(&metabuf)?; + if let Some(buffer) = &meta_buffer { + buffer.lock().unwrap().add(index, metabuf)?; } - if let Some(dnastream) = &dnastream { - dnastream.lock().unwrap().write_all(&dnabuf)?; + if let Some(buffer) = &dna_buffer { + buffer.lock().unwrap().add(index, dnabuf)?; } - if let Some(aastream) = &aastream { - aastream.lock().unwrap().write_all(&aabuf)?; + if let Some(buffer) = &aa_buffer { + buffer.lock().unwrap().add(index, aabuf)?; } Ok(()) }) @@ -251,3 +272,56 @@ impl Iterator for Chunked { } } } + +trait WritingBuffer { + fn add(&mut self, index: usize, item: Vec) -> Result<()>; +} + +struct SortingBuffer { + next: usize, + queue: VecDeque>>, + stream: W, +} + +impl SortingBuffer { + fn new(stream: W) -> Self { + SortingBuffer { + next: 0, + queue: VecDeque::new(), + stream: stream, + } + } +} + +impl WritingBuffer for SortingBuffer { + fn add(&mut self, index: usize, item: Vec) -> Result<()> { + while self.next + self.queue.len() <= index { + self.queue.push_back(None); + } + self.queue[index - self.next] = Some(item); + + while self.queue.front().map(Option::is_some).unwrap_or(false) { + let item = self.queue.pop_front().unwrap().unwrap(); + self.next += 1; + self.stream.write_all(&item)?; + } + Ok(()) + } +} + +struct UnbufferingBuffer { + stream: W, +} + +impl UnbufferingBuffer { + fn new(stream: W) -> Self { + UnbufferingBuffer { stream } + } +} + +impl WritingBuffer for UnbufferingBuffer { + fn add(&mut self, _: usize, item: Vec) -> Result<()> { + self.stream.write_all(&item)?; + Ok(()) + } +} diff --git a/src/gene.rs b/src/gene.rs index 972c9bb..db76181 100644 --- a/src/gene.rs +++ b/src/gene.rs @@ -17,6 +17,26 @@ impl ReadPrediction { } } + pub fn append_to( + &self, + aabuf: &mut Option>, + metabuf: &mut Option>, + dnabuf: &mut Option>, + formatted: bool, + whole_genome: bool, + ) -> Result<(), GeneError> { + if let Some(metabuf) = metabuf { + self.meta(&mut *metabuf)?; + } + if let Some(dnabuf) = dnabuf { + self.dna(&mut *dnabuf, formatted)?; + } + if let Some(aabuf) = aabuf { + self.protein(&mut *aabuf, whole_genome)?; + } + Ok(()) + } + pub fn meta(&self, buf: &mut Vec) -> Result<(), GeneError> { if !self.genes.is_empty() { buf.append(&mut format!(">{}\n", std::str::from_utf8(&self.head)?).into_bytes())