Skip to content

Commit

Permalink
Update ZMQ
Browse files Browse the repository at this point in the history
* Use new ccall interface
* implement bytestring(..) to convert msg to str
* implement v3 send/recv interface
  • Loading branch information
aviks committed Jan 11, 2013
1 parent 4b38427 commit be7f795
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 92 deletions.
157 changes: 73 additions & 84 deletions src/ZMQ.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
module ZMQ

using Base
import Base.convert, Base.ref
import Base.convert, Base.ref, Base.get, Base.bytestring

export
#Types
Expand All @@ -13,24 +13,7 @@ export
#Constants
ZMQ_IO_THREADS,ZMQ_MAX_SOCKETS,ZMQ_PAIR,ZMQ_PUB,ZMQ_SUB,ZMQ_REQ,ZMQ_REP,ZMQ_DEALER,ZMQ_DEALER,ZMQ_PULL,ZMQ_PUSH,ZMQ_XPUB,ZMQ_XPUB,ZMQ_XREQ,ZMQ_XREP,ZMQ_UPSTREAM,ZMQ_DOWNSTREAM,ZMQ_MORE,ZMQ_MORE,ZMQ_SNDMORE,ZMQ_POLLIN,ZMQ_POLLOUT,ZMQ_POLLERR,ZMQ_STREAMER,ZMQ_FORWARDER,ZMQ_QUEUE

_jl_libzmq = dlopen("libzmq")

_jl_zmq_version = dlsym(_jl_libzmq, :zmq_version)
_jl_zmq_errno = dlsym(_jl_libzmq, :zmq_errno)
_jl_zmq_strerror = dlsym(_jl_libzmq, :zmq_strerror)
_jl_zmq_socket = dlsym(_jl_libzmq, :zmq_socket)
_jl_zmq_close = dlsym(_jl_libzmq, :zmq_close)
_jl_zmq_getsockopt = dlsym(_jl_libzmq, :zmq_getsockopt)
_jl_zmq_setsockopt = dlsym(_jl_libzmq, :zmq_setsockopt)
_jl_zmq_bind = dlsym(_jl_libzmq, :zmq_bind)
_jl_zmq_connect = dlsym(_jl_libzmq, :zmq_connect)
_jl_zmq_msg_init_size = dlsym(_jl_libzmq, :zmq_msg_init_size)
_jl_zmq_msg_init = dlsym(_jl_libzmq, :zmq_msg_init)
_jl_zmq_msg_data = dlsym(_jl_libzmq, :zmq_msg_data)
_jl_zmq_msg_size = dlsym(_jl_libzmq, :zmq_msg_size)
_jl_zmq_msg_close = dlsym(_jl_libzmq, :zmq_msg_close)
_jl_zmq_send = dlsym(_jl_libzmq, :zmq_send)
_jl_zmq_recv = dlsym(_jl_libzmq, :zmq_recv)

const ZMQ_MAX_VSM_SIZE = 30
# This next should be replaced by a ccall, when packages can have C code
const _jl_zmq_msg_t_size = ZMQ_MAX_VSM_SIZE + sizeof(Uint) + 2
Expand All @@ -47,8 +30,8 @@ show(io, thiserr::ZMQStateError) = print(io, "ZMQ: ", thiserr.msg)

# Basic functions
function jl_zmq_error_str()
errno = ccall(_jl_zmq_errno, Int32, ())
c_strerror = ccall (_jl_zmq_strerror, Ptr{Uint8}, (Int32,), errno)
errno = ccall((:zmq_errno, :libzmq), Int32, ())
c_strerror = ccall ((:zmq_strerror, :libzmq), Ptr{Uint8}, (Int32,), errno)
if c_strerror != C_NULL
strerror = bytestring(c_strerror)
return strerror
Expand All @@ -60,32 +43,14 @@ end
let major = zeros(Int32, 1), minor = zeros(Int32, 1), patch = zeros(Int32, 1)
global version
function version()
ccall(_jl_zmq_version, Void, (Ptr{Int32}, Ptr{Int32}, Ptr{Int32}), major, minor, patch)
ccall((:zmq_version, :libzmq), Void, (Ptr{Int32}, Ptr{Int32}, Ptr{Int32}), major, minor, patch)
return (major[1], minor[1], patch[1])
end
end

