diff --git a/lib/callback_model.js b/lib/callback_model.js index 45b5de0f..084244c8 100644 --- a/lib/callback_model.js +++ b/lib/callback_model.js @@ -28,8 +28,13 @@ class CallbackModel extends EventEmitter { this.connection._updateSecret(newSecret, reason, cb); } - createChannel (cb) { + createChannel (options, cb) { + if (arguments.length === 1) { + cb = options; + options = undefined; + } var ch = new Channel(this.connection); + ch.setOptions(options); ch.open(function (err, ok) { if (err === null) cb && cb(null, ch); @@ -39,8 +44,13 @@ class CallbackModel extends EventEmitter { return ch; } - createConfirmChannel (cb) { + createConfirmChannel (options, cb) { + if (arguments.length === 1) { + cb = options; + options = undefined; + } var ch = new ConfirmChannel(this.connection); + ch.setOptions(options); ch.open(function (err) { if (err !== null) return cb && cb(err); diff --git a/lib/channel.js b/lib/channel.js index 1db2af25..0feadca9 100644 --- a/lib/channel.js +++ b/lib/channel.js @@ -47,8 +47,12 @@ class Channel extends EventEmitter { this.handleMessage = acceptDeliveryOrReturn; } + setOptions(options) { + this.options = options; + } + allocate () { - this.ch = this.connection.freshChannel(this); + this.ch = this.connection.freshChannel(this, this.options); return this; } diff --git a/lib/channel_model.js b/lib/channel_model.js index f95b0192..f3cfffd3 100644 --- a/lib/channel_model.js +++ b/lib/channel_model.js @@ -30,14 +30,16 @@ class ChannelModel extends EventEmitter { return promisify(this.connection._updateSecret.bind(this.connection))(newSecret, reason); } - async createChannel() { + async createChannel(options) { const channel = new Channel(this.connection); + channel.setOptions(options); await channel.open(); return channel; } - async createConfirmChannel() { + async createConfirmChannel(options) { const channel = new ConfirmChannel(this.connection); + channel.setOptions(options); await channel.open(); await channel.rpc(defs.ConfirmSelect, {nowait: false}, defs.ConfirmSelectOk); return channel; diff --git a/test/callback_api.js b/test/callback_api.js index 29a9f409..51dd662d 100644 --- a/test/callback_api.js +++ b/test/callback_api.js @@ -72,10 +72,14 @@ suite('updateSecret', function() { }); function channel_test_fn(method) { - return function(name, chfun) { + return function(name, options, chfun) { + if (arguments.length === 2) { + chfun = options; + options = {}; + } test(name, function(done) { connect(kCallback(function(c) { - c[method](kCallback(function(ch) { + c[method](options, kCallback(function(ch) { chfun(ch, done); }, done)); }, done)); @@ -210,6 +214,33 @@ suite('sending messages', function() { }); }); + var channelOptions = {}; + + channel_test('find high watermark', function(ch, done) { + var msg = randomString(); + var baseline = 0; + ch.assertQueue('', {exclusive: true}, function(e, q) { + if (e !== null) return done(e); + while (ch.sendToQueue(q.queue, Buffer.from(msg))) { + baseline++; + }; + channelOptions.highWaterMark = baseline * 2; + done(); + }) + }); + + channel_test('set high watermark', channelOptions, function(ch, done) { + var msg = randomString(); + ch.assertQueue('', {exclusive: true}, function(e, q) { + if (e !== null) return done(e); + var ok; + for (var i = 0; i < channelOptions.highWaterMark; i++) { + ok = ch.sendToQueue(q.queue, Buffer.from(msg)); + assert.equal(ok, true); + } + done(); + }); + }); }); suite('ConfirmChannel', function() { @@ -228,6 +259,34 @@ suite('ConfirmChannel', function() { ch.waitForConfirms(done); }); + var channelOptions = {}; + + confirm_channel_test('find high watermark', function(ch, done) { + var msg = randomString(); + var baseline = 0; + ch.assertQueue('', {exclusive: true}, function(e, q) { + if (e !== null) return done(e); + while (ch.sendToQueue(q.queue, Buffer.from(msg))) { + baseline++; + }; + channelOptions.highWaterMark = baseline * 2; + done(); + }) + }); + + confirm_channel_test('set high watermark', channelOptions, function(ch, done) { + var msg = randomString(); + ch.assertQueue('', {exclusive: true}, function(e, q) { + if (e !== null) return done(e); + var ok; + for (var i = 0; i < channelOptions.highWaterMark; i++) { + ok = ch.sendToQueue(q.queue, Buffer.from(msg)); + assert.equal(ok, true); + } + done(); + }); + }); + }); suite("Error handling", function() {