Skip to content

Commit

Permalink
Implement send_multipart() and recv_multipart()
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesWrigley committed Nov 29, 2024
1 parent 132f0c3 commit 30d1e32
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 0 deletions.
6 changes: 6 additions & 0 deletions docs/src/_changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ CurrentModule = ZMQ
This documents notable changes in ZMQ.jl. The format is based on [Keep a
Changelog](https://keepachangelog.com).

## [v1.4.0] - 2024-11-30

### Added
- Implemented [`send_multipart()`](@ref) and [`recv_multipart()`](@ref) for
working with multipart messages ([#253]).

## [v1.3.0] - 2024-08-03

### Added
Expand Down
2 changes: 2 additions & 0 deletions docs/src/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ close
bind
connect
recv
recv_multipart
send
send_multipart
```

ZMQ socket types (note: some of these are aliases; e.g. `XREQ = DEALER`):
Expand Down
39 changes: 39 additions & 0 deletions src/comm.jl
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ function Sockets.send(f::Function, socket::Socket; more::Bool=false)
send(socket, take!(io); more=more)
end

"""
send_multipart(socket::Socket, parts)
Send a multipart message composed of the elements in `parts`. `parts` may be any
object that supports `getindex()`, `eachindex()`, and `lastindex()`.
"""
function send_multipart(socket::Socket, parts)
for i in eachindex(parts)
is_last = i == lastindex(parts)
send(socket, parts[i]; more=!is_last)
end
end

############################################################################

function _recv!(socket::Socket, zmsg)
Expand Down Expand Up @@ -107,3 +120,29 @@ function Sockets.recv(socket::Socket, ::Type{T}) where {T}
close(zmsg)
end
end

# Specialization so that recv(::Socket, ::Message) works
Sockets.recv(socket::Socket, ::Type{Message}) = recv(socket)

"""
recv_multipart(socket::Socket, ::Type{T}) -> Vector{T}
Receive a multipart message of a specific type `T`. This behaves in the same way
as [`recv(::Socket, ::Type)`](@ref).
"""
function recv_multipart(socket::Socket, ::Type{T}) where {T}
parts = T[recv(socket, T)]
while socket.rcvmore
push!(parts, recv(socket, T))
end

return parts
end

"""
recv_multipart(socket::Socket) -> Vector{Message}
Receive a multipart message as a sequence of zero-copy [`Message`](@ref)'s. See
[`recv(::Socket)`](@ref).
"""
recv_multipart(socket::Socket) = recv_multipart(socket, Message)
16 changes: 16 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,22 @@ end
finalize(m)
end

# Test multipart messages
data = ["foo", "bar", "baz"]
ZMQ.send_multipart(s2, data)

# Test receiving Message's
msgs = ZMQ.recv_multipart(s1)
@test msgs isa Vector{Message}
@test String.(msgs) == data

# Test receiving a specific type
data = Int[1, 2, 3]
ZMQ.send_multipart(s1, data)
msgs = ZMQ.recv_multipart(s2, Int)
@test msgs isa Vector{Int}
@test msgs == data

# ZMQ.close(s1); ZMQ.close(s2) # should happen when context is closed
ZMQ.close(ZMQ._context) # immediately close global context rather than waiting for exit
@test !isopen(s1)
Expand Down

0 comments on commit 30d1e32

Please sign in to comment.