Skip to content

Commit

Permalink
Merge branch 'in-order-output'
Browse files Browse the repository at this point in the history
* in-order-output:
  allow choice between ordered and unordered output
  remove obsolete mutexes
  implement OutputBuffer for in-order output
  • Loading branch information
Felix Van der Jeugt committed Aug 4, 2021
2 parents 76e2bc3 + 3d7c414 commit 3a281cc
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 35 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "frag_gene_scan_rs"
version = "0.3.2"
version = "0.4.0"
authors = ["Felix Van der Jeugt <[email protected]>"]
edition = "2018"

Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ where:
* `-r train_file_dir` can change the directory containing the training
files, so you can put it anywhere on your system.

* `-u` can be used for some additional speed when using multithreading. The
output will no longer be in the same order as the input (as in FGS and
FGS+).

The complete list of options will be printed when running
`FragGeneScanRs --help`.

Expand Down
142 changes: 108 additions & 34 deletions src/bin/FragGeneScanRs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! FragGeneScanRs executable
#![allow(non_snake_case)]

use std::collections::VecDeque;
use std::fs::File;
use std::io;
use std::io::{Read, Write};
Expand Down Expand Up @@ -101,6 +102,10 @@ fn main() -> Result<()> {
.value_name("nucleotide_file")
.takes_value(true)
.help("Output predicted genes to this file (supersedes -o)."))
.arg(Arg::with_name("unordered")
.short("u")
.long("unordered")
.help("Do not preserve record order in output (faster)."))
.get_matches();

