Skip to content

Commit

Permalink
Move all worker logic into one reusable worker
Browse files Browse the repository at this point in the history
  • Loading branch information
pverscha committed Oct 2, 2020
1 parent 73c7b85 commit 94185e9
Show file tree
Hide file tree
Showing 21 changed files with 173 additions and 165 deletions.
39 changes: 24 additions & 15 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"regenerator-runtime": "^0.13.3",
"shared-memory-datastructures": "0.1.8",
"threads": "^1.4.1",
"unipept-web-components": "1.2.2",
"unipept-web-components": "file:unipept-web-components-1.2.2.tgz",
"uuid": "^7.0.3",
"vue": "^2.6.12",
"vue-class-component": "^7.1.0",
Expand Down Expand Up @@ -92,7 +92,7 @@
"vue-cli-plugin-vuetify": "^0.6.3",
"vue-template-compiler": "^2.6.12",
"vuetify-loader": "^1.2.2",
"worker-loader": "^2.0.0"
"worker-loader": "^3.0.2"
},
"eslintConfig": {
"root": true,
Expand Down
3 changes: 2 additions & 1 deletion src/App.vue
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import ConfigurationManager from "./logic/configuration/ConfigurationManager";
import Configuration from "./logic/configuration/Configuration";
import ErrorListener from "@/logic/filesystem/ErrorListener";
const electron = require("electron");
import Worker from "worker-loader?inline=fallback!./logic/system/DesktopWorker.worker";
import {
Assay,
ProteomicsAssay,
Expand Down Expand Up @@ -204,7 +205,7 @@ export default class App extends Vue implements ErrorListener {
let config: Configuration = await configurationManager.readConfiguration();
NetworkConfiguration.BASE_URL = config.apiSource;
NetworkConfiguration.PARALLEL_API_REQUESTS = config.maxParallelRequests;
QueueManager.initializeQueue(config.maxLongRunningTasks);
QueueManager.initializeQueue(config.maxLongRunningTasks, () => new Worker());
await this.$store.dispatch("setUseNativeTitlebar", config.useNativeTitlebar);
} catch (err) {
// TODO: show a proper error message to the user in case this happens
Expand Down
2 changes: 1 addition & 1 deletion src/background.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { autoUpdater } from "electron-updater";
import log from "electron-log";
import installExtension, { VUEJS_DEVTOOLS } from "electron-devtools-installer"

app.commandLine.appendSwitch("js-flags", "--max-old-space-size=4096");
app.commandLine.appendSwitch("js-flags", "--max-old-space-size=4096 --expose_gc");

const isDevelopment = process.env.NODE_ENV !== "production"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { EcCode, EcResponseCommunicator, EcResponse } from "unipept-web-components";
import { spawn, Worker } from "threads/dist";
import { EcCode, EcResponseCommunicator, EcResponse, QueueManager } from "unipept-web-components";
import StaticDatabaseManager from "@/logic/communication/static/StaticDatabaseManager";

export default class CachedEcResponseCommunicator extends EcResponseCommunicator {
Expand Down Expand Up @@ -29,17 +28,14 @@ export default class CachedEcResponseCommunicator extends EcResponseCommunicator
await CachedEcResponseCommunicator.processing;
}

if (!CachedEcResponseCommunicator.worker) {
CachedEcResponseCommunicator.worker = await spawn(
new Worker("./CachedEcResponseCommunicator.worker.ts")
);
}
CachedEcResponseCommunicator.processing = CachedEcResponseCommunicator.worker.process(
CachedEcResponseCommunicator.processing = QueueManager.getLongRunningQueue().pushTask<
Map<EcCode, EcResponse>, [string, string, EcCode[], Map<EcCode, EcResponse>]
>("computeCachedEcResponses", [
__dirname,
this.dbFile,
codes,
CachedEcResponseCommunicator.codeToResponses
);
]);

CachedEcResponseCommunicator.codeToResponses = await CachedEcResponseCommunicator.processing;
CachedEcResponseCommunicator.processing = undefined;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
import { EcResponse } from "unipept-web-components/src/business/communication/functional/ec/EcResponse";
import { EcCode } from "unipept-web-components/src/business/ontology/functional/ec/EcDefinition";
import { expose } from "threads/worker";
import { EcResponse, EcCode } from "unipept-web-components";
import Database from "better-sqlite3";

expose({ process })

export default function process(
installationDir: string,
dbPath: string,
codes: EcCode[],
output: Map<EcCode, EcResponse>
): Map<EcCode, EcResponse> {
export async function compute([
installationDir,
dbPath,
codes,
output
]: [string, string, EcCode[], Map<EcCode, EcResponse>]): Promise<Map<EcCode, EcResponse>> {
// @ts-ignore
const db = new Database(dbPath, {}, installationDir);
const stmt = db.prepare("SELECT * FROM ec_numbers WHERE `code` = ?");
Expand Down
15 changes: 5 additions & 10 deletions src/logic/communication/functional/CachedGoResponseCommunicator.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import { GoResponse, GoResponseCommunicator, GoCode } from "unipept-web-components";
import { GoResponse, GoResponseCommunicator, GoCode, QueueManager } from "unipept-web-components";
import StaticDatabaseManager from "@/logic/communication/static/StaticDatabaseManager";
import { spawn, Worker } from "threads/dist";

export default class CachedGoResponseCommunicator extends GoResponseCommunicator {
private static codeToResponses: Map<GoCode, GoResponse> = new Map<GoCode, GoResponse>();
private static processing: Promise<Map<GoCode, GoResponse>>;
private static worker: any;
private readonly dbFile: string;

constructor() {
Expand All @@ -29,17 +27,14 @@ export default class CachedGoResponseCommunicator extends GoResponseCommunicator
await CachedGoResponseCommunicator.processing;
}

if (!CachedGoResponseCommunicator.worker) {
CachedGoResponseCommunicator.worker = await spawn(
new Worker("./CachedGoResponseCommunicator.worker.ts")
);
}
CachedGoResponseCommunicator.processing = CachedGoResponseCommunicator.worker.process(
CachedGoResponseCommunicator.processing = QueueManager.getLongRunningQueue().pushTask<
Map<string, GoResponse>, [string, string, GoCode[], Map<GoCode, GoResponse>]
>("computeCachedGoResponses", [
__dirname,
this.dbFile,
codes,
CachedGoResponseCommunicator.codeToResponses
);
]);

CachedGoResponseCommunicator.codeToResponses = await CachedGoResponseCommunicator.processing;
CachedGoResponseCommunicator.processing = undefined;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
import { expose } from "threads/worker";
import Database from "better-sqlite3";
import { GoResponse } from "unipept-web-components/src/business/communication/functional/go/GoResponse";
import { GoCode } from "unipept-web-components/src/business/ontology/functional/go/GoDefinition";

expose({ process })

export default function process(
installationDir: string,
dbPath: string,
codes: GoCode[],
output: Map<GoCode, GoResponse>
): Map<GoCode, GoResponse> {
export async function compute(
[installationDir, dbPath, codes, output]: [string, string, GoCode[], Map<GoCode, GoResponse>]
): Promise<Map<GoCode, GoResponse>> {
// @ts-ignore
const db = new Database(dbPath, {}, installationDir);
const stmt = db.prepare("SELECT * FROM go_terms WHERE `code` = ?");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { InterproCode, InterproResponse, InterproResponseCommunicator } from "unipept-web-components";
import { InterproCode, InterproResponse, InterproResponseCommunicator, QueueManager } from "unipept-web-components";
import StaticDatabaseManager from "@/logic/communication/static/StaticDatabaseManager";
import { spawn, Worker } from "threads/dist";

export default class CachedInterproResponseCommunicator extends InterproResponseCommunicator {
private static codeToResponses: Map<InterproCode, InterproResponse> = new Map<InterproCode, InterproResponse>();
Expand Down Expand Up @@ -29,16 +28,16 @@ export default class CachedInterproResponseCommunicator extends InterproResponse
await CachedInterproResponseCommunicator.processing;
}

if (!CachedInterproResponseCommunicator.worker) {
CachedInterproResponseCommunicator.worker = await spawn(
new Worker("./CachedInterproResponseCommunicator.worker.ts")
);
}
CachedInterproResponseCommunicator.processing = CachedInterproResponseCommunicator.worker.process(
__dirname,
this.dbFile,
codes,
CachedInterproResponseCommunicator.codeToResponses
CachedInterproResponseCommunicator.processing = QueueManager.getLongRunningQueue().pushTask<
Map<InterproCode, InterproResponse>, [string, string, InterproCode[], Map<InterproCode, InterproResponse>]
>(
"computeCachedInterproResponses",
[
__dirname,
this.dbFile,
codes,
CachedInterproResponseCommunicator.codeToResponses
]
);

CachedInterproResponseCommunicator.codeToResponses = await CachedInterproResponseCommunicator.processing;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
import { expose } from "threads/worker";
import Database from "better-sqlite3";
import InterproResponse from "unipept-web-components/src/business/communication/functional/interpro/InterproResponse";
import { InterproCode } from "unipept-web-components/src/business/ontology/functional/interpro/InterproDefinition";
import { EcResponse } from "unipept-web-components/src/business/communication/functional/ec/EcResponse";
import { EcCode } from "unipept-web-components/src/business/ontology/functional/ec/EcDefinition";

expose({ process })

export default function process(
installationDir: string,
dbPath: string,
codes: InterproCode[],
output: Map<InterproCode, InterproResponse>
): Map<EcCode, EcResponse> {
export async function compute(
[installationDir, dbPath, codes, output]: [string, string, InterproCode[], Map<InterproCode, InterproResponse>]
): Promise<Map<EcCode, EcResponse>> {
// @ts-ignore
const db = new Database(dbPath, {}, installationDir);
const stmt = db.prepare("SELECT * FROM interpro_entries WHERE `code` = ?");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import { NcbiResponseCommunicator, NcbiId, NcbiResponse } from "unipept-web-components";
import { NcbiResponseCommunicator, NcbiId, NcbiResponse, QueueManager } from "unipept-web-components";
import StaticDatabaseManager from "@/logic/communication/static/StaticDatabaseManager";
import { Database, Statement } from "better-sqlite3";
import { spawn, Worker } from "threads/dist";

export default class CachedNcbiResponseCommunicator extends NcbiResponseCommunicator {
private readonly database: Database;
private fallbackCommunicator: NcbiResponseCommunicator;
private readonly extractStmt: Statement;
private static codesProcessed: Map<NcbiId, NcbiResponse> = new Map<NcbiId, NcbiResponse>();
private static processing: Promise<Map<NcbiId, NcbiResponse>>;
private static worker: any;

constructor() {
super();
Expand All @@ -33,17 +31,19 @@ export default class CachedNcbiResponseCommunicator extends NcbiResponseCommunic

const staticDatabaseManager = new StaticDatabaseManager();

if (!CachedNcbiResponseCommunicator.worker) {
CachedNcbiResponseCommunicator.worker = await spawn(
new Worker("./CachedNcbiResponseCommunicator.worker.ts")
);
}
CachedNcbiResponseCommunicator.processing = CachedNcbiResponseCommunicator.worker.process(
__dirname,
staticDatabaseManager.getDatabasePath(),
codes,
CachedNcbiResponseCommunicator.codesProcessed
CachedNcbiResponseCommunicator.processing = QueueManager.getLongRunningQueue().pushTask<
Map<NcbiId, NcbiResponse>,
[string, string, NcbiId[], Map<NcbiId, NcbiResponse>]
>(
"computeCachedNcbiResponses",
[
__dirname,
staticDatabaseManager.getDatabasePath(),
codes,
CachedNcbiResponseCommunicator.codesProcessed
]
);

CachedNcbiResponseCommunicator.codesProcessed = await CachedNcbiResponseCommunicator.processing;
CachedNcbiResponseCommunicator.processing = undefined;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import NcbiResponse from "unipept-web-components/src/business/communication/taxonomic/ncbi/NcbiResponse";
import { NcbiId } from "unipept-web-components/src/business/ontology/taxonomic/ncbi/NcbiTaxon";
import { NcbiRank } from "unipept-web-components/src/business/ontology/taxonomic/ncbi/NcbiRank";
import { expose } from "threads/worker";
import Database from "better-sqlite3";

expose({ process })

export default function process(installationDir: string, dbPath: string, ncbiIds: NcbiId[], output: Map<NcbiId, NcbiResponse>): Map<NcbiId, NcbiResponse> {
export async function compute(
[installationDir, dbPath, ncbiIds, output]: [string, string, NcbiId[], Map<NcbiId, NcbiResponse>]
): Promise<Map<NcbiId, NcbiResponse>> {
// @ts-ignore
const database = new Database(dbPath, {}, installationDir);

Expand Down
17 changes: 5 additions & 12 deletions src/logic/filesystem/assay/AssayFileSystemDataReader.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,21 @@
import FileSystemAssayVisitor from "@/logic/filesystem/assay/FileSystemAssayVisitor";
import { ProteomicsAssay, IOException } from "unipept-web-components";
import { ProteomicsAssay, IOException, QueueManager } from "unipept-web-components";
import { promises as fs } from "fs";
import { spawn, Worker } from "threads"
import { Database } from "better-sqlite3";

export default class AssayFileSystemDataReader extends FileSystemAssayVisitor {
private static worker: any;

public async visitProteomicsAssay(mpAssay: ProteomicsAssay): Promise<void> {
const path: string = `${this.directoryPath}${mpAssay.getName()}.pep`;

try {
const start = new Date().getTime();
const peptidesString: string = await fs.readFile(path, {
encoding: "utf-8"
});

if (!AssayFileSystemDataReader.worker) {
AssayFileSystemDataReader.worker = await spawn(new Worker("./AssayFileSystemDataReader.worker.ts"));
}
const splitted = await QueueManager.getLongRunningQueue().pushTask<string[], string>(
"readAssay",
peptidesString
);

const splitted = await AssayFileSystemDataReader.worker(peptidesString);
const end = new Date().getTime();
console.log("Reading in peptides from file: " + (end - start) / 1000 + "s");
mpAssay.setPeptides(splitted);
} catch (err) {
// The file does not exist (yet), throw an exception
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import { expose } from "threads/worker";
import { Peptide } from "unipept-web-components/src/business/ontology/raw/Peptide";

expose(readAssay)

async function readAssay(peptidesString: string): Promise<Peptide[]> {
export async function compute(peptidesString: string): Promise<Peptide[]> {
const output = [];
let terminatorPos = peptidesString.indexOf("\n");
let previousTerminatorPos = 0;
Expand Down
Loading

0 comments on commit 94185e9

Please sign in to comment.