Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-write WriteToChunk and TaxaByChunk in Rust #38

Merged
merged 4 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions scripts/helper_scripts/unipept-database-rs/src/bin/taxa-by-chunk.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use std::fs::{read_dir, File};
use std::io::{BufRead, BufWriter, Write};
use std::path::{Path, PathBuf};

use anyhow::{Context, Result};
use clap::Parser;
use regex::Regex;

use unipept_database::utils::files::open_sin;

fn main() -> Result<()> {
let args = Cli::parse();

let mut all_taxa: Vec<u64> = Vec::new();

let reader = open_sin();

// Read all taxa ids from stdin
for line in reader.lines() {
let line = line.context("Error reading line from stdin")?;

// Ignore empty lines
if line.trim().is_empty() {
continue;
}

let taxa_id: u64 = line
.trim()
.parse()
.with_context(|| format!("Error parsing {line} as an integer"))?;
all_taxa.push(taxa_id);
}

let chunk_file_regex = Regex::new(r"unipept\..*\.gz").context("Error creating regex")?;

for entry in read_dir(&args.chunk_dir).context("Error reading chunk directory")? {
let entry = entry.context("Error reading entry from chunk directory")?;
let path = entry.path();
if !path.is_file() {
continue;
}

let base_name = match path.file_name() {
None => {
continue;
}
Some(n) => n.to_str().context("Error creating string from file path")?,
};

if !chunk_file_regex.is_match(base_name) {
continue;
}

// Parse the taxa range out of the filename
let replaced_name = base_name.replace("unipept.", "").replace(".chunk.gz", "");
let range = replaced_name.split_once('-');
let range = range.with_context(|| format!("Unable to split {replaced_name} on '-'"))?;
let start: u64 = range
.0
.parse()
.with_context(|| format!("Error parsing {} as an integer", range.0))?;
let end: u64 = range
.1
.parse()
.with_context(|| format!("Error parsing {} as an integer", range.1))?;

let matching_taxa: Vec<&u64> = all_taxa
.iter()
.filter(|&t| start <= *t && *t <= end)
.collect();

// Write matches to a temporary output file
if !matching_taxa.is_empty() {
let mapped_taxa: Vec<String> =
matching_taxa.iter().map(|&t| format!("\t{t}$")).collect();
let joined_taxa = mapped_taxa.join("\n");

let temp_file_path = Path::new(&args.temp_dir).join(format!("{base_name}.pattern"));
let temp_file =
File::create(&temp_file_path).context("Error creating temporary pattern file")?;
let mut writer = BufWriter::new(temp_file);
write!(&mut writer, "{joined_taxa}",)
.context("Error writing to temporary pattern file")?;

// The two unwraps here can't be handled using the ? operator
println!("{}", temp_file_path.into_os_string().into_string().unwrap());
println!("{}", path.into_os_string().into_string().unwrap());
}
}

Ok(())
}

#[derive(Parser, Debug)]
struct Cli {
#[clap(long)]
chunk_dir: PathBuf,

#[clap(long)]
temp_dir: PathBuf,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use std::fs::File;
use std::io::{BufRead, BufWriter, Write};
use std::path::{Path, PathBuf};

use anyhow::{Context, Result};
use clap::Parser;

use unipept_database::utils::files::open_sin;

fn main() -> Result<()> {
let args = Cli::parse();

let mut file_streams: Vec<BufWriter<File>> = Vec::with_capacity(TAXA_BOUNDS.len());

// Create writers for the output files
for (idx, bound) in TAXA_BOUNDS.iter().take(TAXA_BOUNDS.len() - 1).enumerate() {
let next = TAXA_BOUNDS[idx + 1];
let file_name = format!("unipept.{bound}-{next}.chunk");
let file_path = Path::new(&args.output_dir).join(file_name);
let file_handler = File::create(file_path)
.with_context(|| format!("Unable to create output file {bound}-{next}"))?;
let writer = BufWriter::new(file_handler);
file_streams.push(writer);
}

let mut reader = open_sin();

// First read the header
let mut header: String = String::new();
reader
.read_line(&mut header)
.context("Error reading header from stdin")?;
write_header(&args.output_dir, header)?;

// Then the rest of the data
for line in reader.lines() {
let line = line.context("Error reading line from stdin")?;

if args.verbose {
eprintln!("INFO VERBOSE: writing line to chunk: {line}");
}

let spl: Vec<&str> = line.split('\t').collect();
let taxon_id = spl[8]
.trim()
.parse::<usize>()
.with_context(|| format!("Error parsing {} as an integer", spl[8]))?;

// Find the index of this taxon id in the array
// Note that this can be sped up using binary search (see Python's bisect.bisect_left),
// but this tool is near-instant so we favour readability
let mut index: usize = 0;
while taxon_id > TAXA_BOUNDS[index] {
index += 1;
}

writeln!(&mut file_streams[index - 1], "{line}").context("Error writing to output file")?;
}

Ok(())
}

#[derive(Parser, Debug)]
struct Cli {
#[clap(short, long)]
output_dir: PathBuf,
#[clap(short, long)]
verbose: bool,
}

const TAXA_BOUNDS: [usize; 45] = [
0, 550, 1352, 3047, 5580, 8663, 11676, 32473, 40214, 52774, 66656, 86630, 116960, 162147,
210225, 267979, 334819, 408172, 470868, 570509, 673318, 881260, 1046115, 1136135, 1227077,
1300307, 1410620, 1519492, 1650438, 1756149, 1820614, 1871070, 1898104, 1922217, 1978231,
2024617, 2026757, 2035430, 2070414, 2202732, 2382165, 2527964, 2601669, 2706029, 10000000,
];

fn write_header(output_dir: &PathBuf, header: String) -> Result<()> {
let file_path = Path::new(output_dir).join("db.header");
let file_handler = File::create(file_path).context("Unable to create header output file")?;
let mut writer = BufWriter::new(file_handler);

write!(&mut writer, "{}", header).context("Error writing header")?;

Ok(())
}