let (global, locals) = hmm::get_train_from_file(
Expand All @@ -123,52 +128,66 @@ fn main() -> Result<()> {
(None, None) => None,
};

let metastream: Option<File> = match (
let metastream: Option<Box<dyn Write + Send>> = match (
matches.value_of("meta-file"),
matches.value_of("output-prefix"),
) {
(Some(filename), _) => Some(File::create(filename)?),
(Some(filename), _) => Some(Box::new(File::create(filename)?)),
(None, Some("stdout")) => None,
(None, Some(filename)) => Some(File::create(filename.to_owned() + ".out")?),
(None, Some(filename)) => Some(Box::new(File::create(filename.to_owned() + ".out")?)),
(None, None) => None,
};

let dnastream: Option<File> = match (
let dnastream: Option<Box<dyn Write + Send>> = match (
matches.value_of("nucleotide-file"),
matches.value_of("output-prefix"),
) {
(Some(filename), _) => Some(File::create(filename)?),
(Some(filename), _) => Some(Box::new(File::create(filename)?)),
(None, Some("stdout")) => None,
(None, Some(filename)) => Some(File::create(filename.to_owned() + ".ffn")?),
(None, Some(filename)) => Some(Box::new(File::create(filename.to_owned() + ".ffn")?)),
(None, None) => None,
};

if aastream.is_none() && metastream.is_none() && dnastream.is_none() {
aastream = Some(Box::new(io::stdout()));
}

run(
global,
locals,
inputseqs,
aastream,
metastream,
dnastream,
matches.value_of("complete").unwrap() == "1",
matches.is_present("formatted"),
usize::from_str_radix(matches.value_of("thread-num").unwrap(), 10)?,
)?;
if matches.is_present("unordered") {
run(
global,
locals,
inputseqs,
aastream.map(UnbufferingBuffer::new),
metastream.map(UnbufferingBuffer::new),
dnastream.map(UnbufferingBuffer::new),
matches.value_of("complete").unwrap() == "1",
matches.is_present("formatted"),
usize::from_str_radix(matches.value_of("thread-num").unwrap(), 10)?,
)?;
} else {
run(
global,
locals,
inputseqs,
aastream.map(SortingBuffer::new),
metastream.map(SortingBuffer::new),
dnastream.map(SortingBuffer::new),
matches.value_of("complete").unwrap() == "1",
matches.is_present("formatted"),
usize::from_str_radix(matches.value_of("thread-num").unwrap(), 10)?,
)?;
}

Ok(())
}

fn run<R: Read + Send, W: Write + Send>(
fn run<R: Read + Send, W: WritingBuffer + Send>(
global: Box<hmm::Global>,
locals: Vec<hmm::Local>,
inputseqs: R,
aastream: Option<W>,
metastream: Option<File>,
dnastream: Option<File>,
aa_buffer: Option<W>,
meta_buffer: Option<W>,
dna_buffer: Option<W>,
whole_genome: bool,
formatted: bool,
thread_num: usize,
Expand All @@ -177,12 +196,14 @@ fn run<R: Read + Send, W: Write + Send>(
.num_threads(thread_num)
.build_global()?;

let aastream = aastream.map(Mutex::new);
let metastream = metastream.map(Mutex::new);
let dnastream = dnastream.map(Mutex::new);
let meta_buffer = meta_buffer.map(Mutex::new);
let dna_buffer = dna_buffer.map(Mutex::new);
let aa_buffer = aa_buffer.map(Mutex::new);

Chunked::new(100, fasta::Reader::new(inputseqs).into_records())
.enumerate()
.par_bridge()
.map(|recordvec| {
.map(|(index, recordvec)| {
let mut metabuf = Vec::new();
let mut dnabuf = Vec::new();
let mut aabuf = Vec::new();
Expand All @@ -197,24 +218,24 @@ fn run<R: Read + Send, W: Write + Send>(
nseq,
whole_genome,
);
if metastream.is_some() {
if meta_buffer.is_some() {
read_prediction.meta(&mut metabuf)?;
}
if dnastream.is_some() {
if dna_buffer.is_some() {
read_prediction.dna(&mut dnabuf, formatted)?;
}
if aastream.is_some() {
if aa_buffer.is_some() {
read_prediction.protein(&mut aabuf, whole_genome)?;
}
}
if let Some(metastream) = &metastream {
metastream.lock().unwrap().write_all(&metabuf)?;
if let Some(buffer) = &meta_buffer {
buffer.lock().unwrap().add(index, metabuf)?;
}
if let Some(dnastream) = &dnastream {
dnastream.lock().unwrap().write_all(&dnabuf)?;
if let Some(buffer) = &dna_buffer {
buffer.lock().unwrap().add(index, dnabuf)?;
}
if let Some(aastream) = &aastream {
aastream.lock().unwrap().write_all(&aabuf)?;
if let Some(buffer) = &aa_buffer {
buffer.lock().unwrap().add(index, aabuf)?;
}
Ok(())
})
Expand Down Expand Up @@ -251,3 +272,56 @@ impl<I: Iterator> Iterator for Chunked<I> {
}
}
}

/// Destination for the output chunks produced by the worker threads in `run`.
///
/// Implementations decide how the chunk's position in the input affects the
/// moment it is written out.
trait WritingBuffer {
    /// Hands one finished output chunk to the buffer.
    ///
    /// `index` is the position of the chunk in the input sequence (callers in
    /// this file obtain it from `enumerate()`), and `item` holds the raw
    /// bytes to be written. Returns an error if the chunk could not be
    /// written.
    fn add(&mut self, index: usize, item: Vec<u8>) -> Result<()>;
}

struct SortingBuffer<W: Write + Send> {
next: usize,
queue: VecDeque<Option<Vec<u8>>>,
stream: W,
}

impl<W: Write + Send> SortingBuffer<W> {
fn new(stream: W) -> Self {
SortingBuffer {
next: 0,
queue: VecDeque::new(),
stream: stream,
}
}
}

impl<W: Write + Send> WritingBuffer for SortingBuffer<W> {
fn add(&mut self, index: usize, item: Vec<u8>) -> Result<()> {
while self.next + self.queue.len() <= index {
self.queue.push_back(None);
}
self.queue[index - self.next] = Some(item);

while self.queue.front().map(Option::is_some).unwrap_or(false) {
let item = self.queue.pop_front().unwrap().unwrap();
self.next += 1;
self.stream.write_all(&item)?;
}
Ok(())
}
}

/// Pass-through buffer: every item is written to the wrapped stream as soon
/// as it is added, so the output follows arrival order.
struct UnbufferingBuffer<W: Write + Send> {
    /// Destination the items are written to.
    stream: W,
}

impl<W: Write + Send> UnbufferingBuffer<W> {
    /// Wraps `stream` without adding any buffering.
    fn new(stream: W) -> Self {
        Self { stream }
    }
}

impl<W: Write + Send> WritingBuffer for UnbufferingBuffer<W> {
    /// Writes `item` to the stream immediately; the index is ignored.
    fn add(&mut self, _index: usize, item: Vec<u8>) -> Result<()> {
        Ok(self.stream.write_all(item.as_slice())?)
    }
}
20 changes: 20 additions & 0 deletions src/gene.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,26 @@ impl ReadPrediction {
}
}

/// Appends this prediction's output to each buffer that is present.
///
/// A buffer passed as `None` is skipped. The metadata buffer is filled
/// first, then the DNA buffer (`formatted` is forwarded to `dna`), then the
/// protein buffer (`whole_genome` is forwarded to `protein`). The first
/// error from any of these calls is returned and the remaining buffers are
/// left untouched.
pub fn append_to(
    &self,
    aabuf: &mut Option<Vec<u8>>,
    metabuf: &mut Option<Vec<u8>>,
    dnabuf: &mut Option<Vec<u8>>,
    formatted: bool,
    whole_genome: bool,
) -> Result<(), GeneError> {
    if let Some(meta_out) = metabuf.as_mut() {
        self.meta(meta_out)?;
    }
    if let Some(dna_out) = dnabuf.as_mut() {
        self.dna(dna_out, formatted)?;
    }
    if let Some(protein_out) = aabuf.as_mut() {
        self.protein(protein_out, whole_genome)?;
    }
    Ok(())
}

pub fn meta(&self, buf: &mut Vec<u8>) -> Result<(), GeneError> {
if !self.genes.is_empty() {
buf.append(&mut format!(">{}\n", std::str::from_utf8(&self.head)?).into_bytes())
Expand Down

0 comments on commit 3a281cc

Please sign in to comment.