From 53d94a15c12dc4a3f7d6ae13152d3c3466668090 Mon Sep 17 00:00:00 2001 From: Arthur Bolsoni Date: Thu, 28 Mar 2024 16:25:18 -0300 Subject: [PATCH 1/2] feat: post_event implementation --- lib/wire/connection.js | 75 ++++++++++++++++++++++++ lib/wire/database.js | 36 +++++++++++- lib/wire/eventConnection.js | 113 ++++++++++++++++++++++++++++++++++++ lib/wire/fbEventManager.js | 98 +++++++++++++++++++++++++++++++ test/index.js | 71 ++++++++++++++++++++++ 5 files changed, 391 insertions(+), 2 deletions(-) create mode 100644 lib/wire/eventConnection.js create mode 100644 lib/wire/fbEventManager.js diff --git a/lib/wire/connection.js b/lib/wire/connection.js index c620817..88a86cf 100644 --- a/lib/wire/connection.js +++ b/lib/wire/connection.js @@ -1883,4 +1883,79 @@ function bufferReader(buffer, max, writer, cb, beg, end) { }); } +Connection.prototype.auxConnection = function (callback) { + var self = this; + if (self._isClosed) + return this.throwClosed(callback); + var msg = self._msg; + msg.pos = 0; + msg.addInt(Const.op_connect_request); + msg.addInt(1); // async + msg.addInt(self.dbhandle); + msg.addInt(0); + function cb(err, ret) { + + if (err) { + doError(err, callback); + return; + } + + var socket_info = { + family: ret.buffer.readInt16BE(0), + port: ret.buffer.readUInt16BE(2), + host: ret.buffer.readUInt8(4) + '.' + ret.buffer.readUInt8(5) + '.' + ret.buffer.readUInt8(6) + '.' + ret.buffer.readUInt8(7) + } + + callback(undefined, socket_info); + } + this._queueEvent(cb); +} + +Connection.prototype.queEvents = function (events, eventid, callback) { + var self = this; + if (this._isClosed) + return this.throwClosed(callback); + var msg = this._msg; + var blr = this._blr; + blr.pos = 0; + msg.pos = 0; + msg.addInt(Const.op_que_events); + msg.addInt(this.dbhandle); + // prepare EPB + blr.addByte(1) // epb_version + for (var event in events) { + var event_buffer = new Buffer(event, 'UTF8'); + blr.addByte(event_buffer.length); + blr.addBytes(event_buffer); + blr.addInt32(events[event]); + } + msg.addBlr(blr); // epb + msg.addInt(0); // ast + msg.addInt(0); // args + msg.addInt(eventid); + this._queueEvent(callback); +} + +Connection.prototype.closeEvents = function (eventid, callback) { + var self = this; + if (this._isClosed) + return this.throwClosed(callback); + var msg = self._msg; + msg.pos = 0; + msg.addInt(Const.op_cancel_events); + msg.addInt(self.dbhandle); + msg.addInt(eventid); + + function cb(err, ret) { + if (err) { + doError(err, callback); + return; + } + + callback(err); + } + + this._queueEvent(cb); +} + module.exports = Connection; diff --git a/lib/wire/database.js b/lib/wire/database.js index 230f26d..b6fb1a3 100644 --- a/lib/wire/database.js +++ b/lib/wire/database.js @@ -1,6 +1,8 @@ const Events = require('events'); -const {doError} = require('../callback'); -const {escape} = require('../utils'); +const { doError } = require('../callback'); +const { escape } = require('../utils'); +const EventConnection = require('./eventConnection'); +const FbEventManager = require('./fbEventManager'); /*************************************** * @@ -11,6 +13,7 @@ const {escape} = require('../utils'); function Database(connection) { this.connection = connection; connection.db = this; + this.eventid = 1; } Database.prototype.__proto__ = Object.create(Events.EventEmitter.prototype, { @@ -163,4 +166,33 @@ Database.prototype.drop = function(callback) { return this.connection.dropDatabase(callback); }; +Database.prototype.attachEvent = function (callback) { + var self = this; + this.connection.auxConnection(function (err, socket_info) { + + if (err) { + doError(err, callback); + return; + } + + const eventConnection = new EventConnection(self.connection.host, socket_info.port, function (err) { + if (err) { + doError(err, callback); + return; + } + + const evt = new FbEventManager(self, eventConnection, self.eventid++, function (err) { + if (err) { + doError(err, callback); + return; + } + + callback(err, evt); + }); + }, self); + }); + + return this; +} + module.exports = Database; diff --git a/lib/wire/eventConnection.js b/lib/wire/eventConnection.js new file mode 100644 index 0000000..7c370b2 --- /dev/null +++ b/lib/wire/eventConnection.js @@ -0,0 +1,113 @@ +const net = require('net'); +const { XdrReader } = require('./serialize'); +const DEFAULT_ENCODING = 'utf8'; +const Const = require('./const'); + +var EventConnection = function (host, port, callback, db) { + var self = this; + this.db = db; + this.emgr = null; + this._isClosed = false; + this._isOpened = false; + this._socket = net.createConnection(port, host); + this._bind_events(host, port, callback); + this.error; + this.eventcallback; +}; + +EventConnection.prototype._bind_events = function (host, port, callback) { + var self = this; + + self._socket.on('close', function () { + + self._isClosed = true; + }) + + self._socket.on('error', function (e) { + + self.error = e; + }) + + self._socket.on('connect', function () { + self._isClosed = false; + self._isOpened = true; + if (callback) + callback(); + }); + + self._socket.on('data', function (data) { + var xdr, buf; + + if (!self._xdr) { + xdr = new XdrReader(data); + } else { + xdr = self._xdr; + delete (self._xdr); + buf = Buffer.from(data.length + xdr.buffer.length); + xdr.buffer.copy(buf); + data.copy(buf, xdr.buffer.length); + xdr.buffer = buf; + } + + try { + var item, op; + var op_pos = xdr.pos; + var tmp_event; + while (xdr.pos < xdr.buffer.length) { + do { + var r = xdr.readInt(); + } while (r === Const.op_dummy); + + switch (r) { + case Const.op_event: + xdr.readInt(); // db handle + var buf = xdr.readArray(); + // first byte is always set to 1 + tmp_event = {}; + var lst_event = []; + var eventname = ''; + var eventcount = 0; + var pos = 1; + while (pos < buf.length) { + var len = buf.readInt8(pos++); + eventname = buf.toString(DEFAULT_ENCODING, pos, pos + len); + var prevcount = self.emgr.events[eventname] || 0; + pos += len; + eventcount = buf.readInt32LE(pos); + tmp_event[eventname] = eventcount; + pos += 4; + if (prevcount !== eventcount) + lst_event.push({ name: eventname, count: eventcount }); + } + xdr.readInt64(); // ignore AST INFO + var event_id = xdr.readInt(); + // set the new count in global event hash + for (var evt in tmp_event) { + self.emgr.events[evt] = tmp_event[evt]; + } + if (self.eventcallback) + return self.eventcallback(null, { eventid: event_id, events: lst_event }); + + default: + return cb(new Error('Unexpected:' + r)); + } + } + } catch (err) { + if (err instanceof RangeError) { // incomplete packet case + xdr.buffer = xdr.buffer = xdr.buffer.slice(op_pos); + xdr.pos = 0; + self._xdr = xdr; + } + } + }) +} + +EventConnection.prototype.throwClosed = function (callback) { + var err = new Error('Event Connection is closed.'); + this.db.emit('error', err); + if (callback) + callback(err); + return this; +}; + +module.exports = EventConnection; \ No newline at end of file diff --git a/lib/wire/fbEventManager.js b/lib/wire/fbEventManager.js new file mode 100644 index 0000000..207d3ee --- /dev/null +++ b/lib/wire/fbEventManager.js @@ -0,0 +1,98 @@ +const Events = require('events'); +const { doError } = require('../callback'); + + +function FbEventManager(db, eventconnection, eventid, callback) { + this.db = db; + this.eventconnection = eventconnection; + this.events = {}; + this.eventid = eventid; + this._createEventLoop(callback); +} + +FbEventManager.prototype.__proto__ = Object.create(Events.EventEmitter.prototype, { + constructor: { + value: FbEventManager, + enumberable: false + } +}); + +FbEventManager.prototype._createEventLoop = function (callback) { + var self = this; + var cnx = this.db.connection; + this.eventconnection.emgr = this; + // create the loop + function loop(first) { + cnx.queEvents(self.events, self.eventid, function (err, ret) { + if (err) { + doError(err, callback); + return; + } + if (first) + callback(); + }) + } + + this.eventconnection.eventcallback = function (err, ret) { + if (err || (self.eventid !== ret.eventid)) { + doError(err || new Error('Bad eventid'), callback); + return; + } + + ret.events.forEach(function (event) { + self.emit('post_event', event.name, event.count) + }) + + loop(false); + } + + loop(true); +} + +FbEventManager.prototype._changeEvent = function (callback) { + var self = this; + + self.db.connection.closeEvents(this.eventid, function (err) { + if (err) { + doError(err, callback); + return; + } + + self.db.connection.queEvents(self.events, self.eventid, callback); + }) +} + +FbEventManager.prototype.registerEvent = function (events, callback) { + var self = this; + + if (self.db.connection._isClosed || self.eventconnection._isClosed) + return self.eventconnection.throwClosed(callback); + + events.forEach((event) => self.events[event] = self.events[event] || 0); + self._changeEvent(callback); +} + +FbEventManager.prototype.unregisterEvent = function (events, callback) { + var self = this; + + if (self.db.connection._isClosed || self.eventconnection._isClosed) + return self.eventconnection.throwClosed(callback); + + events.forEach(function (event) { delete self.events[event] }); + self._changeEvent(callback); +} + +FbEventManager.prototype.close = function (callback) { + var self = this; + + self.db.connection.closeEvents(this.eventid, function (err) { + if (err) { + doError(err, callback); + return; + } + + self.eventconnection._socket.end(); + }); +} + +module.exports = FbEventManager; \ No newline at end of file diff --git a/test/index.js b/test/index.js index fa2b723..d310a44 100644 --- a/test/index.js +++ b/test/index.js @@ -48,6 +48,77 @@ describe('Connection', function () { }); }); +describe('Events', function () { + const table_sql = 'CREATE TABLE TEST_EVENTS (ID INT NOT NULL CONSTRAINT PK_EVENTS PRIMARY KEY, NAME VARCHAR(50))'; + + let db; + + before(function (done) { + Firebird.attachOrCreate(config, function (err, _db) { + if (err) throw err; + db = _db; + + db.query(table_sql, [], function (err) { + assert.ok(!err, err); + + db.query(`CREATE TRIGGER TRG_TEST_TRIGGER FOR TEST_EVENTS AFTER INSERT OR UPDATE AS BEGIN POST_EVENT('TRG_TEST_EVENTS'); END`, [], function (err) { + assert.ok(!err, err); + + db.query('SELECT RDB$TRIGGER_NAME FROM RDB$TRIGGERS WHERE RDB$TRIGGER_NAME = ?', ['TRG_TEST_TRIGGER'], function (err, rows) { + assert.ok(!err, err); + assert.ok(rows.length > 0); + done(); + }); + }); + }); + }); + }); + + after(function () { + if (db) { + db.detach(); + } + }); + + it("should create a connection", function (done) { + db.attachEvent((err, evtmgr) => { + assert.ok(!err, err); + done(); + }) + }); + + it("should register an event", function (done) { + db.attachEvent((err, evtmgr) => { + assert.ok(!err, err); + + evtmgr.registerEvent(["TRG_TEST_EVENTS"], (err) => { + assert.ok(!err, err); + done(); + }); + }) + }); + + it("should receive an event", function (done) { + db.attachEvent((err, evtmgr) => { + assert.ok(!err, err); + + evtmgr.registerEvent(["TRG_TEST_EVENTS"], (err) => { + assert.ok(!err, err); + + evtmgr.on('post_event', (name, count) => { + assert.equal(name, 'TRG_TEST_EVENTS'); + assert.equal(count, 1); + done(); + }); + + db.query('INSERT INTO TEST_EVENTS (ID, NAME) VALUES (?, ?)', [1, 'xpto'], (err) => { + assert.ok(!err, err); + }); + }); + }) + }); +}); + describe('Auth plugin connection', function () { // Must be test with firebird 2.5 or higher with Legacy_Auth enabled on server From a9c82e6bac2b0124f2924bea6591b1a727dcf5e3 Mon Sep 17 00:00:00 2001 From: Arthur Bolsoni Date: Thu, 28 Mar 2024 17:09:59 -0300 Subject: [PATCH 2/2] adicionado types --- lib/index.d.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/index.d.ts b/lib/index.d.ts index 974badd..6c43b03 100644 --- a/lib/index.d.ts +++ b/lib/index.d.ts @@ -39,6 +39,7 @@ declare module 'node-firebird' { sequentially(query: string, params: any[], rowCallback: SequentialCallback, callback: SimpleCallback, asArray?: boolean): Database; drop(callback: SimpleCallback): void; escape(value: any): string; + attachEvent(callback: any): this; } export interface Transaction {