Skip to content

Commit

Permalink
Merge pull request #139 from linkedconnections/development
Browse files Browse the repository at this point in the history
v2.1.6
  • Loading branch information
julianrojas87 authored Nov 14, 2022
2 parents ecd0d22 + d157640 commit 8c834ff
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 21 deletions.
42 changes: 25 additions & 17 deletions lib/gtfs2connections.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
const fs = require('fs');
const zlib = require('zlib');
const path = require('path');
const ChildProcess = require('child_process');
const del = require('del');
Expand All @@ -17,7 +18,6 @@ const Connections2Triples = require('./Connections2Triples');


const readdir = util.promisify(fs.readdir);
const appendFile = util.promisify(fs.appendFile);
const exec = util.promisify(ChildProcess.exec);

const numCPUs = require('os').cpus().length;
Expand Down Expand Up @@ -61,7 +61,6 @@ Mapper.prototype.resultStream = async function (path, output, done) {
const worker = new Worker(__filename, {
workerData: {
instance: i,
path,
output,
options: this._options
},
Expand All @@ -88,13 +87,14 @@ Mapper.prototype.resultStream = async function (path, output, done) {
await removePrefixes(output);
ext = 'ttl';
} else if (format === 'ntriples') {
ext = 'n3';
ext = 'nt';
}

try {
console.error('Merging final Linked Connections file...');

// Join all resulting files into one
await exec(`for i in ${raws.map(r => { return `${r}.${ext}` }).join(" ")} ; do cat "$i" >> linkedConnections.${ext} && rm "$i" || break ; done`, { cwd: output });
await exec(`for i in ${raws.map(r => { return `${r}.${ext}.gz` }).join(" ")} ; do zcat "$i" >> linkedConnections.${ext} && rm "$i" || break ; done`, { cwd: output });
let t1 = new Date();
console.error('linkedConnections.' + ext + ' File created in ' + (t1.getTime() - t0.getTime()) + ' ms');
await del(
Expand Down Expand Up @@ -133,27 +133,34 @@ async function cleanUpSources(sources) {

async function appendLineBreaks(output) {
const files = (await readdir(output)).filter(raw => raw.startsWith('raw_'));
const promises = [];
for (const [i, f] of files.entries()) {
if (i < files.length - 1) {
await appendFile(`${output}/${f}`, '\n')
promises.push(exec(`echo "" | gzip >> ${f}`, { cwd: output }));
}
}

await Promise.all(promises);
}

async function removePrefixes(output) {
const files = (await readdir(output)).filter(raw => raw.startsWith('raw_'));
const promises = [];
for (const [i, f] of files.entries()) {
if (i > 0) {
// TODO: find a not hard-coded way to remove prefixes
await exec(`sed -i 1,4d ${f}`, { cwd: output });
promises.push(exec(`zcat ${f} | tail -n +4 | gzip > ${f}.temp && mv ${f}.temp ${f}`, { cwd: output }))
}
}

await Promise.all(promises);
}

// Code executed only on a Worker Thread
if (!isMainThread) {
let fmt = 'json';
// Read the connection rules file created in the master thread and build the Connection objects!
let connectionStream = fs.createReadStream(workerData['path'] + '/connections_' + workerData['instance'] + '.txt', { encoding: 'utf8', objectMode: true })
let connectionStream = fs.createReadStream(workerData['output'] + '/connections_' + workerData['instance'] + '.txt', { encoding: 'utf8', objectMode: true })
.pipe(JSONLParser())
.pipe(new ConnectionsBuilder())
.on('error', function (e) {
Expand All @@ -166,8 +173,7 @@ if (!isMainThread) {
if (format === 'mongo') {
connectionStream = connectionStream.pipe(new Connections2Mongo());
}
connectionStream = connectionStream.pipe(new JSONLStringer())
.pipe(fs.createWriteStream(workerData['output'] + '/raw_' + workerData['instance'] + '.json'));
connectionStream = connectionStream.pipe(new JSONLStringer());
} else if (['jsonld', 'mongold'].includes(format)) {
let context = undefined;
// Only include the context for the first instance
Expand All @@ -194,35 +200,37 @@ if (!isMainThread) {
connectionStream = connectionStream.pipe(new Connections2Mongo());
}
// Pipe the objects to a file
connectionStream = connectionStream.pipe(new JSONLStringer())
.pipe(fs.createWriteStream(workerData['output'] + '/raw_' + workerData['instance'] + '.json'));
connectionStream = connectionStream.pipe(new JSONLStringer());
} else if (format === 'csv') {
fmt = 'csv';
// Only include the header on the first file
let header = false;
if (workerData['instance'] === 0) {
header = true;
}
connectionStream = connectionStream.pipe(new Connections2CSV(header))
.pipe(fs.createWriteStream(workerData['output'] + '/raw_' + workerData['instance'] + '.csv'));
connectionStream = connectionStream.pipe(new Connections2CSV(header));
} else if (format === 'turtle') {
fmt = 'ttl';
let prefixes = {
lc: 'http://semweb.mmlab.be/ns/linkedconnections#',
gtfs: 'http://vocab.gtfs.org/terms#',
xsd: 'http://www.w3.org/2001/XMLSchema#'
};
connectionStream = connectionStream.pipe(new Connections2Triples(workerData['options']['baseUris']))
.pipe(new N3.StreamWriter({ prefixes: prefixes }))
.pipe(fs.createWriteStream(workerData['output'] + '/raw_' + workerData['instance'] + '.ttl'));
.pipe(new N3.StreamWriter({ prefixes: prefixes }));
} else if (format === 'ntriples') {
fmt = 'nt';
connectionStream = connectionStream.pipe(new Connections2Triples(workerData['options']['baseUris']))
.pipe(new N3.StreamWriter({ format: 'N-Triples' }))
.pipe(fs.createWriteStream(workerData['output'] + '/raw_' + workerData['instance'] + '.n3'));
.pipe(new N3.StreamWriter({ format: 'N-Triples' }));
}

connectionStream.on('finish', () => {
parentPort.postMessage('done');
});

connectionStream.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(`${workerData['output']}/raw_${workerData['instance']}.${fmt}.gz`));

}

module.exports = Mapper;
3 changes: 2 additions & 1 deletion lib/stoptimes/st2c.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
if (stopTime['arrival_time'] === '' && stopTime['departure_time'] === '') {
// Both arrival and departure time for this stop are empty, so Connection rule cannot be created.
// This is valid GTFS but requires interpolation to set estimated stop times.
return done(new Error(`ERROR: Empty arrival and departure times found for trip ${stopTime['trip_id']} on stop ${stopTime['stop_id']}. Interpolation is required in a previous step to handle these cases.`));
console.error(`WARNING: Empty arrival and departure times found for trip ${stopTime['trip_id']} on stop ${stopTime['stop_id']}. Interpolation is required in a previous step to handle these cases (not supported yet). This stop time will be skipped.`)
done();
}

// Get related trip, route and service dates
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "gtfs2lc",
"version": "2.1.5",
"version": "2.1.6",
"description": "Mapping script from gtfs to (linked) connections",
"main": "lib/gtfs2lc.js",
"bin": {
Expand Down

0 comments on commit 8c834ff

Please sign in to comment.