From 30d1e32a99e719c2f872fb91baabaefb67cbf667 Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Fri, 29 Nov 2024 23:54:49 +0100 Subject: [PATCH] Implement send_multipart() and recv_multipart() --- docs/src/_changelog.md | 6 ++++++ docs/src/reference.md | 2 ++ src/comm.jl | 39 +++++++++++++++++++++++++++++++++++++++ test/runtests.jl | 16 ++++++++++++++++ 4 files changed, 63 insertions(+) diff --git a/docs/src/_changelog.md b/docs/src/_changelog.md index d2f5991..aee7adb 100644 --- a/docs/src/_changelog.md +++ b/docs/src/_changelog.md @@ -7,6 +7,12 @@ CurrentModule = ZMQ This documents notable changes in ZMQ.jl. The format is based on [Keep a Changelog](https://keepachangelog.com). +## [v1.4.0] - 2024-11-30 + +### Added +- Implemented [`send_multipart()`](@ref) and [`recv_multipart()`](@ref) for + working with multipart messages ([#253]). + ## [v1.3.0] - 2024-08-03 ### Added diff --git a/docs/src/reference.md b/docs/src/reference.md index 89e2fc2..675fce7 100644 --- a/docs/src/reference.md +++ b/docs/src/reference.md @@ -27,7 +27,9 @@ close bind connect recv +recv_multipart send +send_multipart ``` ZMQ socket types (note: some of these are aliases; e.g. `XREQ = DEALER`): diff --git a/src/comm.jl b/src/comm.jl index f5676df..0c523d5 100644 --- a/src/comm.jl +++ b/src/comm.jl @@ -60,6 +60,19 @@ function Sockets.send(f::Function, socket::Socket; more::Bool=false) send(socket, take!(io); more=more) end +""" + send_multipart(socket::Socket, parts) + +Send a multipart message composed of the elements in `parts`. `parts` may be any +object that supports `getindex()`, `eachindex()`, and `lastindex()`. +""" +function send_multipart(socket::Socket, parts) + for i in eachindex(parts) + is_last = i == lastindex(parts) + send(socket, parts[i]; more=!is_last) + end +end + ############################################################################ function _recv!(socket::Socket, zmsg) @@ -107,3 +120,29 @@ function Sockets.recv(socket::Socket, ::Type{T}) where {T} close(zmsg) end end + +# Specialization so that recv(::Socket, ::Message) works +Sockets.recv(socket::Socket, ::Type{Message}) = recv(socket) + +""" + recv_multipart(socket::Socket, ::Type{T}) -> Vector{T} + +Receive a multipart message of a specific type `T`. This behaves in the same way +as [`recv(::Socket, ::Type)`](@ref). +""" +function recv_multipart(socket::Socket, ::Type{T}) where {T} + parts = T[recv(socket, T)] + while socket.rcvmore + push!(parts, recv(socket, T)) + end + + return parts +end + +""" + recv_multipart(socket::Socket) -> Vector{Message} + +Receive a multipart message as a sequence of zero-copy [`Message`](@ref)'s. See +[`recv(::Socket)`](@ref). +""" +recv_multipart(socket::Socket) = recv_multipart(socket, Message) diff --git a/test/runtests.jl b/test/runtests.jl index 100dfa6..59d0b1e 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -137,6 +137,22 @@ end finalize(m) end + # Test multipart messages + data = ["foo", "bar", "baz"] + ZMQ.send_multipart(s2, data) + + # Test receiving Message's + msgs = ZMQ.recv_multipart(s1) + @test msgs isa Vector{Message} + @test String.(msgs) == data + + # Test receiving a specific type + data = Int[1, 2, 3] + ZMQ.send_multipart(s1, data) + msgs = ZMQ.recv_multipart(s2, Int) + @test msgs isa Vector{Int} + @test msgs == data + # ZMQ.close(s1); ZMQ.close(s2) # should happen when context is closed ZMQ.close(ZMQ._context) # immediately close global context rather than waiting for exit @test !isopen(s1)