_zmq_major, _zmq_minor, _zmq_path = version()

# Version-dependent library symbols
if _zmq_major < 3
# Version 2 context functions (now deprecated)
_jl_zmq_init = dlsym(_jl_libzmq, :zmq_init)
_jl_zmq_term = dlsym(_jl_libzmq, :zmq_term)
else
# Version 3 context functions
_jl_zmq_ctx_new = dlsym(_jl_libzmq, :zmq_ctx_new)
_jl_zmq_ctx_destroy = dlsym(_jl_libzmq, :zmq_ctx_destroy)
_jl_zmq_ctx_get = dlsym(_jl_libzmq, :zmq_ctx_get)
_jl_zmq_ctx_set = dlsym(_jl_libzmq, :zmq_ctx_set)
# Version 3 socket functions
_jl_zmq_disconnect = dlsym(_jl_libzmq, :zmq_disconnect)
_jl_zmq_unbind = dlsym(_jl_libzmq, :zmq_unbind)
# Version 3 message functions
_jl_zmq_msg_get = dlsym(_jl_libzmq, :zmq_msg_get)
_jl_zmq_msg_set = dlsym(_jl_libzmq, :zmq_msg_set)
_jl_zmq_msg_more = dlsym(_jl_libzmq, :zmq_msg_more)
end


## Contexts ##
# Provide the same constructor API for version 2 and version 3, even
Expand All @@ -96,7 +61,7 @@ if _zmq_major < 3
data::Ptr{Void}

function ZMQContext(n::Integer)
p = ccall(_jl_zmq_init, Ptr{Void}, (Int32,), n)
p = ccall((:zmq_init, :libzmq), Ptr{Void}, (Int32,), n)
if p == C_NULL
throw(ZMQStateError(jl_zmq_error_str()))
end
Expand All @@ -111,7 +76,7 @@ else # Versions 3 and higher
data::Ptr{Void}

function ZMQContext(n::Integer)
p = ccall(_jl_zmq_ctx_new, Ptr{Void}, ())
p = ccall((:zmq_ctx_new, :libzmq), Ptr{Void}, ())
if p == C_NULL
throw(ZMQStateError(jl_zmq_error_str()))
end
Expand All @@ -126,30 +91,30 @@ ZMQContext() = ZMQContext(1)
if _zmq_major < 3
global close
function close(ctx::ZMQContext)
rc = ccall(_jl_zmq_term, Int32, (Ptr{Void},), ctx.data)
rc = ccall((:zmq_term, :libzmq), Int32, (Ptr{Void},), ctx.data)
if rc != 0
throw(ZMQStateError(jl_zmq_error_str()))
end
end
else # Versions 3 and higher
global close
function close(ctx::ZMQContext)
rc = ccall(_jl_zmq_ctx_destroy, Int32, (Ptr{Void},), ctx.data)
rc = ccall((:zmq_ctx_destroy, :libzmq), Int32, (Ptr{Void},), ctx.data)
if rc != 0
throw(ZMQStateError(jl_zmq_error_str()))
end
end
global get
function get(ctx::ZMQContext, option::Integer)
val = ccall(_jl_zmq_ctx_get, Int32, (Ptr{Void}, Int32), ctx.data, option)
val = ccall((:zmq_ctx_get, :libzmq), Int32, (Ptr{Void}, Int32), ctx.data, option)
if val < 0
throw(ZMQStateError(jl_zmq_error_str()))
end
return val
end
global set
function set(ctx::ZMQContext, option::Integer, value::Integer)
rc = ccall(_jl_zmq_ctx_set, Int32, (Ptr{Void}, Int32, Int32), ctx.data, option, value)
rc = ccall((:zmq_ctx_set, :libzmq), Int32, (Ptr{Void}, Int32, Int32), ctx.data, option, value)
if rc != 0
throw(ZMQStateError(jl_zmq_error_str()))
end
Expand All @@ -162,7 +127,7 @@ type ZMQSocket
data::Ptr{Void}

