diff --git a/Project.toml b/Project.toml index 0ddb788..6c214fb 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "ZMQ" uuid = "c2297ded-f4af-51ae-bb23-16f91089e4e1" -version = "1.3.0" +version = "1.4.0" [deps] FileWatching = "7b1f6079-737a-58dc-b8bc-7a2ca5c1b5ee" diff --git a/docs/src/_changelog.md b/docs/src/_changelog.md index d2f5991..aee7adb 100644 --- a/docs/src/_changelog.md +++ b/docs/src/_changelog.md @@ -7,6 +7,12 @@ CurrentModule = ZMQ This documents notable changes in ZMQ.jl. The format is based on [Keep a Changelog](https://keepachangelog.com). +## [v1.4.0] - 2024-11-30 + +### Added +- Implemented [`send_multipart()`](@ref) and [`recv_multipart()`](@ref) for + working with multipart messages ([#253]). + ## [v1.3.0] - 2024-08-03 ### Added diff --git a/docs/src/reference.md b/docs/src/reference.md index 89e2fc2..675fce7 100644 --- a/docs/src/reference.md +++ b/docs/src/reference.md @@ -27,7 +27,9 @@ close bind connect recv +recv_multipart send +send_multipart ``` ZMQ socket types (note: some of these are aliases; e.g. `XREQ = DEALER`): diff --git a/src/comm.jl b/src/comm.jl index f5676df..0c523d5 100644 --- a/src/comm.jl +++ b/src/comm.jl @@ -60,6 +60,19 @@ function Sockets.send(f::Function, socket::Socket; more::Bool=false) send(socket, take!(io); more=more) end +""" + send_multipart(socket::Socket, parts) + +Send a multipart message composed of the elements in `parts`. `parts` may be any +object that supports `getindex()`, `eachindex()`, and `lastindex()`. +""" +function send_multipart(socket::Socket, parts) + for i in eachindex(parts) + is_last = i == lastindex(parts) + send(socket, parts[i]; more=!is_last) + end +end + ############################################################################ function _recv!(socket::Socket, zmsg) @@ -107,3 +120,29 @@ function Sockets.recv(socket::Socket, ::Type{T}) where {T} close(zmsg) end end + +# Specialization so that recv(::Socket, ::Message) works +Sockets.recv(socket::Socket, ::Type{Message}) = recv(socket) + +""" + recv_multipart(socket::Socket, ::Type{T}) -> Vector{T} + +Receive a multipart message of a specific type `T`. This behaves in the same way +as [`recv(::Socket, ::Type)`](@ref). +""" +function recv_multipart(socket::Socket, ::Type{T}) where {T} + parts = T[recv(socket, T)] + while socket.rcvmore + push!(parts, recv(socket, T)) + end + + return parts +end + +""" + recv_multipart(socket::Socket) -> Vector{Message} + +Receive a multipart message as a sequence of zero-copy [`Message`](@ref)'s. See +[`recv(::Socket)`](@ref). +""" +recv_multipart(socket::Socket) = recv_multipart(socket, Message) diff --git a/test/runtests.jl b/test/runtests.jl index 1986f37..59d0b1e 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -137,6 +137,22 @@ end finalize(m) end + # Test multipart messages + data = ["foo", "bar", "baz"] + ZMQ.send_multipart(s2, data) + + # Test receiving Message's + msgs = ZMQ.recv_multipart(s1) + @test msgs isa Vector{Message} + @test String.(msgs) == data + + # Test receiving a specific type + data = Int[1, 2, 3] + ZMQ.send_multipart(s1, data) + msgs = ZMQ.recv_multipart(s2, Int) + @test msgs isa Vector{Int} + @test msgs == data + # ZMQ.close(s1); ZMQ.close(s2) # should happen when context is closed ZMQ.close(ZMQ._context) # immediately close global context rather than waiting for exit @test !isopen(s1) @@ -223,29 +239,29 @@ end @testset "ZMQ resource management" begin local leaked_req_socket, leaked_rep_socket ZMQ.Socket(ZMQ.REQ) do req_socket - leaked_req_socket = req_socket + leaked_req_socket = req_socket - ZMQ.Socket(ZMQ.REP) do rep_socket - leaked_rep_socket = rep_socket + ZMQ.Socket(ZMQ.REP) do rep_socket + leaked_rep_socket = rep_socket - ZMQ.bind(rep_socket, "inproc://tester") - ZMQ.connect(req_socket, "inproc://tester") + ZMQ.bind(rep_socket, "inproc://tester") + ZMQ.connect(req_socket, "inproc://tester") - ZMQ.send(req_socket, "Mr. Watson, come here, I want to see you.") - @test unsafe_string(ZMQ.recv(rep_socket)) == "Mr. Watson, come here, I want to see you." - ZMQ.send(rep_socket, "Coming, Mr. Bell.") - @test unsafe_string(ZMQ.recv(req_socket)) == "Coming, Mr. Bell." - end + ZMQ.send(req_socket, "Mr. Watson, come here, I want to see you.") + @test unsafe_string(ZMQ.recv(rep_socket)) == "Mr. Watson, come here, I want to see you." + ZMQ.send(rep_socket, "Coming, Mr. Bell.") + @test unsafe_string(ZMQ.recv(req_socket)) == "Coming, Mr. Bell." + end - @test !ZMQ.isopen(leaked_rep_socket) + @test !ZMQ.isopen(leaked_rep_socket) end @test !ZMQ.isopen(leaked_req_socket) local leaked_ctx ZMQ.Context() do ctx - leaked_ctx = ctx + leaked_ctx = ctx - @test isopen(ctx) + @test isopen(ctx) end @test !isopen(leaked_ctx) end