Skip to content

Commit

Permalink
condense send/recv functions (fixes #19); support copy-free messages …
Browse files Browse the repository at this point in the history
…from Arrays and Strings (fixes #21)
  • Loading branch information
stevengj committed Jul 23, 2013
1 parent a8eecdf commit 8d629b9
Showing 1 changed file with 29 additions and 42 deletions.
71 changes: 29 additions & 42 deletions src/ZMQ.jl
Original file line number Diff line number Diff line change
Expand Up @@ -288,14 +288,14 @@ end


## Messages ##
typealias ByteArray Union(Array{Uint8,1}, ByteString)
type Message <: AbstractArray{Uint8,1}
# 32 bytes (for v3) + a pointer (for v2)
w0::Int64
w1::Int64
w2::Int64
w3::Int64
w4::Int
bufferorigin::Any # prevent source of buffer (if any) from being gc'ed

# Create an empty message (for receive)
function Message()
Expand All @@ -317,21 +317,24 @@ type Message <: AbstractArray{Uint8,1}
finalizer(zmsg, close)
return zmsg
end
end

# Construct a message from a string (including copying the string)
# In many cases it's more efficient to allocate the zmsg first and
# then build the data in-place, but this is here for convenience
function Message(data::ByteArray)
len = length(data)
zmsg = Message(len)
ccall(:memcpy, Ptr{Void}, (Ptr{Uint8}, Ptr{Uint8}, Uint),
zmsg, data, len)
return zmsg
# Create a message with a given String or Array as a buffer (for send)
Message(m::ByteString) = Message(m, convert(Ptr{Uint8}, m), length(m))
function Message{T}(origin, m::Ptr{T}, len::Integer)
zmsg = new()
zmsg.bufferorigin = origin # should be origin of data pointed to by m
rc = ccall((:zmq_msg_init_data, :libzmq), Cint, (Ptr{Message}, Ptr{T}, Csize_t, Ptr{Void}, Ptr{Void}), &zmsg, m, len*sizeof(T), C_NULL, C_NULL)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
finalizer(zmsg, close)
return zmsg
end
Message(m::ByteString) = Message(m, convert(Ptr{Uint8}, m), length(m))
Message{T}(m::Array{T}) = Message(m, convert(Ptr{T}, m), length(m))
end

# AbstractArray behaviors:
similar(a::Message, T, dims::Dims) = Array(T, dims)
similar(a::Message, T, dims::Dims) = Array(T, dims) # ?
length(zmsg::Message) = ccall((:zmq_msg_size, :libzmq), Csize_t, (Ptr{Message},) , &zmsg)
size(zmsg::Message) = (length(zmsg),)
stride(zmsg::Message, i::Integer) = i <= 1 ? 1 : length(zmsg)
Expand Down Expand Up @@ -390,52 +393,43 @@ end # end v3only
# of bytes. You send these with the following:
# send(socket, zmsg)
# zmsg = recv(socket)
send(socket::Socket, zmsg::Message) = send(socket, zmsg, int32(0))
function send(socket::Socket, zmsg::Message, noblock::Bool, sndmore::Bool)

flag::Cint = 0;
if (noblock) flag = flag | NOBLOCK ; end
if (sndmore) flag = flag | SNDMORE ; end
send(socket, zmsg, flag)
end

@v2only begin
function send(socket::Socket, zmsg::Message, flag::Integer)
function send(socket::Socket, zmsg::Message, flag=int32(0))
rc = ccall((:zmq_send, :libzmq), Cint, (Ptr{Void}, Ptr{Message}, Cint),
socket.data, &zmsg, flag)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
end
send(socket::Socket, msg::String, flag=int32(0)) =
send(socket, Message(msg), flag)
send{T}(socket::Socket, msg::Array{T}, flag=int32(0)) =
send(socket, Message(msg), flag)
end # end v2only

@v3only begin
function send(socket::Socket, zmsg::Message, flag::Integer)
function send(socket::Socket, zmsg::Message, flag=int32(0))
rc = ccall((:zmq_msg_send, :libzmq), Cint, (Ptr{Void}, Ptr{Message}, Cint),
&zmsg, socket.data, flag)
if rc == -1
throw(StateError(jl_zmq_error_str()))
end
end
function send(socket::Socket, msg::String, flag::Integer)
function send{T}(socket::Socket, msg::Ptr{T}, len, flag=int32(0))
rc = ccall((:zmq_send, :libzmq), Cint,
(Ptr{Void}, Ptr{Uint8}, Uint, Cint),
socket.data, msg, length(msg), flag)
(Ptr{Void}, Ptr{T}, Csize_t, Cint),
socket.data, msg, len * sizeof(T), flag)
if rc == -1
throw(StateError(jl_zmq_error_str()))
end
end
send(socket::Socket, msg::String) = send(socket, msg, int32(0))
send(socket::Socket, msg::String, flag=int32(0)) = send(socket, convert(Ptr{Uint8}, msg), length(msg), flag)
send{T}(socket::Socket, msg::Array{T}, flag=int32(0)) = send(socket, convert(Ptr{T}, msg), length(msg), flag)
end # end v3only
recv(socket::Socket) = recv(socket, int32(0))
function recv(socket::Socket, noblock::Bool)
flag::Cint = 0;
if (noblock) flag = flag | NOBLOCK ; end
recv(socket, flag)
end

@v2only begin
function recv(socket::Socket, flag::Integer)
function recv(socket::Socket, flag=int32(0))
zmsg = Message()
rc = ccall((:zmq_recv, :libzmq), Cint, (Ptr{Void}, Ptr{Message}, Cint),
socket.data, &zmsg, flag)
Expand All @@ -447,7 +441,7 @@ end
end # end v2only

@v3only begin
function recv(socket::Socket, flag::Integer)
function recv(socket::Socket, flag=int32(0))
zmsg = Message()
rc = ccall((:zmq_msg_recv, :libzmq), Cint, (Ptr{Message}, Ptr{Void}, Cint),
&zmsg, socket.data, flag)
Expand All @@ -458,13 +452,6 @@ function recv(socket::Socket, flag::Integer)
end
end # end v3only


# A "serialized" message includes information needed to interpret the
# data. For example, sending an array requires information about the
# element type and dimensions. See zmq_serialize.jl.



## Constants

# Context options
Expand Down

0 comments on commit 8d629b9

Please sign in to comment.