Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finalize implementation #181

Merged
merged 7 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 105 additions & 9 deletions lib/commands/unipept/unipept_subcommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import { createInterface } from "node:readline";
import { Interface } from "readline";
import { Formatter } from "../../formatters/formatter.js";
import { FormatterFactory } from "../../formatters/formatter_factory.js";
import { CSVFormatter } from "../../formatters/csv_formatter.js";
import path from "path";
import os from "os";
import { appendFile, mkdir } from "fs/promises";

export abstract class UnipeptSubcommand {
public command: Command;
Expand Down Expand Up @@ -58,9 +62,12 @@ export abstract class UnipeptSubcommand {
this.host = this.getHost();
this.url = `${this.host}/api/v2/${this.name}.json`;
this.formatter = FormatterFactory.getFormatter(this.options.format);

if (this.options.output) {
this.outputStream = createWriteStream(this.options.output);
} else {
// if we write to stdout, we need to handle the EPIPE error
// this happens when the output is piped to another command that stops reading
process.stdout.on("error", (err) => {
if (err.code === "EPIPE") {
process.exit(0);
Expand All @@ -71,6 +78,7 @@ export abstract class UnipeptSubcommand {
const iterator = this.getInputIterator(args, options.input as string);
const firstLine = (await iterator.next()).value;
if (this.command.name() === "taxa2lca") {
// this subcommand is an exception where the entire input is read before processing
await this.simpleInputProcessor(firstLine, iterator);
} else if (firstLine.startsWith(">")) {
this.fasta = true;
Expand All @@ -83,15 +91,29 @@ export abstract class UnipeptSubcommand {
async processBatch(slice: string[], fastaMapper?: { [key: string]: string }): Promise<void> {
if (!this.formatter) throw new Error("Formatter not set");

const r = await fetch(this.url as string, {
method: "POST",
body: this.constructRequestBody(slice),
headers: {
"Accept-Encoding": "gzip",
"User-Agent": this.user_agent,
}
});
const result = await r.json();
let r;
try {
r = await this.fetchWithRetry(this.url as string, {
method: "POST",
body: this.constructRequestBody(slice),
headers: {
"Accept-Encoding": "gzip",
"User-Agent": this.user_agent,
}
});
} catch (e) {
await this.saveError(e as string);
return;
}

let result;
try {
result = await r.json();
} catch (e) {
result = [];
}
if (Array.isArray(result) && result.length === 0) return;
result = this.filterResult(result);

if (this.firstBatch && this.options.header) {
this.outputStream.write(this.formatter.header(result, this.fasta));
Expand All @@ -102,6 +124,31 @@ export abstract class UnipeptSubcommand {
if (this.firstBatch) this.firstBatch = false;
}

/**
 * Normalises an API response into a list of records and strips every field
 * that was not selected.
 *
 * - A response that is not an array is wrapped in a one-element array.
 * - When the active formatter is a `CSVFormatter`, the records are passed
 *   through its `flatten` method first.
 * - When at least one selected-field pattern exists, every key that matches
 *   none of the patterns is deleted from its record. The records are
 *   modified in place.
 *
 * @param result the parsed response body
 * @returns the (possibly flattened and filtered) list of records
 */
filterResult(result: unknown): object[] {
    let records: unknown = Array.isArray(result) ? result : [result];

    if (this.formatter instanceof CSVFormatter) {
        records = this.formatter.flatten(records as { [key: string]: unknown }[]);
    }

    // Without any selected-field pattern, every field is kept.
    if (this.getSelectedFields().length === 0) {
        return records as object[];
    }

    (records as { [key: string]: string }[]).forEach(record => {
        const rejectedKeys = Object.keys(record).filter(
            fieldName => !this.getSelectedFields().some(pattern => pattern.test(fieldName))
        );
        for (const fieldName of rejectedKeys) {
            delete record[fieldName];
        }
    });

    return records as object[];
}

/**
* Reads batchSize lines from the input and processes them
*/
async normalInputProcessor(firstLine: string, iterator: IterableIterator<string> | AsyncIterableIterator<string>) {
let slice = [firstLine];

Expand All @@ -115,6 +162,10 @@ export abstract class UnipeptSubcommand {
await this.processBatch(slice);
}

/**
* Reads batchSize lines from the input and processes them,
* but takes into account the fasta headers.
*/
async fastaInputProcessor(firstLine: string, iterator: IterableIterator<string> | AsyncIterableIterator<string>) {
let currentFastaHeader = firstLine;
let slice = [];
Expand All @@ -135,6 +186,9 @@ export abstract class UnipeptSubcommand {
await this.processBatch(slice, fastaMapper);
}

/**
* Reads the entire input and processes it in one go
*/
async simpleInputProcessor(firstLine: string, iterator: IterableIterator<string> | AsyncIterableIterator<string>) {
const slice = [firstLine];
for await (const line of iterator) {
Expand All @@ -143,6 +197,43 @@ export abstract class UnipeptSubcommand {
await this.processBatch(slice);
}

/**
 * Appends the error message to today's log file and reports the location of
 * that log file on stderr.
 *
 * The directory containing the log file is created first (recursively) when
 * it does not exist yet.
 *
 * @param message the text to append to the log file; a newline is added
 * @returns a promise that resolves once the message has been written
 */
async saveError(message: string): Promise<void> {
    const errorPath = this.errorFilePath();
    // The directory must exist before the append starts. Without awaiting,
    // appendFile can race the directory creation and fail with ENOENT, and a
    // failing mkdir would surface as an unhandled rejection.
    await mkdir(path.dirname(errorPath), { recursive: true });
    await appendFile(errorPath, `${message}\n`);
    console.error(`API request failed! log can be found in ${errorPath}`);
}

/**
 * Performs a `fetch` request and retries it when it fails.
 *
 * A request counts as failed when `fetch` itself rejects or when the response
 * is not `ok`. After a failure, and as long as `retries` is greater than zero,
 * the method waits a random delay between 0 and 5 seconds and calls itself
 * again with one retry less.
 *
 * @param url the URL to request
 * @param options the options handed to `fetch` unchanged
 * @param retries how many more attempts may follow a failed one (default 5)
 * @returns the first response whose `ok` flag is set
 * @throws rejects with a string describing the last failure once no retries
 *         are left
 */
async fetchWithRetry(url: string, options: RequestInit, retries = 5): Promise<Response> {
    let failure: unknown;
    try {
        const response = await fetch(url, options);
        if (response.ok) {
            return response;
        }
        // A non-ok status is handled exactly like a rejected fetch.
        failure = `${response.status} ${response.statusText}`;
    } catch (requestError) {
        failure = requestError;
    }

    if (retries > 0) {
        // Wait a random amount of time before the next attempt.
        const pauseMs = 5000 * Math.random();
        await new Promise(resolve => setTimeout(resolve, pauseMs));
        return this.fetchWithRetry(url, options, retries - 1);
    }

    return Promise.reject(`Failed to fetch data from the Unipept API: ${failure}`);
}

private constructRequestBody(slice: string[]): URLSearchParams {
const names = this.getSelectedFields().length === 0 || this.getSelectedFields().some(regex => regex.toString().includes("name") || regex.toString().includes(".*$"));
return new URLSearchParams({
Expand Down Expand Up @@ -173,6 +264,11 @@ export abstract class UnipeptSubcommand {
}
}

/**
 * Builds the path of the log file for the current date:
 * `<home directory>/.unipept/unipept-<date>.log`, where `<date>` is the part
 * of the current ISO timestamp that precedes the `T` separator.
 */
private errorFilePath(): string {
    const isoNow = new Date().toISOString();
    const day = isoNow.substring(0, isoNow.indexOf("T"));
    const logDirectory = path.join(os.homedir(), ".unipept");
    return path.join(logDirectory, `unipept-${day}.log`);
}

/**
* Returns an input iterator to use for the request.
* - if arguments are given, use arguments
Expand Down
7 changes: 2 additions & 5 deletions lib/formatters/csv_formatter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,18 @@ import { stringify } from "csv-stringify/sync";
export class CSVFormatter extends Formatter {

/**
 * Produces the CSV header line by stringifying the list of keys that
 * `getKeys` derives from `sampleData`.
 *
 * @param sampleData records from which the column names are taken
 * @param fastaMapper forwarded to `getKeys`
 * @returns the header as a CSV string
 */
header(sampleData: { [key: string]: string }[], fastaMapper?: boolean | undefined): string {
// NOTE(review): two consecutive `return` statements. As written, only the
// first one executes and the second is unreachable. This looks like the
// removed and the added line of a diff shown together; confirm which one
// is intended.
return stringify([this.getKeys(this.flatten(sampleData), fastaMapper)]);
return stringify([this.getKeys(sampleData, fastaMapper)]);
}

/**
 * Returns the text that closes the output; for CSV this is the empty string.
 */
footer(): string {
return "";
}

/**
 * Converts the given records to CSV text with `stringify`.
 *
 * @param data the records to serialise
 * @returns the records as a CSV string
 */
convert(data: object[]): string {
// NOTE(review): two consecutive `return` statements. As written, only the
// first one executes and the second is unreachable. This looks like the
// removed and the added line of a diff shown together; confirm which one
// is intended.
return stringify(this.flatten(data as { [key: string]: unknown }[]));
return stringify(data);
}

/**
 * Returns the keys of the first record in `data`, preceded by
 * `"fasta_header"` when `fastaMapper` is truthy.
 *
 * @param data the records; only the first element is inspected
 * @param fastaMapper whether a `fasta_header` column is added in front
 * @returns the list of column names
 */
getKeys(data: { [key: string]: unknown }[], fastaMapper?: boolean | undefined): string[] {
// A value that is not an array is wrapped in a one-element array.
if (!Array.isArray(data)) {
data = [data];
}
// NOTE(review): `data[0]` is read without a length check — presumably callers
// never pass an empty array; confirm.
return fastaMapper ? ["fasta_header", ...Object.keys(data[0])] : Object.keys(data[0]);
}

Expand Down
3 changes: 0 additions & 3 deletions lib/formatters/formatter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ export abstract class Formatter {
if (fastaMapper) {
data = this.integrateFastaHeaders(data as { [key: string]: string }[], fastaMapper);
}
if (!Array.isArray(data)) {
data = [data];
}
return this.convert(data, first);
}

Expand Down
2 changes: 0 additions & 2 deletions tests/formatters/csv_formatter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import { TestObject } from "./test_object";
const formatter = FormatterFactory.getFormatter("csv");

// Checks that the header produced for a list of two test objects equals
// the expected CSV header supplied by TestObject.asCsvHeader().
test('test header', () => {
//const fasta = [["peptide", ">test"]];
const object = [TestObject.testObject(), TestObject.testObject()];
expect(formatter.header(object)).toBe(TestObject.asCsvHeader());
//expect(formatter.header(object, fasta)).toBe(`fasta_header,${TestObject.asCsvHeader()}`);
});

test('test footer', () => {
Expand Down
1 change: 0 additions & 1 deletion tests/formatters/formatter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ test('test integrate fasta headers', async () => {
const fasta = { 5: ">test" };
const object = [TestObject.testObject(), TestObject.testObject()];
const integrated = [Object.assign({ fasta_header: ">test" }, TestObject.testObject()), Object.assign({ fasta_header: ">test" }, TestObject.testObject())];
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
expect(formatter.integrateFastaHeaders(object, fasta)).toEqual(integrated);
});