Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DeviceData and eventHandlers.js #14

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
node_modules
.idea
.DS_Store
eventHandlers.js
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ While the server(s) and the service are designed to run continuously, the pool p
## Run
Run `node index.js --config=[CONFIG_FILE]`. See `[server|service|payout].sample.conf` for sample configuration files and clarifications.

## Custom Behavior
The pool server supports custom behaviors through the optional `eventHandlers.js` file. Code written here
will be run every time a specific event occurs. The currently supported events are:

* **onRegisterMessage** - Called on a *server* configuration when a register message is received, before
setting `PoolAgent` has processed it.
* **onRegistrationCompleted** - Called on a *server* configuration after a `PoolAgent` has successfully been registered.

See `eventHandlers.sample.js` for the template. To use, copy to a new file: `eventHandlers.js`, and add your
own logic inside the defined callbacks. The pool server will automatically include this file if it exists.

## License
Copyright 2018 The Nimiq Foundation

Expand Down
27 changes: 27 additions & 0 deletions eventHandlers.sample.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* NOTE: Don't modify this file! Copy this file to `evenHandlers.js` and it will
* automatically be included in the pool server!
*/

/**
* Fired when a REGISTER message is received, before being processed by the
* PoolAgent. Good time to perform validation or mutation of message data.
* @param {PoolAgent} agent - The Agent for the newly registered device.
* @param {Object} msg - The full register message. This is not a copy; any

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

msg should definitely be a copy

* mutation will affect the data used to create the PoolAgent.
* @param {mysql.PoolConnection} connectionPool - A MySQL connection pool,
* logged in as 'pool_server'.
* @throws {Error} Should throw an Error with a message to send to the device
* if registration should not continue.
* @returns {void}
*/
module.exports.onRegisterMessage = function onRegisterMessage(agent, msg, connectionPool) { }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this one be async as well?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought I'd leave it synchronous since I was catching with try/catch but I suppose this will still work as async.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, exceptions do work across async/await


/**
* Fired when a new PoolAgent is registered to the PoolServer.
* @param {PoolAgent} agent - The Agent for the newly registered device.
* @param {mysql.PoolConnection} connectionPool - A MySQL connection pool,
* logged in as 'pool_server'.
* @returns {void}
*/
module.exports.onRegistrationCompleted = async function onRegistrationCompleted(agent, connectionPool) { }
6 changes: 5 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const Nimiq = require('@nimiq/core');
const argv = require('minimist')(process.argv.slice(2));
const config = require('./src/Config.js')(argv.config);
const fs = require('fs');

