diff --git a/src/ZMQ.jl b/src/ZMQ.jl index 5e051f3..a9473dd 100644 --- a/src/ZMQ.jl +++ b/src/ZMQ.jl @@ -313,6 +313,25 @@ function connect(socket::Socket, endpoint::String) end end +# in order to support zero-copy messages that share data with Julia +# arrays, we need to hold a reference to the Julia object in a dictionary +# until zeromq is done with the data, to prevent it from being garbage +# collected. The gc_protect dictionary is keyed by a uv_async_t* pointer, +# used in uv_async_send to tell Julia to when zeromq is done with the data. +const gc_protect = Dict{Ptr{Void},Any}() +gc_protect_cb(work, status) = pop!(gc_protect, work.handle, nothing) +function gc_protect_handle(obj::Any) + work = Base.SingleAsyncWork(gc_protect_cb) + gc_protect[work.handle] = (work,obj) + work.handle +end + +# Thread-safe zeromq callback when data is freed, passed to zmq_msg_init_data. +# The hint parameter will be a uv_async_t* pointer. +function gc_free_fn(data::Ptr{Void}, hint::Ptr{Void}) + ccall(:uv_async_send,Cint,(Ptr{Void},),hint) +end +const gc_free_fn_c = cfunction(gc_free_fn, Cint, (Ptr{Void}, Ptr{Void})) ## Messages ## type Message <: AbstractArray{Uint8,1} @@ -322,11 +341,12 @@ type Message <: AbstractArray{Uint8,1} w2::Int64 w3::Int64 w4::Int - bufferorigin::Any # prevent source of buffer (if any) from being gc'ed + handle::Ptr{Void} # index into gc_protect, if any # Create an empty message (for receive) function Message() zmsg = new() + zmsg.handle = C_NULL rc = ccall((:zmq_msg_init, zmq), Cint, (Ptr{Message},), &zmsg) if rc != 0 throw(StateError(jl_zmq_error_str())) @@ -337,6 +357,7 @@ type Message <: AbstractArray{Uint8,1} # Create a message with a given buffer size (for send) function Message(len::Integer) zmsg = new() + zmsg.handle = C_NULL rc = ccall((:zmq_msg_init_size, zmq), Cint, (Ptr{Message}, Csize_t), &zmsg, len) if rc != 0 throw(StateError(jl_zmq_error_str())) @@ -344,27 +365,40 @@ type Message <: AbstractArray{Uint8,1} finalizer(zmsg, close) return zmsg end - function Message{T}(origin, m::Ptr{T}, len::Integer) + + # low-level function to create a message (for send) with an existing + # data buffer, without making a copy. The origin parameter should + # be the Julia object that is the origin of the data, so that + # we can hold a reference to it until zeromq is done with the buffer. + function Message{T}(origin::Any, m::Ptr{T}, len::Integer) zmsg = new() - zmsg.bufferorigin = origin # should be origin of data pointed to by m - rc = ccall((:zmq_msg_init_data, zmq), Cint, (Ptr{Message}, Ptr{T}, Csize_t, Ptr{Void}, Ptr{Void}), &zmsg, m, len, C_NULL, C_NULL) + zmsg.handle = gc_protect_handle(origin) + rc = ccall((:zmq_msg_init_data, zmq), Cint, (Ptr{Message}, Ptr{T}, Csize_t, Ptr{Void}, Ptr{Void}), &zmsg, m, len, gc_free_fn_c, zmsg.handle) if rc != 0 throw(StateError(jl_zmq_error_str())) end finalizer(zmsg, close) return zmsg end + + # Create a message with a given String or Array as a buffer (for send) + # (note: now "owns" the buffer ... the Array must not be resized, + # or even written to after the message is sent!) + Message(m::ByteString) = Message(m, convert(Ptr{Uint8}, m), sizeof(m)) + Message{T<:ByteString}(p::SubString{T}) = + Message(p, pointer(p.string.data)+p.offset, sizeof(p)) + Message(a::Array) = Message(a, pointer(a), sizeof(a)) function Message(io::IOBuffer) if !io.readable || !io.seekable error("byte read failed") end - Message(io, convert(Ptr{Uint8}, io.data), io.size) + Message(io.data) end end -# Create a message with a given String or Array as a buffer (for send) -Message(m::ByteString) = Message(m, convert(Ptr{Uint8}, m), sizeof(m)) -Message{T}(m::Array{T}) = Message(m, convert(Ptr{T}, m), sizeof(m)) +# check whether zeromq has called our free-function, i.e. whether +# we are save to reclaim ownership of any buffer object +isfreed(m::Message) = haskey(gc_protect, m.handle) # AbstractArray behaviors: similar(a::Message, T, dims::Dims) = Array(T, dims) # ? @@ -428,7 +462,7 @@ end # end v3only # zmsg = recv(socket) #Send/Recv Options -const NOBLOCK = 1 +const NOBLOCK = 1 # deprecated old name for DONTWAIT in ZMQ v2 const DONTWAIT = 1 const SNDMORE = 2 @@ -440,10 +474,6 @@ function send(socket::Socket, zmsg::Message, flag=int32(0)) throw(StateError(jl_zmq_error_str())) end end -send(socket::Socket, msg::String, flag=int32(0)) = - send(socket, Message(msg), flag) -send{T}(socket::Socket, msg::Array{T}, flag=int32(0)) = - send(socket, Message(msg), flag) end # end v2only @v3only begin @@ -457,21 +487,22 @@ function send(socket::Socket, zmsg::Message, flag=int32(0)) throw(StateError(jl_zmq_error_str())) end end -function send{T}(socket::Socket, msg::Ptr{T}, len, flag=int32(0)) - if (get_events(socket) & POLLOUT) == 0 - wait(socket; writable = true) - end - rc = ccall((:zmq_send, zmq), Cint, - (Ptr{Void}, Ptr{T}, Csize_t, Cint), - socket.data, msg, len, flag) - if rc == -1 - throw(StateError(jl_zmq_error_str())) - end -end -send(socket::Socket, msg::String, flag=int32(0)) = send(socket, convert(Ptr{Uint8}, msg), sizeof(msg), flag) -send{T}(socket::Socket, msg::Array{T}, flag=int32(0)) = send(socket, convert(Ptr{T}, msg), sizeof(msg), flag) end # end v3only +# strings are immutable, so we can send them zero-copy by default +send(socket::Socket, msg::String, flag=int32(0)) = send(socket, Message(msg), flag) + +# Make a copy of arrays before sending, by default, since it is too +# dangerous to require that the array not change until ZMQ is done with it. +# For zero-copy array messages, construct a Message explicitly. +send(socket::Socket, msg::AbstractArray, flag=int32(0)) = send(socket, Message(copy(msg)), flag) + +function send(f::Function, socket::Socket, flag=int32(0)) + io = IOBuffer() + f(io) + send(socket, Message(io), flag) +end + @v2only begin function recv(socket::Socket) zmsg = Message()