-
Notifications
You must be signed in to change notification settings - Fork 476
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support channel options like highwatermark #778
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,8 +28,13 @@ class CallbackModel extends EventEmitter { | |
this.connection._updateSecret(newSecret, reason, cb); | ||
} | ||
|
||
createChannel (cb) { | ||
createChannel (options, cb) { | ||
if (arguments.length === 1) { | ||
cb = options; | ||
options = undefined; | ||
} | ||
var ch = new Channel(this.connection); | ||
ch.setOptions(options); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I used a setter to specify the channel options to avoid making a breaking change to the Channel constructor |
||
ch.open(function (err, ok) { | ||
if (err === null) | ||
cb && cb(null, ch); | ||
|
@@ -39,8 +44,13 @@ class CallbackModel extends EventEmitter { | |
return ch; | ||
} | ||
|
||
createConfirmChannel (cb) { | ||
createConfirmChannel (options, cb) { | ||
if (arguments.length === 1) { | ||
cb = options; | ||
options = undefined; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maintain backwards compatibility if no options are specified |
||
var ch = new ConfirmChannel(this.connection); | ||
ch.setOptions(options); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I used a setter to specify the channel options to avoid making a breaking change to the Channel constructor |
||
ch.open(function (err) { | ||
if (err !== null) | ||
return cb && cb(err); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -72,10 +72,14 @@ suite('updateSecret', function() { | |
}); | ||
|
||
function channel_test_fn(method) { | ||
return function(name, chfun) { | ||
return function(name, options, chfun) { | ||
if (arguments.length === 2) { | ||
chfun = options; | ||
options = {}; | ||
} | ||
test(name, function(done) { | ||
connect(kCallback(function(c) { | ||
c[method](kCallback(function(ch) { | ||
c[method](options, kCallback(function(ch) { | ||
chfun(ch, done); | ||
}, done)); | ||
}, done)); | ||
|
@@ -210,6 +214,34 @@ suite('sending messages', function() { | |
}); | ||
}); | ||
|
||
channel_test('saturate buffer', function(ch, done) { | ||
var msg = randomString(); | ||
ch.assertQueue('', {exclusive: true}, function(e, q) { | ||
if (e !== null) return done(e); | ||
let ok; | ||
for (let i = 0; i < 2047; i++) { | ||
cressie176 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ok = ch.sendToQueue(q.queue, Buffer.from(msg)); | ||
if (!ok) break; | ||
} | ||
|
||
assert.equal(ok, false); | ||
done(); | ||
}); | ||
}); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This "test" finds how much the local environment running the test can take before the buffer is saturated, then updates the channel options for the next test. It's more than a little hacky, but I couldn't think of a better way to get the test working reliably. |
||
|
||
channel_test('set high watermark (making it harder to saturate the buffer', { highWaterMark: 4092 }, function(ch, done) { | ||
var msg = randomString(); | ||
ch.assertQueue('', {exclusive: true}, function(e, q) { | ||
if (e !== null) return done(e); | ||
let ok; | ||
for (let i = 0; i < 4092; i++) { | ||
cressie176 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ok = ch.sendToQueue(q.queue, Buffer.from(msg)); | ||
assert.equal(ok, true); | ||
} | ||
done(); | ||
}); | ||
}); | ||
|
||
}); | ||
|
||
suite('ConfirmChannel', function() { | ||
|
@@ -228,6 +260,34 @@ suite('ConfirmChannel', function() { | |
ch.waitForConfirms(done); | ||
}); | ||
|
||
confirm_channel_test('saturate buffer', function(ch, done) { | ||
var msg = randomString(); | ||
ch.assertQueue('', {exclusive: true}, function(e, q) { | ||
if (e !== null) return done(e); | ||
let ok; | ||
for (let i = 0; i < 2047; i++) { | ||
ok = ch.sendToQueue(q.queue, Buffer.from(msg)); | ||
if (!ok) break; | ||
} | ||
|
||
assert.equal(ok, false); | ||
done(); | ||
}); | ||
}); | ||
|
||
confirm_channel_test('set high watermark (making it harder to saturate the buffer', { highWaterMark: 4092 }, function(ch, done) { | ||
var msg = randomString(); | ||
ch.assertQueue('', {exclusive: true}, function(e, q) { | ||
if (e !== null) return done(e); | ||
let ok; | ||
for (let i = 0; i < 4092; i++) { | ||
ok = ch.sendToQueue(q.queue, Buffer.from(msg)); | ||
assert.equal(ok, true); | ||
} | ||
done(); | ||
}); | ||
}); | ||
|
||
}); | ||
|
||
suite("Error handling", function() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maintain backwards compatibility if no options are specified