function ZMQSocket(ctx::ZMQContext, typ::Integer)
p = ccall(_jl_zmq_socket, Ptr{Void}, (Ptr{Void}, Int32), ctx.data, typ)
p = ccall((:zmq_socket, :libzmq), Ptr{Void}, (Ptr{Void}, Int32), ctx.data, typ)
if p == C_NULL
throw(ZMQStateError(jl_zmq_error_str()))
end
Expand All @@ -173,7 +138,7 @@ type ZMQSocket
end

function close(socket::ZMQSocket)
rc = ccall(_jl_zmq_close, Int32, (Ptr{Void},), socket.data)
rc = ccall((:zmq_close, :libzmq), Int32, (Ptr{Void},), socket.data)
if rc != 0
throw(ZMQStateError(jl_zmq_error_str()))
end
Expand Down Expand Up @@ -238,7 +203,7 @@ for (fset, fget, k, p) in opslist
@eval global ($fset)
@eval function ($fset)(socket::ZMQSocket, option_val::Integer)
($p)[1] = option_val
rc = ccall(_jl_zmq_setsockopt, Int32,
rc = ccall((:zmq_setsockopt, :libzmq), Int32,
(Ptr{Void}, Int32, Ptr{Void}, Uint),
socket.data, $k, $p, sizeof(eltype($p)))
if rc != 0
Expand All @@ -250,7 +215,7 @@ for (fset, fget, k, p) in opslist
@eval global($fget)
@eval function ($fget)(socket::ZMQSocket)
($sz)[1] = sizeof(eltype($p))
rc = ccall(_jl_zmq_getsockopt, Int32,
rc = ccall((:zmq_getsockopt, :libzmq), Int32,
(Ptr{Void}, Int32, Ptr{Void}, Ptr{Uint}),
socket.data, $k, $p, $sz)
if rc != 0
Expand Down Expand Up @@ -296,7 +261,7 @@ for (fset, fget, k) in opslist
if length(option_val) > 255
throw(ZMQStateError("option value too large"))
end
rc = ccall(_jl_zmq_setsockopt, Int32,
rc = ccall((:zmq_setsockopt, :libzmq), Int32,
(Ptr{Void}, Int32, Ptr{Uint8}, Uint),
socket.data, $k, option_val, length(option_val))
if rc != 0
Expand All @@ -308,7 +273,7 @@ for (fset, fget, k) in opslist
@eval global ($fget)
@eval function ($fget)(socket::ZMQSocket)
($sz)[1] = length($u8ap)
rc = ccall(_jl_zmq_getsockopt, Int32,
rc = ccall((:zmq_getsockopt, :libzmq), Int32,
(Ptr{Void}, Int32, Ptr{Uint8}, Ptr{Uint}),
socket.data, $k, $u8ap, $sz)
if rc != 0
Expand All @@ -323,14 +288,14 @@ end # let


function bind(socket::ZMQSocket, endpoint::String)
rc = ccall(_jl_zmq_bind, Int32, (Ptr{Void}, Ptr{Uint8}), socket.data, endpoint)
rc = ccall((:zmq_bind, :libzmq), Int32, (Ptr{Void}, Ptr{Uint8}), socket.data, endpoint)
if rc != 0
throw(ZMQStateError(jl_zmq_error_str()))
end
end

function connect(socket::ZMQSocket, endpoint::String)
rc=ccall(_jl_zmq_connect, Int32, (Ptr{Void}, Ptr{Uint8}), socket.data, endpoint)
rc=ccall((:zmq_connect, :libzmq), Int32, (Ptr{Void}, Ptr{Uint8}), socket.data, endpoint)
if rc != 0
throw(ZMQStateError(jl_zmq_error_str()))
end
Expand All @@ -345,7 +310,7 @@ type ZMQMessage
# Create an empty message (for receive)
function ZMQMessage()
obj = Array(Uint8, _jl_zmq_msg_t_size)
rc = ccall(_jl_zmq_msg_init, Int32, (Ptr{Void},), obj)
rc = ccall((:zmq_msg_init, :libzmq), Int32, (Ptr{Void},), obj)
if rc != 0
throw(ZMQStateError(jl_zmq_error_str()))
end
Expand All @@ -356,7 +321,7 @@ type ZMQMessage
# Create a message with a given buffer size (for send)
function ZMQMessage(len::Integer)
obj = Array(Uint8, _jl_zmq_msg_t_size)
rc = ccall(_jl_zmq_msg_init_size, Int32, (Ptr{Void}, Uint), obj, uint(len))
rc = ccall((:zmq_msg_init_size, :libzmq), Int32, (Ptr{Void}, Uint), obj, uint(len))
if rc != 0
throw(ZMQStateError(jl_zmq_error_str()))
end
Expand All @@ -375,14 +340,7 @@ function ZMQMessage(data::ByteArray)
msg_data(zmsg), data, len)
return zmsg
end
# Construct a message from a Array{Uint8} (including copying the data)
function ZMQMessage(data::Array{Uint8, 1})
len = length(data)
zmsg = ZMQMessage(len)
ccall(:memcpy, Ptr{Void}, (Ptr{Uint8}, Ptr{Uint8}, Uint),
msg_data(zmsg), data, len)
return zmsg
end

# Convert message to array of Uint8 with Uint8[zmsg]
# Copies the data
function ref(::Type{Uint8}, zmsg::ZMQMessage)
Expand All @@ -395,6 +353,7 @@ end
# Convert message to string with ASCIIString[zmsg]
# Copies the data
ref(::Type{ASCIIString}, zmsg::ZMQMessage) = bytestring(msg_data(zmsg), msg_size(zmsg))
bytestring(zmsg::ZMQMessage) = bytestring(msg_data(zmsg), msg_size(zmsg))
# Build an IOStream from a message
# Copies the data
function convert(::Type{IOStream}, zmsg::ZMQMessage)
Expand All @@ -407,34 +366,34 @@ end
# Close a message. You should not need to call this manually (let the
# finalizer do it).
function close(zmsg::ZMQMessage)
rc = ccall(_jl_zmq_msg_close, Int32, (Ptr{Uint8},), zmsg.obj)
rc = ccall((:zmq_msg_close, :libzmq), Int32, (Ptr{Uint8},), zmsg.obj)
if rc != 0
throw(ZMQStateError(jl_zmq_error_str()))
end
end
# Low-level functions
# Extract a pointer to the ByteArray data in a message
msg_data(zmsg::ZMQMessage) = ccall(_jl_zmq_msg_data, Ptr{Uint8}, (Ptr{Uint8},), zmsg.obj)
msg_data(zmsg::ZMQMessage) = ccall((:zmq_msg_data, :libzmq), Ptr{Uint8}, (Ptr{Uint8},), zmsg.obj)
# Determine the number of bytes in a message
msg_size(zmsg::ZMQMessage) = ccall(_jl_zmq_msg_size, Int, (Ptr{Uint8},) , zmsg.obj)
msg_size(zmsg::ZMQMessage) = ccall((:zmq_msg_size, :libzmq), Int, (Ptr{Uint8},) , zmsg.obj)

if _zmq_major > 2
import Base.get

global get
function get(zmsg::ZMQMessage, property::Integer)
val = ccall(_jl_zmq_msg_get, Int32, (Ptr{Void}, Int32), zmsg.data, property)
val = ccall((:zmq_msg_get, :libzmq), Int32, (Ptr{Void}, Int32), zmsg.data, property)
if val < 0
throw(ZMQStateError(jl_zmq_error_str()))
end
end
global set
function set(zmsg::ZMQMessage, property::Integer, value::Integer)
rc = ccall(_jl_zmq_msg_set, Int32, (Ptr{Void}, Int32, Int32), zmsg.data, property, value)
rc = ccall((:zmq_msg_set, :libzmq), Int32, (Ptr{Void}, Int32, Int32), zmsg.data, property, value)
if rc < 0
throw(ZMQStateError(jl_zmq_error_str()))
end
end
end

## Send/receive messages
#
# Julia defines two types of ZMQ messages: "raw" and "serialized". A "raw"
Expand All @@ -450,11 +409,24 @@ function send(socket::ZMQSocket, zmsg::ZMQMessage, noblock::Bool, sndmore::Bool)
if (sndmore) flag = flag | ZMQ_SNDMORE ; end
send(socket, zmsg, flag)
end
function send(socket::ZMQSocket, zmsg::ZMQMessage, flag::Integer)
rc = ccall(_jl_zmq_send, Int32, (Ptr{Void}, Ptr{Uint8}, Int32),
socket.data, zmsg.obj, flag)
if rc != 0
throw(ZMQStateError(jl_zmq_error_str()))

if _zmq_major == 2
global send
function send(socket::ZMQSocket, zmsg::ZMQMessage, flag::Integer)
rc = ccall((:zmq_send, :libzmq), Int32, (Ptr{Void}, Ptr{Uint8}, Int32),
socket.data, zmsg.obj, flag)
if rc != 0
throw(ZMQStateError(jl_zmq_error_str()))
end
end
elseif _zmq_major == 3
global send
function send(socket::ZMQSocket, zmsg::ZMQMessage, flag::Integer)
rc = ccall((:zmq_msg_send, :libzmq), Int32, (Ptr{Void}, Ptr{Uint8}, Int32),
zmsg.obj, socket.data, flag)
if rc != 0
throw(ZMQStateError(jl_zmq_error_str()))
end
end
end
recv(socket::ZMQSocket) = recv(socket, int32(0))
Expand All @@ -463,14 +435,30 @@ function recv(socket::ZMQSocket, noblock::Bool)
if (noblock) flag = flag | ZMQ_NOBLOCK ; end
recv(socket, flag)
end
function recv(socket::ZMQSocket, flag::Integer)
zmsg = ZMQMessage()
rc = ccall(_jl_zmq_recv, Int32, (Ptr{Void}, Ptr{Void}, Int32),
socket.data, zmsg.obj, flag)
if rc != 0
throw(ZMQStateError(jl_zmq_error_str()))

if _zmq_major == 2
global recv
function recv(socket::ZMQSocket, flag::Integer)
zmsg = ZMQMessage()
rc = ccall((:zmq_recv, :libzmq), Int32, (Ptr{Void}, Ptr{Void}, Int32),
socket.data, zmsg.obj, flag)
if rc != 0
throw(ZMQStateError(jl_zmq_error_str()))
end
return zmsg
end
return zmsg
elseif _zmq_major == 3
global recv
function recv(socket::ZMQSocket, flag::Integer)
zmsg = ZMQMessage()
rc = ccall((:zmq_msg_recv, :libzmq), Int32, (Ptr{Void}, Ptr{Void}, Int32),
zmsg.obj, socket.data, flag)
if rc != 0
throw(ZMQStateError(jl_zmq_error_str()))
end
return zmsg
end

end


Expand Down Expand Up @@ -508,6 +496,7 @@ const ZMQ_MORE = 1

#Send/Recv Options
const ZMQ_NOBLOCK = 1
const ZMQ_DONTWAIT = 1
const ZMQ_SNDMORE = 2

#IO Multiplexing
Expand Down
Loading

0 comments on commit be7f795

Please sign in to comment.