Skip to content

Commit

Permalink
Merge pull request #12 from devit-tel/feature/add-sync-worker
Browse files Browse the repository at this point in the history
Added sync update worker
  • Loading branch information
NV4RE authored Sep 1, 2020
2 parents d33d858 + 2093585 commit a8990f6
Show file tree
Hide file tree
Showing 5 changed files with 311 additions and 4 deletions.
13 changes: 13 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"license": "Apache-2.0",
"dependencies": {
"@melonade/melonade-declaration": "^0.19.2",
"axios": "^0.20.0",
"node-rdkafka": "^2.9.1",
"ramda": "^0.26.1",
"tslib": "^2.0.1"
Expand Down
49 changes: 49 additions & 0 deletions src/example/syncWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { State, Task } from '@melonade/melonade-declaration';
import { TaskStates } from '..';
import { SyncWorker } from '../syncWorker';

const kafkaServers = process.env['MELONADE_KAFKA_SERVERS'];
const namespace = process.env['MELONADE_NAMESPACE'];
const processManagerUrl =
process.env['MELONADE_PROCESS_MANAGER_URL'] || 'http://localhost:8081';

const sleep = (ms: number) => new Promise((res) => setTimeout(res, ms));

for (const forkID in new Array(1).fill(null)) {
for (const workerId of [1, 2, 3]) {
const worker = new SyncWorker(
// task name
`t${workerId}`,
// process task
async (task, updateTask) => {
await updateTask(task, { status: TaskStates.Inprogress });
console.log(`Processing ${task.taskName}`);
await sleep(5000);
await updateTask(task, { status: State.TaskStates.Completed });
},
// compensate task
async (task, updateTask) => {
await updateTask(task, { status: TaskStates.Inprogress });
console.log(`Compenstating ${task.taskName}`);
await sleep(10);
await updateTask(task, { status: TaskStates.Completed });
},
// configs
{
processManagerUrl,
kafkaServers,
namespace,
},
);

worker.once('ready', () => {
console.log(`Fork ${forkID} Worker t${workerId} is ready!`);
});

worker.on('task-timeout', (task: Task.ITask) => {
console.log(
`Worker skiped ${task.taskName}: ${task.taskId} because it already timed out`,
);
});
}
}
244 changes: 244 additions & 0 deletions src/syncWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
import { Event, Kafka, Task } from '@melonade/melonade-declaration';
import axios from 'axios';
import { EventEmitter } from 'events';
import {
ConsumerGlobalConfig,
KafkaConsumer,
LibrdKafkaError,
Message,
} from 'node-rdkafka';
import { jsonTryParse } from './utils/common';
import {
isTaskTimeout,
ITaskRef,
ITaskResponse,
mapTaskNameToTopic,
} from './worker';

export interface ISyncWorkerConfig {
processManagerUrl: string;
kafkaServers: string;
namespace?: string;
maximumPollingTasks?: number;
pollingCooldown?: number;
processTimeoutTask?: boolean;
autoStart?: boolean;
latencyCompensationMs?: number;
trackingRunningTasks?: boolean;
}

export interface ISyncUpdateTask {
(task: ITaskRef, result: ITaskResponse): Promise<void>;
}

const DEFAULT_WORKER_CONFIG = {
namespace: 'node',
maximumPollingTasks: 100,
pollingCooldown: 1,
processTimeoutTask: false,
autoStart: true,
latencyCompensationMs: 50,
trackingRunningTasks: false,
} as ISyncWorkerConfig;

// Maybe use kafka streamAPI
export class SyncWorker extends EventEmitter {
private consumer: KafkaConsumer;
workerConfig: ISyncWorkerConfig;
private isSubscribed: boolean = false;
private taskCallback: (
task: Task.ITask,
updateTask: ISyncUpdateTask,
isTimeout: boolean,
) => void | Promise<void>;
private compensateCallback: (
task: Task.ITask,
updateTask: ISyncUpdateTask,
isTimeout: boolean,
) => void | Promise<void>;
private runningTasks: {
[taskId: string]: Task.ITask;
} = {};
private tasksName: string | string[];

constructor(
tasksName: string | string[],
taskCallback: (
task: Task.ITask,
updateTask: ISyncUpdateTask,
isTimeout: boolean,
) => void | Promise<void>,
compensateCallback: (
task: Task.ITask,
updateTask: ISyncUpdateTask,
isTimeout: boolean,
) => void | Promise<void>,
workerConfig: ISyncWorkerConfig,
kafkaConfig: ConsumerGlobalConfig = {},
) {
super();

this.tasksName = tasksName;
this.taskCallback = taskCallback;
this.compensateCallback = compensateCallback;
this.workerConfig = {
...DEFAULT_WORKER_CONFIG,
...workerConfig,
};

this.consumer = new KafkaConsumer(
{
'bootstrap.servers': workerConfig.kafkaServers,
'group.id': `melonade-${this.workerConfig.namespace}.client`,
'enable.auto.commit': false,
...kafkaConfig,
},
{ 'auto.offset.reset': 'earliest' },
);

this.consumer.on('ready', () => {
this.emit('ready');

if (Array.isArray(tasksName)) {
this.consumer.subscribe(
tasksName.map((taskName: string) =>
mapTaskNameToTopic(taskName, this.workerConfig.namespace),
),
);
} else {
this.consumer.subscribe([
mapTaskNameToTopic(tasksName, this.workerConfig.namespace),
]);
}

if (this.workerConfig.autoStart) {
this.subscribe();
}
});
this.consumer.setDefaultConsumeTimeout(this.workerConfig.pollingCooldown);
this.consumer.connect();

process.once('SIGTERM', () => {
this.consumer.unsubscribe();
});
}

get health(): {
consumer: 'connected' | 'disconnected';
tasks: { [taskId: string]: Task.ITask };
} {
return {
consumer: this.consumer.isConnected() ? 'connected' : 'disconnected',
tasks: this.runningTasks,
};
}

consume = (
messageNumber: number = this.workerConfig.maximumPollingTasks,
): Promise<Task.ITask[]> => {
return new Promise((resolve: Function, reject: Function) => {
this.consumer.consume(
messageNumber,
(error: LibrdKafkaError, messages: Message[]) => {
if (error) {
setTimeout(() => reject(error), 1000);
} else {
resolve(
messages.map((message: Kafka.kafkaConsumerMessage) =>
jsonTryParse(message.value.toString(), undefined),
),
);
}
},
);
});
};

updateTask = async (task: ITaskRef, result: ITaskResponse) => {
await axios.post(
'v1/transaction/update',
{
transactionId: task.transactionId,
taskId: task.taskId,
status: result.status,
output: result.output,
logs: result.logs,
isSystem: false,
doNotRetry: result.doNotRetry,
} as Event.ITaskUpdate,
{
baseURL: this.workerConfig.processManagerUrl,
},
);
return;
};

commit = () => {
return this.consumer.commit();
};

private dispatchTask = async (task: Task.ITask, isTimeout: boolean) => {
switch (task.type) {
case Task.TaskTypes.Task:
return await this.taskCallback(task, this.updateTask, isTimeout);
case Task.TaskTypes.Compensate:
return await this.compensateCallback(task, this.updateTask, isTimeout);
default:
throw new Error(`Task type: "${task.type}" is invalid`);
}
};

private processTask = async (task: Task.ITask) => {
const isTimeout = isTaskTimeout(
task,
this.workerConfig.latencyCompensationMs,
);
if (isTimeout && this.workerConfig.processTimeoutTask === false) {
this.emit('task-timeout', task);
return;
}

if (this.workerConfig.trackingRunningTasks) {
this.runningTasks[task.taskId] = task;
}

try {
await this.dispatchTask(task, isTimeout);
} catch (error) {
console.warn(this.tasksName, error);
} finally {
if (this.workerConfig.trackingRunningTasks) {
delete this.runningTasks[task.taskId];
}
}
};

private poll = async () => {
// https://github.com/nodejs/node/issues/6673
while (this.isSubscribed) {
try {
const tasks = await this.consume();
if (tasks.length > 0) {
await Promise.all(tasks.map(this.processTask));
this.commit();
}
} catch (err) {
// In case of consume error
console.log(this.tasksName, err);
}
}

console.log(`Stop subscribed ${this.tasksName}`);
};

subscribe = () => {
if (!this.isSubscribed) {
this.isSubscribed = true;
this.poll();
}
};

unsubscribe = () => {
this.isSubscribed = false;
};
}
8 changes: 4 additions & 4 deletions src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ const DEFAULT_WORKER_CONFIG = {
trackingRunningTasks: false,
} as IWorkerConfig;

const alwaysCompleteFunction = (): ITaskResponse => ({
export const alwaysCompleteFunction = (): ITaskResponse => ({
status: State.TaskStates.Completed,
});

const mapTaskNameToTopic = (taskName: string, prefix: string) =>
export const mapTaskNameToTopic = (taskName: string, prefix: string) =>
`melonade.${prefix}.task.${taskName}`;

const isTaskTimeout = (
export const isTaskTimeout = (
task: Task.ITask,
latencyCompensationMs: number = 0,
): boolean => {
Expand All @@ -68,7 +68,7 @@ const isTaskTimeout = (
);
};

const validateTaskResult = (result: ITaskResponse): ITaskResponse => {
export const validateTaskResult = (result: ITaskResponse): ITaskResponse => {
const status = R.prop('status', result);
if (
![
Expand Down

0 comments on commit a8990f6

Please sign in to comment.