Skip to content

Commit

Permalink
Add dontwait flag thanks to @izludec
Browse files Browse the repository at this point in the history
  • Loading branch information
enwi committed Dec 29, 2022
1 parent e8cb8f1 commit 26157a6
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 10 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 1.0.0-dev.13

### Add dontwait to socket send functions
- Add `DONTWAIT` flag option to socket send functions (thanks @izludec)
- Add documentation on which socket methods might throw an exception


## 1.0.0-dev.12

### Add socket monitoring capability
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ socket.connect("tcp://localhost:5566");
Send message
```dart
socket.send([1, 2, 3, 4, 5]);
socket.sendString('My Message');
```

Receive `ZMessage`s
Expand All @@ -67,7 +68,7 @@ socket.payloads.listen((payload) {
});
```

Receie socket events
Receive socket events
```dart
final MonitoredZSocket socket = context.createMonitoredSocket(SocketType.req);
socket.events.listen((event) {
Expand Down
79 changes: 71 additions & 8 deletions lib/src/socket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,21 @@ class ZSocket {
/// The [more] parameter (defaults to false) signals that this is a multi-part
/// message. ØMQ ensures atomic delivery of messages: peers shall receive
/// either all message parts of a message or none at all.
void send(final List<int> data, {final bool more = false}) {
///
/// The [nowait] parameter (defaults to false) specifies that the operation
/// should be performed in non-blocking mode. For socket types (DEALER, PUSH)
/// that block when there are no available peers (or all peers have full
/// high-water mark). If the message cannot be queued on the socket,
/// the zmq_send() function shall fail with errno set to EAGAIN.
///
/// Throws [ZeroMQException] on error
void send(final List<int> data,
{final bool more = false, final bool nowait = false}) {
_checkNotClosed();
final ptr = malloc.allocate<Uint8>(data.length);
ptr.asTypedList(data.length).setAll(0, data);

final sendParams = more ? ZMQ_SNDMORE : 0;
final sendParams = more ? ZMQ_SNDMORE : 0 | (nowait ? ZMQ_DONTWAIT : 0);
final result =
_bindings.zmq_send(_socket, ptr.cast(), data.length, sendParams);
malloc.free(ptr);
Expand All @@ -101,26 +110,63 @@ class ZSocket {
/// The [more] parameter (defaults to false) signals that this is a multi-part
/// message. ØMQ ensures atomic delivery of messages: peers shall receive
/// either all message parts of a message or none at all.
void sendString(final String string, {final bool more = false}) {
send(string.codeUnits, more: more);
///
/// The [nowait] parameter (defaults to false) specifies that the operation
/// should be performed in non-blocking mode. For socket types (DEALER, PUSH)
/// that block when there are no available peers (or all peers have full
/// high-water mark). If the message cannot be queued on the socket,
/// the zmq_send() function shall fail with errno set to EAGAIN.
///
/// Throws [ZeroMQException] on error
void sendString(final String string,
{final bool more = false, final bool nowait = false}) {
send(
string.codeUnits,
more: more,
nowait: nowait,
);
}

/// Sends the given [frame] over this socket
///
/// This is a convenience function and is the same as calling
/// [send(frame.payload, more: frame.hasMore)]
void sendFrame(final ZFrame frame) {
send(frame.payload, more: frame.hasMore);
///
/// The [nowait] parameter (defaults to false) specifies that the operation
/// should be performed in non-blocking mode. For socket types (DEALER, PUSH)
/// that block when there are no available peers (or all peers have full
/// high-water mark). If the message cannot be queued on the socket,
/// the zmq_send() function shall fail with errno set to EAGAIN.
///
/// Throws [ZeroMQException] on error
void sendFrame(final ZFrame frame, {final bool nowait = false}) {
send(
frame.payload,
more: frame.hasMore,
nowait: nowait,
);
}

/// Sends the given multi-part [message] over this socket
///
/// This is a convenience function.
/// Note that the individual [ZFrame.hasMore] are ignored
void sendMessage(final ZMessage message) {
///
/// The [nowait] parameter (defaults to false) specifies that the operation
/// should be performed in non-blocking mode. For socket types (DEALER, PUSH)
/// that block when there are no available peers (or all peers have full
/// high-water mark). If the message cannot be queued on the socket,
/// the zmq_send() function shall fail with errno set to EAGAIN.
///
/// Throws [ZeroMQException] on error
void sendMessage(final ZMessage message, {final bool nowait = false}) {
final lastIndex = message.length - 1;
for (int i = 0; i < message.length; ++i) {
send(message.elementAt(i).payload, more: i < lastIndex ? true : false);
send(
message.elementAt(i).payload,
more: i < lastIndex ? true : false,
nowait: nowait,
);
}
}

Expand All @@ -129,6 +175,8 @@ class ZSocket {
/// The [address] argument is a string consisting of two parts as follows: 'transport://address'.
/// The transport part specifies the underlying transport protocol to use.
/// The meaning of the address part is specific to the underlying transport protocol selected.
///
/// Throws [ZeroMQException] on error
void bind(final String address) {
_checkNotClosed();
final endpointPointer = address.toNativeUtf8();
Expand All @@ -142,6 +190,8 @@ class ZSocket {
/// The [address] argument is a string consisting of two parts as follows: 'transport://address'.
/// The transport part specifies the underlying transport protocol to use.
/// The meaning of the address part is specific to the underlying transport protocol selected.
///
/// Throws [ZeroMQException] on error
void connect(final String address) {
_checkNotClosed();
final endpointPointer = address.toNativeUtf8();
Expand All @@ -162,6 +212,8 @@ class ZSocket {
}

/// Set a socket [option] to a specific [value]
///
/// Throws [ZeroMQException] on error
void setOption(final int option, final String value) {
final ptr = value.toNativeUtf8();
final result = _bindings.zmq_setsockopt(
Expand All @@ -173,6 +225,8 @@ class ZSocket {
/// Sets the socket's long term secret key.
/// You must set this on both CURVE client and server sockets, see zmq_curve(7).
/// You can provide the [key] as a 40-character string encoded in the Z85 encoding format.
///
/// Throws [ZeroMQException] on error
void setCurveSecretKey(final String key) {
setOption(ZMQ_CURVE_SECRETKEY, key);
}
Expand All @@ -181,6 +235,8 @@ class ZSocket {
/// You must set this on CURVE client sockets, see zmq_curve(7).
/// You can provide the [key] as a 40-character string encoded in the Z85 encoding format.
/// The public key must always be used with the matching secret key.
///
/// Throws [ZeroMQException] on error
void setCurvePublicKey(final String key) {
setOption(ZMQ_CURVE_PUBLICKEY, key);
}
Expand All @@ -189,6 +245,8 @@ class ZSocket {
/// You must set this on CURVE client sockets, see zmq_curve(7).
/// You can provide the [key] as a 40-character string encoded in the Z85 encoding format.
/// This key must have been generated together with the server's secret key.
///
/// Throws [ZeroMQException] on error
void setCurveServerKey(final String key) {
setOption(ZMQ_CURVE_SERVERKEY, key);
}
Expand All @@ -201,6 +259,8 @@ class ZSocket {
/// non-empty [topic] shall subscribe to all messages beginning with the specified
/// prefix. Mutiple filters may be attached to a single [ZMQ_SUB] socket, in which case a
/// message shall be accepted if it matches at least one filter.
///
/// Throws [ZeroMQException] on error
void subscribe(final String topic) {
setOption(ZMQ_SUBSCRIBE, topic);
}
Expand All @@ -210,10 +270,13 @@ class ZSocket {
/// the [ZMQ_SUBSCRIBE] option. If the socket has several instances of the same filter
/// attached the [ZMQ_UNSUBSCRIBE] option shall remove only one instance, leaving the rest in
/// place and functional.
///
/// Throws [ZeroMQException] on error
void unsubscribe(final String topic) {
setOption(ZMQ_UNSUBSCRIBE, topic);
}

/// Throws a [StateError] when called and this socket is closed
void _checkNotClosed() {
if (_closed) {
throw StateError("This operation can't be performed on a closed socket!");
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: dartzmq
description: A simple dart zeromq implementation/wrapper around the libzmq C++ library
version: 1.0.0-dev.12
version: 1.0.0-dev.13
homepage: https://github.com/enwi/dartzmq
repository: https://github.com/enwi/dartzmq

Expand Down

0 comments on commit 26157a6

Please sign in to comment.