Skip to content

Commit

Permalink
Merge pull request #51 from JuliaLang/zerocopy
Browse files Browse the repository at this point in the history
safer zero-copy semantics (fix #47)
  • Loading branch information
stevengj committed Apr 4, 2014
2 parents 77c5d7c + 69b1c8d commit 3d394af
Showing 1 changed file with 57 additions and 26 deletions.
83 changes: 57 additions & 26 deletions src/ZMQ.jl
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,25 @@ function connect(socket::Socket, endpoint::String)
end
end

# in order to support zero-copy messages that share data with Julia
# arrays, we need to hold a reference to the Julia object in a dictionary
# until zeromq is done with the data, to prevent it from being garbage
# collected. The gc_protect dictionary is keyed by a uv_async_t* pointer,
# used in uv_async_send to tell Julia to when zeromq is done with the data.
const gc_protect = Dict{Ptr{Void},Any}()
gc_protect_cb(work, status) = pop!(gc_protect, work.handle, nothing)
function gc_protect_handle(obj::Any)
work = Base.SingleAsyncWork(gc_protect_cb)
gc_protect[work.handle] = (work,obj)
work.handle
end

# Thread-safe zeromq callback when data is freed, passed to zmq_msg_init_data.
# The hint parameter will be a uv_async_t* pointer.
function gc_free_fn(data::Ptr{Void}, hint::Ptr{Void})
ccall(:uv_async_send,Cint,(Ptr{Void},),hint)
end
const gc_free_fn_c = cfunction(gc_free_fn, Cint, (Ptr{Void}, Ptr{Void}))

## Messages ##
type Message <: AbstractArray{Uint8,1}
Expand All @@ -322,11 +341,12 @@ type Message <: AbstractArray{Uint8,1}
w2::Int64
w3::Int64
w4::Int
bufferorigin::Any # prevent source of buffer (if any) from being gc'ed
handle::Ptr{Void} # index into gc_protect, if any

# Create an empty message (for receive)
function Message()
zmsg = new()
zmsg.handle = C_NULL
rc = ccall((:zmq_msg_init, zmq), Cint, (Ptr{Message},), &zmsg)
if rc != 0
throw(StateError(jl_zmq_error_str()))
Expand All @@ -337,34 +357,48 @@ type Message <: AbstractArray{Uint8,1}
# Create a message with a given buffer size (for send)
function Message(len::Integer)
zmsg = new()
zmsg.handle = C_NULL
rc = ccall((:zmq_msg_init_size, zmq), Cint, (Ptr{Message}, Csize_t), &zmsg, len)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
finalizer(zmsg, close)
return zmsg
end
function Message{T}(origin, m::Ptr{T}, len::Integer)

# low-level function to create a message (for send) with an existing
# data buffer, without making a copy. The origin parameter should
# be the Julia object that is the origin of the data, so that
# we can hold a reference to it until zeromq is done with the buffer.
function Message{T}(origin::Any, m::Ptr{T}, len::Integer)
zmsg = new()
zmsg.bufferorigin = origin # should be origin of data pointed to by m
rc = ccall((:zmq_msg_init_data, zmq), Cint, (Ptr{Message}, Ptr{T}, Csize_t, Ptr{Void}, Ptr{Void}), &zmsg, m, len, C_NULL, C_NULL)
zmsg.handle = gc_protect_handle(origin)
rc = ccall((:zmq_msg_init_data, zmq), Cint, (Ptr{Message}, Ptr{T}, Csize_t, Ptr{Void}, Ptr{Void}), &zmsg, m, len, gc_free_fn_c, zmsg.handle)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
finalizer(zmsg, close)
return zmsg
end

# Create a message with a given String or Array as a buffer (for send)
# (note: now "owns" the buffer ... the Array must not be resized,
# or even written to after the message is sent!)
Message(m::ByteString) = Message(m, convert(Ptr{Uint8}, m), sizeof(m))
Message{T<:ByteString}(p::SubString{T}) =
Message(p, pointer(p.string.data)+p.offset, sizeof(p))
Message(a::Array) = Message(a, pointer(a), sizeof(a))
function Message(io::IOBuffer)
if !io.readable || !io.seekable
error("byte read failed")
end
Message(io, convert(Ptr{Uint8}, io.data), io.size)
Message(io.data)
end
end

# Create a message with a given String or Array as a buffer (for send)
Message(m::ByteString) = Message(m, convert(Ptr{Uint8}, m), sizeof(m))
Message{T}(m::Array{T}) = Message(m, convert(Ptr{T}, m), sizeof(m))
# check whether zeromq has called our free-function, i.e. whether
# we are save to reclaim ownership of any buffer object
isfreed(m::Message) = haskey(gc_protect, m.handle)

# AbstractArray behaviors:
similar(a::Message, T, dims::Dims) = Array(T, dims) # ?
Expand Down Expand Up @@ -428,7 +462,7 @@ end # end v3only
# zmsg = recv(socket)

#Send/Recv Options
const NOBLOCK = 1
const NOBLOCK = 1 # deprecated old name for DONTWAIT in ZMQ v2
const DONTWAIT = 1
const SNDMORE = 2

Expand All @@ -440,10 +474,6 @@ function send(socket::Socket, zmsg::Message, flag=int32(0))
throw(StateError(jl_zmq_error_str()))
end
end
send(socket::Socket, msg::String, flag=int32(0)) =
send(socket, Message(msg), flag)
send{T}(socket::Socket, msg::Array{T}, flag=int32(0)) =
send(socket, Message(msg), flag)
end # end v2only

@v3only begin
Expand All @@ -457,21 +487,22 @@ function send(socket::Socket, zmsg::Message, flag=int32(0))
throw(StateError(jl_zmq_error_str()))
end
end
function send{T}(socket::Socket, msg::Ptr{T}, len, flag=int32(0))
if (get_events(socket) & POLLOUT) == 0
wait(socket; writable = true)
end
rc = ccall((:zmq_send, zmq), Cint,
(Ptr{Void}, Ptr{T}, Csize_t, Cint),
socket.data, msg, len, flag)
if rc == -1
throw(StateError(jl_zmq_error_str()))
end
end
send(socket::Socket, msg::String, flag=int32(0)) = send(socket, convert(Ptr{Uint8}, msg), sizeof(msg), flag)
send{T}(socket::Socket, msg::Array{T}, flag=int32(0)) = send(socket, convert(Ptr{T}, msg), sizeof(msg), flag)
end # end v3only

# strings are immutable, so we can send them zero-copy by default
send(socket::Socket, msg::String, flag=int32(0)) = send(socket, Message(msg), flag)

# Make a copy of arrays before sending, by default, since it is too
# dangerous to require that the array not change until ZMQ is done with it.
# For zero-copy array messages, construct a Message explicitly.
send(socket::Socket, msg::AbstractArray, flag=int32(0)) = send(socket, Message(copy(msg)), flag)

function send(f::Function, socket::Socket, flag=int32(0))
io = IOBuffer()
f(io)
send(socket, Message(io), flag)
end

@v2only begin
function recv(socket::Socket)
zmsg = Message()
Expand Down

0 comments on commit 3d394af

Please sign in to comment.