diff --git a/lib/gtfs2connections.js b/lib/gtfs2connections.js index 8224d84..a8b2c04 100644 --- a/lib/gtfs2connections.js +++ b/lib/gtfs2connections.js @@ -1,5 +1,6 @@ const { Worker, isMainThread, parentPort, workerData } = require('worker_threads'); const fs = require('fs'); +const zlib = require('zlib'); const path = require('path'); const ChildProcess = require('child_process'); const del = require('del'); @@ -17,7 +18,6 @@ const Connections2Triples = require('./Connections2Triples'); const readdir = util.promisify(fs.readdir); -const appendFile = util.promisify(fs.appendFile); const exec = util.promisify(ChildProcess.exec); const numCPUs = require('os').cpus().length; @@ -61,7 +61,6 @@ Mapper.prototype.resultStream = async function (path, output, done) { const worker = new Worker(__filename, { workerData: { instance: i, - path, output, options: this._options }, @@ -88,13 +87,14 @@ Mapper.prototype.resultStream = async function (path, output, done) { await removePrefixes(output); ext = 'ttl'; } else if (format === 'ntriples') { - ext = 'n3'; + ext = 'nt'; } try { console.error('Merging final Linked Connections file...'); + // Join all resulting files into one - await exec(`for i in ${raws.map(r => { return `${r}.${ext}` }).join(" ")} ; do cat "$i" >> linkedConnections.${ext} && rm "$i" || break ; done`, { cwd: output }); + await exec(`for i in ${raws.map(r => { return `${r}.${ext}.gz` }).join(" ")} ; do zcat "$i" >> linkedConnections.${ext} && rm "$i" || break ; done`, { cwd: output }); let t1 = new Date(); console.error('linkedConnections.' 
+ ext + ' File created in ' + (t1.getTime() - t0.getTime()) + ' ms'); await del( @@ -133,27 +133,34 @@ async function cleanUpSources(sources) { async function appendLineBreaks(output) { const files = (await readdir(output)).filter(raw => raw.startsWith('raw_')); + const promises = []; for (const [i, f] of files.entries()) { if (i < files.length - 1) { - await appendFile(`${output}/${f}`, '\n') + promises.push(exec(`echo "" | gzip >> ${f}`, { cwd: output })); } } + + await Promise.all(promises); } async function removePrefixes(output) { const files = (await readdir(output)).filter(raw => raw.startsWith('raw_')); + const promises = []; for (const [i, f] of files.entries()) { if (i > 0) { // TODO: find a not hard-coded way to remove prefixes - await exec(`sed -i 1,4d ${f}`, { cwd: output }); + promises.push(exec(`zcat ${f} | tail -n +5 | gzip > ${f}.temp && mv ${f}.temp ${f}`, { cwd: output })) } } + + await Promise.all(promises); } // Code executed only on a Worker Thread if (!isMainThread) { + let fmt = 'json'; // Read the connection rules file created in the master thread and build the Connection objects! 
- let connectionStream = fs.createReadStream(workerData['path'] + '/connections_' + workerData['instance'] + '.txt', { encoding: 'utf8', objectMode: true }) + let connectionStream = fs.createReadStream(workerData['output'] + '/connections_' + workerData['instance'] + '.txt', { encoding: 'utf8', objectMode: true }) .pipe(JSONLParser()) .pipe(new ConnectionsBuilder()) .on('error', function (e) { @@ -166,8 +173,7 @@ if (!isMainThread) { if (format === 'mongo') { connectionStream = connectionStream.pipe(new Connections2Mongo()); } - connectionStream = connectionStream.pipe(new JSONLStringer()) - .pipe(fs.createWriteStream(workerData['output'] + '/raw_' + workerData['instance'] + '.json')); + connectionStream = connectionStream.pipe(new JSONLStringer()); } else if (['jsonld', 'mongold'].includes(format)) { let context = undefined; // Only include the context for the first instance @@ -194,35 +200,37 @@ if (!isMainThread) { connectionStream = connectionStream.pipe(new Connections2Mongo()); } // Pipe the objects to a file - connectionStream = connectionStream.pipe(new JSONLStringer()) - .pipe(fs.createWriteStream(workerData['output'] + '/raw_' + workerData['instance'] + '.json')); + connectionStream = connectionStream.pipe(new JSONLStringer()); } else if (format === 'csv') { + fmt = 'csv'; // Only include the header on the first file let header = false; if (workerData['instance'] === 0) { header = true; } - connectionStream = connectionStream.pipe(new Connections2CSV(header)) - .pipe(fs.createWriteStream(workerData['output'] + '/raw_' + workerData['instance'] + '.csv')); + connectionStream = connectionStream.pipe(new Connections2CSV(header)); } else if (format === 'turtle') { + fmt = 'ttl'; let prefixes = { lc: 'http://semweb.mmlab.be/ns/linkedconnections#', gtfs: 'http://vocab.gtfs.org/terms#', xsd: 'http://www.w3.org/2001/XMLSchema#' }; connectionStream = connectionStream.pipe(new Connections2Triples(workerData['options']['baseUris'])) - .pipe(new N3.StreamWriter({ 
prefixes: prefixes })) - .pipe(fs.createWriteStream(workerData['output'] + '/raw_' + workerData['instance'] + '.ttl')); + .pipe(new N3.StreamWriter({ prefixes: prefixes })); } else if (format === 'ntriples') { + fmt = 'nt'; connectionStream = connectionStream.pipe(new Connections2Triples(workerData['options']['baseUris'])) - .pipe(new N3.StreamWriter({ format: 'N-Triples' })) - .pipe(fs.createWriteStream(workerData['output'] + '/raw_' + workerData['instance'] + '.n3')); + .pipe(new N3.StreamWriter({ format: 'N-Triples' })); } - connectionStream.on('finish', () => { - parentPort.postMessage('done'); - }); + connectionStream.pipe(zlib.createGzip()) + .pipe(fs.createWriteStream(`${workerData['output']}/raw_${workerData['instance']}.${fmt}.gz`)) + .on('finish', () => { + parentPort.postMessage('done'); + }); + } module.exports = Mapper; diff --git a/lib/stoptimes/st2c.js b/lib/stoptimes/st2c.js index a991be5..8a475ac 100644 --- a/lib/stoptimes/st2c.js +++ b/lib/stoptimes/st2c.js @@ -25,7 +25,8 @@ if (stopTime['arrival_time'] === '' && stopTime['departure_time'] === '') { // Both arrival and departure time for this stop are empty, so Connection rule cannot be created. // This is valid GTFS but requires interpolation to set estimated stop times. - return done(new Error(`ERROR: Empty arrival and departure times found for trip ${stopTime['trip_id']} on stop ${stopTime['stop_id']}. Interpolation is required in a previous step to handle these cases.`)); + console.error(`WARNING: Empty arrival and departure times found for trip ${stopTime['trip_id']} on stop ${stopTime['stop_id']}. Interpolation is required in a previous step to handle these cases (not supported yet). 
This stop time will be skipped.`) + return done(); } // Get related trip, route and service dates diff --git a/package-lock.json b/package-lock.json index a545053..ca5525e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "gtfs2lc", - "version": "2.1.5", + "version": "2.1.6", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "gtfs2lc", - "version": "2.1.5", + "version": "2.1.6", "license": "MIT", "dependencies": { "commander": "^4.1.1", diff --git a/package.json b/package.json index 1876a94..890ee81 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "gtfs2lc", - "version": "2.1.5", + "version": "2.1.6", "description": "Mapping script from gtfs to (linked) connections", "main": "lib/gtfs2lc.js", "bin": {