Skip to content

Commit

Permalink
perf: add ttl
Browse files Browse the repository at this point in the history
  • Loading branch information
DIY0R committed Oct 22, 2024
1 parent 6b7cfcf commit f67a72e
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 14 deletions.
8 changes: 7 additions & 1 deletion lib/common/createData.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
const createData = (type, data) => ({ type, data });
const createMessage = (type, to, data) => ({ ...createData(type, data), to });

const createMessage = ({ type, to, message, messageId, ttl }) => ({
messageId,
...createData(type, message),
to,
ttl,
});
module.exports = { createData, createMessage };
2 changes: 1 addition & 1 deletion lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class Connection extends EventEmitter {
this._newConnection(connectionId);
}

_send(connectionId, message) {
_publish(connectionId, message) {
const socket = this.#connections.get(connectionId);
if (!socket) throw new Error(`Node does not exist ${connectionId}`);
socket.write(serialization(message));
Expand Down
5 changes: 3 additions & 2 deletions lib/constants.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
const HANDSHAKE = 'HANDSHAKE';
const DM = 'MES SAGE';
const DM = 'MESSAGE';
const CHECK_INTERVAL = 500;
const TIMEOUT_DURATION = 20000;

const TTL = 300;
const TIMEOUT_ERROR_MESSAGE = `Neighbor check timed out after ${TIMEOUT_DURATION / 1000} seconds`;

module.exports = {
Expand All @@ -11,4 +11,5 @@ module.exports = {
CHECK_INTERVAL,
TIMEOUT_DURATION,
TIMEOUT_ERROR_MESSAGE,
TTL,
};
33 changes: 23 additions & 10 deletions lib/messaging.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
'use strict';
const { createData, uuid, createMessage } = require('./common');
const Connection = require('./connection');
const { createData, uuid, createMessage } = require('./common');
const {
HANDSHAKE,
DM,
CHECK_INTERVAL,
TIMEOUT_DURATION,
TIMEOUT_ERROR_MESSAGE,
TTL,
} = require('./constants');

class Messaging extends Connection {
NODE_ID = uuid();
neighbors = new Map();
seenMessages = new Set();

constructor(port, targetNode, nodeId) {
super(port, targetNode);
if (nodeId) this.NODE_ID = nodeId;
}

_newConnection(connectionId) {
this._send(connectionId, createData(HANDSHAKE, this.NODE_ID));
this._publish(connectionId, createData(HANDSHAKE, this.NODE_ID));
}

_onMessage(connectionId, messageObject) {
Expand All @@ -33,12 +35,21 @@ class Messaging extends Connection {
if (nodeId) this.neighbors.delete(connectionId);
}

async send(nodeId, message) {
async publish(nodeId, message) {
await this.#neighborCheck();
this.#broadcast(nodeId, message);
}

#broadcast(nodeId, message, param) {
const { messageId = uuid(), ttl = TTL } = param || {};
const connectionId = this.neighbors.get(nodeId);
const targets = connectionId ? [connectionId] : this.neighbors;
this.seenMessages.add(messageId);
targets.forEach(target =>
this._send(target, createMessage(DM, nodeId, message)),
this._publish(
target,
createMessage({ type: DM, to: nodeId, messageId, message, ttl }),
),
);
}

Expand All @@ -53,9 +64,9 @@ class Messaging extends Connection {
}

async #neighborCheck() {
const timeoutPromise = new Promise((_, reject) =>
setTimeout(() => reject(Error(TIMEOUT_ERROR_MESSAGE)), TIMEOUT_DURATION),
);
const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => reject(Error(TIMEOUT_ERROR_MESSAGE)), TIMEOUT_DURATION);
});
const checkNeighbors = async () => {
while (this.neighbors.size === 0) {
await new Promise(resolve => setTimeout(resolve, CHECK_INTERVAL));
Expand All @@ -70,9 +81,11 @@ class Messaging extends Connection {
}

[DM](_, messageObject) {
const { data, to } = messageObject;
if (to !== this.NODE_ID) return this.send(to, data);
this.emit(DM, data);
const { messageId, data, to, ttl } = messageObject;
if (to === this.NODE_ID) return void this.emit(DM, data);
const isProcessed = this.seenMessages.has(messageId);
if (ttl < 1 || isProcessed) return;
this.#broadcast(to, data, { messageId, ttl: ttl - 1 });
}
}

Expand Down

0 comments on commit f67a72e

Please sign in to comment.