From e07e626d5538958068595fa8999496bbbcd26f61 Mon Sep 17 00:00:00 2001 From: Felix Van der Jeugt Date: Mon, 2 Aug 2021 18:07:15 +0200 Subject: [PATCH 1/3] implement OutputBuffer for in-order output --- src/bin/FragGeneScanRs.rs | 61 +++++++++++++++++++++++++++++++++------ 1 file changed, 52 insertions(+), 9 deletions(-) diff --git a/src/bin/FragGeneScanRs.rs b/src/bin/FragGeneScanRs.rs index b513443..81cf189 100644 --- a/src/bin/FragGeneScanRs.rs +++ b/src/bin/FragGeneScanRs.rs @@ -1,9 +1,11 @@ //! FragGeneScanRs executable #![allow(non_snake_case)] +use std::collections::VecDeque; use std::fs::File; use std::io; use std::io::{Read, Write}; +use std::ops::DerefMut; use std::path::PathBuf; use std::sync::Mutex; @@ -177,12 +179,14 @@ fn run( .num_threads(thread_num) .build_global()?; + let buffer = Mutex::new(OutputBuffer::new()); let aastream = aastream.map(Mutex::new); let metastream = metastream.map(Mutex::new); let dnastream = dnastream.map(Mutex::new); Chunked::new(100, fasta::Reader::new(inputseqs).into_records()) + .enumerate() .par_bridge() - .map(|recordvec| { + .map(|(index, recordvec)| { let mut metabuf = Vec::new(); let mut dnabuf = Vec::new(); let mut aabuf = Vec::new(); @@ -207,14 +211,18 @@ fn run( read_prediction.protein(&mut aabuf, whole_genome)?; } } - if let Some(metastream) = &metastream { - metastream.lock().unwrap().write_all(&metabuf)?; - } - if let Some(dnastream) = &dnastream { - dnastream.lock().unwrap().write_all(&dnabuf)?; - } - if let Some(aastream) = &aastream { - aastream.lock().unwrap().write_all(&aabuf)?; + let mut locked_buffer = buffer.lock().unwrap(); + locked_buffer.set(index, (metabuf, dnabuf, aabuf)); + for (metabuf, dnabuf, aabuf) in locked_buffer.deref_mut() { + if let Some(metastream) = &metastream { + metastream.lock().unwrap().write_all(&metabuf)?; + } + if let Some(dnastream) = &dnastream { + dnastream.lock().unwrap().write_all(&dnabuf)?; + } + if let Some(aastream) = &aastream { + aastream.lock().unwrap().write_all(&aabuf)?; + } } Ok(()) }) @@ -251,3 +259,38 @@ impl Iterator for Chunked { } } } + +struct OutputBuffer { + next: usize, + queue: VecDeque>, +} + +impl OutputBuffer { + fn new() -> Self { + OutputBuffer { + next: 0, + queue: VecDeque::new(), + } + } + + fn set(&mut self, index: usize, item: I) { + while self.next + self.queue.len() <= index { + self.queue.push_back(None); + } + self.queue[index - self.next] = Some(item); + } +} + +impl Iterator for OutputBuffer { + type Item = I; + + fn next(&mut self) -> Option { + if self.queue.front().map(Option::is_some).unwrap_or(false) { + let item = self.queue.pop_front().unwrap().unwrap(); + self.next += 1; + Some(item) + } else { + None + } + } +} From 498f4ba2bc90c5856976408de94c8d8433ffb7ae Mon Sep 17 00:00:00 2001 From: Felix Van der Jeugt Date: Tue, 3 Aug 2021 11:53:29 +0200 Subject: [PATCH 2/3] remove obsolete mutexes --- src/bin/FragGeneScanRs.rs | 53 +++++++++++++++++++++++---------------- 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/src/bin/FragGeneScanRs.rs b/src/bin/FragGeneScanRs.rs index 81cf189..cb7b298 100644 --- a/src/bin/FragGeneScanRs.rs +++ b/src/bin/FragGeneScanRs.rs @@ -5,6 +5,7 @@ use std::collections::VecDeque; use std::fs::File; use std::io; use std::io::{Read, Write}; +use std::marker::Sync; use std::ops::DerefMut; use std::path::PathBuf; use std::sync::Mutex; @@ -115,7 +116,7 @@ fn main() -> Result<()> { filename => Box::new(File::open(filename)?), }; - let mut aastream: Option> = match ( + let mut aastream: Option> = match ( matches.value_of("aa-file"), matches.value_of("output-prefix"), ) { @@ -164,7 +165,7 @@ fn main() -> Result<()> { Ok(()) } -fn run( +fn run( global: Box, locals: Vec, inputseqs: R, @@ -179,10 +180,11 @@ fn run( .num_threads(thread_num) .build_global()?; - let buffer = Mutex::new(OutputBuffer::new()); - let aastream = aastream.map(Mutex::new); - let metastream = metastream.map(Mutex::new); - let dnastream = dnastream.map(Mutex::new); + let hasmeta = metastream.is_some(); + let hasdna = metastream.is_some(); + let hasaa = metastream.is_some(); + let output = Mutex::new(OutputBuffer::new(aastream, metastream, dnastream)); + Chunked::new(100, fasta::Reader::new(inputseqs).into_records()) .enumerate() .par_bridge() @@ -201,27 +203,28 @@ fn run( nseq, whole_genome, ); - if metastream.is_some() { + if hasmeta { read_prediction.meta(&mut metabuf)?; } - if dnastream.is_some() { + if hasdna { read_prediction.dna(&mut dnabuf, formatted)?; } - if aastream.is_some() { + if hasaa { read_prediction.protein(&mut aabuf, whole_genome)?; } } - let mut locked_buffer = buffer.lock().unwrap(); - locked_buffer.set(index, (metabuf, dnabuf, aabuf)); - for (metabuf, dnabuf, aabuf) in locked_buffer.deref_mut() { - if let Some(metastream) = &metastream { - metastream.lock().unwrap().write_all(&metabuf)?; + let mut buffer = output.lock().unwrap(); + buffer.set(index, (metabuf, dnabuf, aabuf)); + let bufs = buffer.deref_mut().collect::, Vec, Vec)>>(); + for (metabuf, dnabuf, aabuf) in bufs { + if let Some(metastream) = &mut buffer.metastream { + &mut metastream.write_all(&metabuf)?; } - if let Some(dnastream) = &dnastream { - dnastream.lock().unwrap().write_all(&dnabuf)?; + if let Some(dnastream) = &mut buffer.dnastream { + &mut dnastream.write_all(&dnabuf)?; } - if let Some(aastream) = &aastream { - aastream.lock().unwrap().write_all(&aabuf)?; + if let Some(aastream) = &mut buffer.aastream { + &mut aastream.write_all(&aabuf)?; } } Ok(()) @@ -260,16 +263,22 @@ impl Iterator for Chunked { } } -struct OutputBuffer { +struct OutputBuffer { next: usize, queue: VecDeque>, + aastream: Option, + metastream: Option, + dnastream: Option, } -impl OutputBuffer { - fn new() -> Self { +impl OutputBuffer { + fn new(aastream: Option, metastream: Option, dnastream: Option) -> Self { OutputBuffer { next: 0, queue: VecDeque::new(), + aastream: aastream, + metastream: metastream, + dnastream: dnastream, } } @@ -281,7 +290,7 @@ impl OutputBuffer { } } -impl Iterator for OutputBuffer { +impl Iterator for OutputBuffer { type Item = I; fn next(&mut self) -> Option { From 3d7c414ac664936f2bbfc348161c02b77181b91b Mon Sep 17 00:00:00 2001 From: Felix Van der Jeugt Date: Tue, 3 Aug 2021 18:50:39 +0200 Subject: [PATCH 3/3] allow choice between ordered and unordered output --- README.md | 4 + src/bin/FragGeneScanRs.rs | 154 ++++++++++++++++++++++---------------- src/gene.rs | 20 +++++ 3 files changed, 112 insertions(+), 66 deletions(-) diff --git a/README.md b/README.md index d382ec6..281d368 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,10 @@ where: * `-r train_file_dir` can change the directory containing the training files, so you can put it anywhere on your system. +* `-u` can be used for some additional speed when using multithreading. The + output will no longer be in the same order as the input (as in FGS and + FGS+). + The complete list of options will be printed when running `FragGeneScanRs --help`. diff --git a/src/bin/FragGeneScanRs.rs b/src/bin/FragGeneScanRs.rs index cb7b298..fa90be5 100644 --- a/src/bin/FragGeneScanRs.rs +++ b/src/bin/FragGeneScanRs.rs @@ -5,8 +5,6 @@ use std::collections::VecDeque; use std::fs::File; use std::io; use std::io::{Read, Write}; -use std::marker::Sync; -use std::ops::DerefMut; use std::path::PathBuf; use std::sync::Mutex; @@ -104,6 +102,10 @@ fn main() -> Result<()> { .value_name("nucleotide_file") .takes_value(true) .help("Output predicted genes to this file (supersedes -o).")) + .arg(Arg::with_name("unordered") + .short("u") + .long("unordered") + .help("Do not preserve record order in output (faster).")) .get_matches(); let (global, locals) = hmm::get_train_from_file( @@ -116,7 +118,7 @@ fn main() -> Result<()> { filename => Box::new(File::open(filename)?), }; - let mut aastream: Option> = match ( + let mut aastream: Option> = match ( matches.value_of("aa-file"), matches.value_of("output-prefix"), ) { @@ -126,23 +128,23 @@ fn main() -> Result<()> { (None, None) => None, }; - let metastream: Option = match ( + let metastream: Option> = match ( matches.value_of("meta-file"), matches.value_of("output-prefix"), ) { - (Some(filename), _) => Some(File::create(filename)?), + (Some(filename), _) => Some(Box::new(File::create(filename)?)), (None, Some("stdout")) => None, - (None, Some(filename)) => Some(File::create(filename.to_owned() + ".out")?), + (None, Some(filename)) => Some(Box::new(File::create(filename.to_owned() + ".out")?)), (None, None) => None, }; - let dnastream: Option = match ( + let dnastream: Option> = match ( matches.value_of("nucleotide-file"), matches.value_of("output-prefix"), ) { - (Some(filename), _) => Some(File::create(filename)?), + (Some(filename), _) => Some(Box::new(File::create(filename)?)), (None, Some("stdout")) => None, - (None, Some(filename)) => Some(File::create(filename.to_owned() + ".ffn")?), + (None, Some(filename)) => Some(Box::new(File::create(filename.to_owned() + ".ffn")?)), (None, None) => None, }; @@ -150,28 +152,42 @@ fn main() -> Result<()> { aastream = Some(Box::new(io::stdout())); } - run( - global, - locals, - inputseqs, - aastream, - metastream, - dnastream, - matches.value_of("complete").unwrap() == "1", - matches.is_present("formatted"), - usize::from_str_radix(matches.value_of("thread-num").unwrap(), 10)?, - )?; + if matches.is_present("unordered") { + run( + global, + locals, + inputseqs, + aastream.map(UnbufferingBuffer::new), + metastream.map(UnbufferingBuffer::new), + dnastream.map(UnbufferingBuffer::new), + matches.value_of("complete").unwrap() == "1", + matches.is_present("formatted"), + usize::from_str_radix(matches.value_of("thread-num").unwrap(), 10)?, + )?; + } else { + run( + global, + locals, + inputseqs, + aastream.map(SortingBuffer::new), + metastream.map(SortingBuffer::new), + dnastream.map(SortingBuffer::new), + matches.value_of("complete").unwrap() == "1", + matches.is_present("formatted"), + usize::from_str_radix(matches.value_of("thread-num").unwrap(), 10)?, + )?; + } Ok(()) } -fn run( +fn run( global: Box, locals: Vec, inputseqs: R, - aastream: Option, - metastream: Option, - dnastream: Option, + aa_buffer: Option, + meta_buffer: Option, + dna_buffer: Option, whole_genome: bool, formatted: bool, thread_num: usize, @@ -180,10 +196,9 @@ fn run( .num_threads(thread_num) .build_global()?; - let hasmeta = metastream.is_some(); - let hasdna = metastream.is_some(); - let hasaa = metastream.is_some(); - let output = Mutex::new(OutputBuffer::new(aastream, metastream, dnastream)); + let meta_buffer = meta_buffer.map(Mutex::new); + let dna_buffer = dna_buffer.map(Mutex::new); + let aa_buffer = aa_buffer.map(Mutex::new); Chunked::new(100, fasta::Reader::new(inputseqs).into_records()) .enumerate() @@ -203,29 +218,24 @@ fn run( nseq, whole_genome, ); - if hasmeta { + if meta_buffer.is_some() { read_prediction.meta(&mut metabuf)?; } - if hasdna { + if dna_buffer.is_some() { read_prediction.dna(&mut dnabuf, formatted)?; } - if hasaa { + if aa_buffer.is_some() { read_prediction.protein(&mut aabuf, whole_genome)?; } } - let mut buffer = output.lock().unwrap(); - buffer.set(index, (metabuf, dnabuf, aabuf)); - let bufs = buffer.deref_mut().collect::, Vec, Vec)>>(); - for (metabuf, dnabuf, aabuf) in bufs { - if let Some(metastream) = &mut buffer.metastream { - &mut metastream.write_all(&metabuf)?; - } - if let Some(dnastream) = &mut buffer.dnastream { - &mut dnastream.write_all(&dnabuf)?; - } - if let Some(aastream) = &mut buffer.aastream { - &mut aastream.write_all(&aabuf)?; - } + if let Some(buffer) = &meta_buffer { + buffer.lock().unwrap().add(index, metabuf)?; + } + if let Some(buffer) = &dna_buffer { + buffer.lock().unwrap().add(index, dnabuf)?; + } + if let Some(buffer) = &aa_buffer { + buffer.lock().unwrap().add(index, aabuf)?; } Ok(()) }) @@ -263,43 +273,55 @@ impl Iterator for Chunked { } } -struct OutputBuffer { +trait WritingBuffer { + fn add(&mut self, index: usize, item: Vec) -> Result<()>; +} + +struct SortingBuffer { next: usize, - queue: VecDeque>, - aastream: Option, - metastream: Option, - dnastream: Option, + queue: VecDeque>>, + stream: W, } -impl OutputBuffer { - fn new(aastream: Option, metastream: Option, dnastream: Option) -> Self { - OutputBuffer { +impl SortingBuffer { + fn new(stream: W) -> Self { + SortingBuffer { next: 0, queue: VecDeque::new(), - aastream: aastream, - metastream: metastream, - dnastream: dnastream, + stream: stream, } } +} - fn set(&mut self, index: usize, item: I) { +impl WritingBuffer for SortingBuffer { + fn add(&mut self, index: usize, item: Vec) -> Result<()> { while self.next + self.queue.len() <= index { self.queue.push_back(None); } self.queue[index - self.next] = Some(item); - } -} - -impl Iterator for OutputBuffer { - type Item = I; - fn next(&mut self) -> Option { - if self.queue.front().map(Option::is_some).unwrap_or(false) { + while self.queue.front().map(Option::is_some).unwrap_or(false) { let item = self.queue.pop_front().unwrap().unwrap(); self.next += 1; - Some(item) - } else { - None + self.stream.write_all(&item)?; } + Ok(()) + } +} + +struct UnbufferingBuffer { + stream: W, +} + +impl UnbufferingBuffer { + fn new(stream: W) -> Self { + UnbufferingBuffer { stream } + } +} + +impl WritingBuffer for UnbufferingBuffer { + fn add(&mut self, _: usize, item: Vec) -> Result<()> { + self.stream.write_all(&item)?; + Ok(()) } } diff --git a/src/gene.rs b/src/gene.rs index 972c9bb..db76181 100644 --- a/src/gene.rs +++ b/src/gene.rs @@ -17,6 +17,26 @@ impl ReadPrediction { } } + pub fn append_to( + &self, + aabuf: &mut Option>, + metabuf: &mut Option>, + dnabuf: &mut Option>, + formatted: bool, + whole_genome: bool, + ) -> Result<(), GeneError> { + if let Some(metabuf) = metabuf { + self.meta(&mut *metabuf)?; + } + if let Some(dnabuf) = dnabuf { + self.dna(&mut *dnabuf, formatted)?; + } + if let Some(aabuf) = aabuf { + self.protein(&mut *aabuf, whole_genome)?; + } + Ok(()) + } + pub fn meta(&self, buf: &mut Vec) -> Result<(), GeneError> { if !self.genes.is_empty() { buf.append(&mut format!(">{}\n", std::str::from_utf8(&self.head)?).into_bytes())