forked from screwdriver-cd/queue-worker
-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
112 lines (100 loc) · 4.06 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
'use strict';
const NR = require('node-resque');
const jobs = require('./lib/jobs');
const request = require('request');
const winston = require('winston');
const { connectionDetails, queuePrefix } = require('./config/redis');
/**
 * Mark the build behind a failed job as FAILURE through the API
 * @method updateBuildStatus
 * @param  {Object}   updateConfig             build config of the job
 * @param  {String}   updateConfig.failure     failure message (only used for logging)
 * @param  {Object}   updateConfig.job         job info; job.args[0] supplies apiUri, buildId, token
 * @param  {String}   updateConfig.queue       queue of the job (only used for logging)
 * @param  {Number}   updateConfig.workerId    id of the worker (only used for logging)
 * @param  {Function} [callback]               Called with null once the API answered 200,
 *                                             otherwise with the error that prevented the update
 * @return {Object}                            Whatever the `request` call returns
 */
function updateBuildStatus(updateConfig, callback) {
    const { failure, job, queue, workerId } = updateConfig;
    const { apiUri, buildId, token } = job.args[0];
    // The callback is optional, so fall back to a no-op instead of throwing inside the handler
    const done = typeof callback === 'function' ? callback : () => {};
    // NOTE(review): the serialized job includes args[0].token, so the bearer token ends up in
    // the log output - confirm that this is acceptable
    const logPrefix = `worker[${workerId}] job failure ${queue} ${JSON.stringify(job)} >>`;

    return request({
        json: true,
        method: 'PUT',
        uri: `${apiUri}/v4/builds/${buildId}`,
        // `request` serializes `body` as the JSON document; it has no `payload` option,
        // so using that key would send the PUT without any status
        body: {
            status: 'FAILURE'
        },
        auth: {
            bearer: token
        }
    }, (err, response) => {
        if (!err && response && response.statusCode === 200) {
            // Logged at error level on purpose: the job itself failed even though the update worked
            winston.error(`${logPrefix} successfully update build status: ${failure}`);
            done(null);

            return;
        }

        const statusCode = response ? response.statusCode : undefined;
        const responseBody = response ? JSON.stringify(response.body) : undefined;

        winston.error(`${logPrefix} ${failure} ${err} ${statusCode} ${responseBody}`);
        // A non-200 answer arrives without a transport error; still report it as a failure
        done(err || new Error(`failed to update build ${buildId}: status code ${statusCode}`));
    });
}
/**
 * End the given worker and then terminate the current process.
 * A failure to end the worker is logged and the process exits with code 128;
 * otherwise the process exits with code 0.
 * @method shutDownWorker
 * @param  {Object}   worker   Object exposing `end(callback)`; the callback gets an error on failure
 */
function shutDownWorker(worker) {
    const exitAfterEnd = (endError) => {
        if (endError) {
            winston.error(`failed to end the worker: ${endError}`);
            process.exit(128);
        }

        process.exit(0);
    };

    worker.end(exitAfterEnd);
}
// Helper functions grouped on one object: the 'failure' and SIGTERM handlers call them
// through this object, and it is exported as-is at the end of the file
const supportFunction = {
    updateBuildStatus,
    shutDownWorker
};
/* eslint-disable new-cap, max-len */
// Pool of node-resque workers that runs the handlers in `jobs` for the builds queue
const multiWorker = new NR.multiWorker({
    connection: connectionDetails,
    queues: [`${queuePrefix}builds`],
    minTaskProcessors: 1,
    maxTaskProcessors: 10,
    checkTimeout: 1000,
    maxEventLoopDelay: 10,
    toDisconnectProcessors: true
}, jobs);

// Per-worker events: every one is logged, and 'failure' additionally reports the build status
multiWorker.on('start', workerId =>
    winston.info(`worker[${workerId}] started`));
multiWorker.on('end', workerId =>
    winston.info(`worker[${workerId}] ended`));
multiWorker.on('cleaning_worker', (workerId, worker, pid) =>
    winston.info(`cleaning old worker ${worker} pid ${pid}`));
multiWorker.on('poll', (workerId, queue) =>
    winston.info(`worker[${workerId}] polling ${queue}`));
multiWorker.on('job', (workerId, queue, job) =>
    winston.info(`worker[${workerId}] working job ${queue} ${JSON.stringify(job)}`));
multiWorker.on('reEnqueue', (workerId, queue, job, plugin) =>
    winston.info(`worker[${workerId}] reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`));
// The job object is already serialized later in the message; interpolating it directly
// would only print "[object Object]", so the literal word "job" is used here
multiWorker.on('success', (workerId, queue, job, result) =>
    winston.info(`worker[${workerId}] job success ${queue} ${JSON.stringify(job)} >> ${result}`));
// updateBuildStatus logs the outcome of the status update itself, hence the no-op callback
multiWorker.on('failure', (workerId, queue, job, failure) =>
    supportFunction.updateBuildStatus({ workerId, queue, job, failure }, () => {}));
multiWorker.on('error', (workerId, queue, job, error) =>
    winston.error(`worker[${workerId}] error ${queue} ${JSON.stringify(job)} >> ${error}`));
multiWorker.on('pause', workerId =>
    winston.info(`worker[${workerId}] paused`));

// multiWorker emitters
multiWorker.on('internalError', error =>
    winston.error(error));
multiWorker.on('multiWorkerAction', (verb, delay) =>
    winston.info(`*** checked for worker status: ${verb} (event loop delay: ${delay}ms)`));

// Start working as soon as the module is loaded
multiWorker.start();
// Shut down the workers before exiting the process when SIGTERM is received
const handleSigterm = () => supportFunction.shutDownWorker(multiWorker);

process.on('SIGTERM', handleSigterm);
// Public surface of the module: the job handlers, the worker pool and the helper functions
module.exports = { jobs, multiWorker, supportFunction };