Skip to content

Commit

Permalink
Merge pull request #253 from JuliaInterop/multipart
Browse files Browse the repository at this point in the history
Implement send_multipart() and recv_multipart()
  • Loading branch information
JamesWrigley authored Nov 30, 2024
2 parents 66cb119 + ab5a73f commit 9e8a41b
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name = "ZMQ"
uuid = "c2297ded-f4af-51ae-bb23-16f91089e4e1"
version = "1.3.0"
version = "1.4.0"

[deps]
FileWatching = "7b1f6079-737a-58dc-b8bc-7a2ca5c1b5ee"
Expand Down
6 changes: 6 additions & 0 deletions docs/src/_changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ CurrentModule = ZMQ
This documents notable changes in ZMQ.jl. The format is based on [Keep a
Changelog](https://keepachangelog.com).

## [v1.4.0] - 2024-11-30

### Added
- Implemented [`send_multipart()`](@ref) and [`recv_multipart()`](@ref) for
working with multipart messages ([#253]).

## [v1.3.0] - 2024-08-03

### Added
Expand Down
2 changes: 2 additions & 0 deletions docs/src/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ close
bind
connect
recv
recv_multipart
send
send_multipart
```

ZMQ socket types (note: some of these are aliases; e.g. `XREQ = DEALER`):
Expand Down
39 changes: 39 additions & 0 deletions src/comm.jl
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ function Sockets.send(f::Function, socket::Socket; more::Bool=false)
send(socket, take!(io); more=more)
end

"""
send_multipart(socket::Socket, parts)
Send a multipart message composed of the elements in `parts`. `parts` may be any
object that supports `getindex()`, `eachindex()`, and `lastindex()`.
"""
function send_multipart(socket::Socket, parts)
for i in eachindex(parts)
is_last = i == lastindex(parts)
send(socket, parts[i]; more=!is_last)
end
end

############################################################################

function _recv!(socket::Socket, zmsg)
Expand Down Expand Up @@ -107,3 +120,29 @@ function Sockets.recv(socket::Socket, ::Type{T}) where {T}
close(zmsg)
end
end

# Specialization so that recv(::Socket, ::Message) works
Sockets.recv(socket::Socket, ::Type{Message}) = recv(socket)

"""
recv_multipart(socket::Socket, ::Type{T}) -> Vector{T}
Receive a multipart message of a specific type `T`. This behaves in the same way
as [`recv(::Socket, ::Type)`](@ref).
"""
function recv_multipart(socket::Socket, ::Type{T}) where {T}
parts = T[recv(socket, T)]
while socket.rcvmore
push!(parts, recv(socket, T))
end

return parts
end

"""
recv_multipart(socket::Socket) -> Vector{Message}
Receive a multipart message as a sequence of zero-copy [`Message`](@ref)'s. See
[`recv(::Socket)`](@ref).
"""
recv_multipart(socket::Socket) = recv_multipart(socket, Message)
42 changes: 29 additions & 13 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,22 @@ end
finalize(m)
end

# Test multipart messages
data = ["foo", "bar", "baz"]
ZMQ.send_multipart(s2, data)

# Test receiving Message's
msgs = ZMQ.recv_multipart(s1)
@test msgs isa Vector{Message}
@test String.(msgs) == data

# Test receiving a specific type
data = Int[1, 2, 3]
ZMQ.send_multipart(s1, data)
msgs = ZMQ.recv_multipart(s2, Int)
@test msgs isa Vector{Int}
@test msgs == data

# ZMQ.close(s1); ZMQ.close(s2) # should happen when context is closed
ZMQ.close(ZMQ._context) # immediately close global context rather than waiting for exit
@test !isopen(s1)
Expand Down Expand Up @@ -223,29 +239,29 @@ end
@testset "ZMQ resource management" begin
local leaked_req_socket, leaked_rep_socket
ZMQ.Socket(ZMQ.REQ) do req_socket
leaked_req_socket = req_socket
leaked_req_socket = req_socket

ZMQ.Socket(ZMQ.REP) do rep_socket
leaked_rep_socket = rep_socket
ZMQ.Socket(ZMQ.REP) do rep_socket
leaked_rep_socket = rep_socket

ZMQ.bind(rep_socket, "inproc://tester")
ZMQ.connect(req_socket, "inproc://tester")
ZMQ.bind(rep_socket, "inproc://tester")
ZMQ.connect(req_socket, "inproc://tester")

ZMQ.send(req_socket, "Mr. Watson, come here, I want to see you.")
@test unsafe_string(ZMQ.recv(rep_socket)) == "Mr. Watson, come here, I want to see you."
ZMQ.send(rep_socket, "Coming, Mr. Bell.")
@test unsafe_string(ZMQ.recv(req_socket)) == "Coming, Mr. Bell."
end
ZMQ.send(req_socket, "Mr. Watson, come here, I want to see you.")
@test unsafe_string(ZMQ.recv(rep_socket)) == "Mr. Watson, come here, I want to see you."
ZMQ.send(rep_socket, "Coming, Mr. Bell.")
@test unsafe_string(ZMQ.recv(req_socket)) == "Coming, Mr. Bell."
end

@test !ZMQ.isopen(leaked_rep_socket)
@test !ZMQ.isopen(leaked_rep_socket)
end
@test !ZMQ.isopen(leaked_req_socket)

local leaked_ctx
ZMQ.Context() do ctx
leaked_ctx = ctx
leaked_ctx = ctx

@test isopen(ctx)
@test isopen(ctx)
end
@test !isopen(leaked_ctx)
end
Expand Down

2 comments on commit 9e8a41b

@JamesWrigley
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/120452

Tip: Release Notes

Did you know you can add release notes too? Just add markdown formatted text underneath the comment after the text
"Release notes:" and it will be added to the registry PR, and if TagBot is installed it will also be added to the
release that TagBot creates. i.e.

@JuliaRegistrator register

Release notes:

## Breaking changes

- blah

To add them here just re-invoke and the PR will be updated.

Tagging

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v1.4.0 -m "<description of version>" 9e8a41b27b12c943b1bf07c191d9b7862a2b9457
git push origin v1.4.0

Please sign in to comment.