const PoolServer = require('./src/PoolServer.js');
const PoolService = require('./src/PoolService.js');
Expand All @@ -24,6 +25,9 @@ if (config.poolPayout.enabled && (config.poolServer.enabled || config.poolServic
process.exit(1);
}

const eventHandlers = fs.existsSync('./eventHandlers.js')
? require('./eventHandlers.js') : undefined;

Nimiq.Log.instance.level = config.log.level;
for (const tag in config.log.tags) {
Nimiq.Log.instance.setLoggable(tag, config.log.tags[tag]);
Expand Down Expand Up @@ -77,7 +81,7 @@ for (const seedPeer of config.seedPeers) {
}

if (config.poolServer.enabled) {
const poolServer = new PoolServer($.consensus, config.pool, config.poolServer.port, config.poolServer.mySqlPsw, config.poolServer.mySqlHost, config.poolServer.sslKeyPath, config.poolServer.sslCertPath);
const poolServer = new PoolServer($.consensus, config.pool, config.poolServer.port, config.poolServer.mySqlPsw, config.poolServer.mySqlHost, config.poolServer.sslKeyPath, config.poolServer.sslCertPath, eventHandlers);

if (config.poolMetricsServer.enabled) {
$.metricsServer = new MetricsServer(config.poolServer.sslKeyPath, config.poolServer.sslCertPath, config.poolMetricsServer.port, config.poolMetricsServer.password);
Expand Down
40 changes: 40 additions & 0 deletions spec/PoolAgent.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -224,4 +224,44 @@ describe('PoolAgent', () => {
done();
})().catch(done.fail);
});

it('fires eventHandler callbacks', (done) => {
(async () => {
const keyPair = Nimiq.KeyPair.generate();
const clientAddress = keyPair.publicKey.toAddress();
const consensus = await Nimiq.Consensus.volatileFull();

let beforeCalled = false;
const poolServer = new PoolServer(consensus, POOL_CONFIG, 9999, '', 'localhost', '', '', {
onRegisterMessage: (agent, msg, connectionPool) => {
expect(msg.deviceId).toEqual(111111111);
msg.deviceId = 123;
beforeCalled = true;
},
onRegistrationCompleted: async (agent, connectionPool) => {
expect(agent.deviceId).toEqual(123);
expect(agent.deviceData).toEqual({ deviceGroup: 'foobar' });
expect(beforeCalled).toEqual(true);
done();
}
});

await poolServer.start();
const poolAgent = new PoolAgent(poolServer, { close: () => {}, send: () => {} }, Nimiq.NetAddress.fromIP('1.2.3.4'));
spyOn(poolAgent, '_regenerateNonce').and.callFake(() => { poolAgent._nonce = 42 });

const registerMsg = {
message: 'register',
address: clientAddress.toUserFriendlyAddress(),
deviceId: 111111111,
deviceData: {
deviceGroup: 'foobar'
},
mode: 'smart',
genesisHash: Nimiq.BufferUtils.toBase64(Nimiq.GenesisConfig.GENESIS_HASH.serialize())
};

await poolAgent._onMessage(registerMsg);
})().catch(done.fail);
});
});
46 changes: 39 additions & 7 deletions src/PoolAgent.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class PoolAgent extends Nimiq.Observable {
this._netAddress = netAddress;

/** @type {PoolAgent.Mode} */
this.mode = PoolAgent.Mode.UNREGISTERED;
this._mode = PoolAgent.Mode.UNREGISTERED;

/** @type {number} */
this._difficulty = this._pool.config.startDifficulty;
Expand Down Expand Up @@ -46,7 +46,7 @@ class PoolAgent extends Nimiq.Observable {
* @param {Nimiq.Hash} accountsHash
*/
async updateBlock(prevBlock, transactions, prunedAccounts, accountsHash) {
if (this.mode !== PoolAgent.Mode.NANO) return;
if (this._mode !== PoolAgent.Mode.NANO) return;
if (!prevBlock || !transactions || !prunedAccounts || !accountsHash) return;

this._currentBody = new Nimiq.BlockBody(this._pool.poolAddress, transactions, this._extraData, prunedAccounts);
Expand Down Expand Up @@ -104,9 +104,9 @@ class PoolAgent extends Nimiq.Observable {

switch (msg.message) {
case PoolAgent.MESSAGE_SHARE: {
if (this.mode === PoolAgent.Mode.NANO) {
if (this._mode === PoolAgent.Mode.NANO) {
await this._onNanoShareMessage(msg);
} else if (this.mode === PoolAgent.Mode.SMART) {
} else if (this._mode === PoolAgent.Mode.SMART) {
await this._onSmartShareMessage(msg);
}
this._sharesSinceReset++;
Expand All @@ -133,14 +133,22 @@ class PoolAgent extends Nimiq.Observable {
return;
}

try {
this._pool.eventHandlers.onRegisterMessage(this, msg, this._pool.connectionPool);
} catch (e) {
this._sendError(e.message);
return;
}

this._address = Nimiq.Address.fromUserFriendlyAddress(msg.address);
this._deviceId = msg.deviceId;
this._deviceData = msg.deviceData;
switch (msg.mode) {
case PoolAgent.MODE_SMART:
this.mode = PoolAgent.Mode.SMART;
this._mode = PoolAgent.Mode.SMART;
break;
case PoolAgent.MODE_NANO:
this.mode = PoolAgent.Mode.NANO;
this._mode = PoolAgent.Mode.NANO;
break;
default:
throw new Error('Client did not specify mode');
Expand All @@ -165,13 +173,14 @@ class PoolAgent extends Nimiq.Observable {
});

this._sendSettings();
if (this.mode === PoolAgent.Mode.NANO) {
if (this._mode === PoolAgent.Mode.NANO) {
this._pool.requestCurrentHead(this);
}
await this.sendBalance();
this._timers.resetInterval('send-balance', () => this.sendBalance(), 1000 * 60 * 5);
this._timers.resetInterval('send-keep-alive-ping', () => this._ws.ping(), 1000 * 10);

this._pool.eventHandlers.onRegistrationCompleted(this, this._pool.connectionPool);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing await?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch

Nimiq.Log.i(PoolAgent, `REGISTER ${this._address.toUserFriendlyAddress()}, current balance: ${await this._pool.getUserBalance(this._userId)}`);
}

Expand Down Expand Up @@ -449,14 +458,37 @@ class PoolAgent extends Nimiq.Observable {
_onClose() {
this._offAll();

this._registered = false;
this._timers.clearAll();
this._pool.removeAgent(this);
}

_onError() {
this._registered = false;
this._pool.removeAgent(this);
this._ws.close();
}


/** @type {object} */
get deviceData() {
return this._deviceData;
}

/** @type {number} */
get deviceId() {
return this._deviceId;
}

/** @type {PoolAgent.Mode} */
get mode() {
return this._mode;
}

/** @type {boolean} */
get isRegistered() {
return this._registered;
}
}
PoolAgent.MESSAGE_REGISTER = 'register';
PoolAgent.MESSAGE_REGISTERED = 'registered';
Expand Down
15 changes: 14 additions & 1 deletion src/PoolServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ const fs = require('fs');
const PoolAgent = require('./PoolAgent.js');
const Helper = require('./Helper.js');

/**
* @typedef EventHandlers
* @property {function} onRegisterMessage
* @property {function} onRegistrationCompleted
*/

class PoolServer extends Nimiq.Observable {
/**
* @param {Nimiq.FullConsensus} consensus
Expand All @@ -16,8 +22,9 @@ class PoolServer extends Nimiq.Observable {
* @param {string} mySqlHost
* @param {string} sslKeyPath
* @param {string} sslCertPath
* @param {EventHandlers} [eventHandlers]
*/
constructor(consensus, config, port, mySqlPsw, mySqlHost, sslKeyPath, sslCertPath) {
constructor(consensus, config, port, mySqlPsw, mySqlHost, sslKeyPath, sslCertPath, eventHandlers) {
super();

/** @type {Nimiq.FullConsensus} */
Expand All @@ -29,6 +36,12 @@ class PoolServer extends Nimiq.Observable {
/** @type {Nimiq.Address} */
this.poolAddress = Nimiq.Address.fromUserFriendlyAddress(config.address);

/** @type {EventHandlers} */
this.eventHandlers = Object.assign({
onRegisterMessage: () => { },
onRegistrationCompleted: async () => { }
}, eventHandlers);

/** @type {PoolConfig} */
this._config = config;

Expand Down