diff --git a/src/ZMQ.jl b/src/ZMQ.jl index 270c237..b72140c 100644 --- a/src/ZMQ.jl +++ b/src/ZMQ.jl @@ -3,7 +3,7 @@ module ZMQ using Base -import Base.convert, Base.ref +import Base.convert, Base.ref, Base.get, Base.bytestring export #Types @@ -13,24 +13,7 @@ export #Constants ZMQ_IO_THREADS,ZMQ_MAX_SOCKETS,ZMQ_PAIR,ZMQ_PUB,ZMQ_SUB,ZMQ_REQ,ZMQ_REP,ZMQ_DEALER,ZMQ_DEALER,ZMQ_PULL,ZMQ_PUSH,ZMQ_XPUB,ZMQ_XPUB,ZMQ_XREQ,ZMQ_XREP,ZMQ_UPSTREAM,ZMQ_DOWNSTREAM,ZMQ_MORE,ZMQ_MORE,ZMQ_SNDMORE,ZMQ_POLLIN,ZMQ_POLLOUT,ZMQ_POLLERR,ZMQ_STREAMER,ZMQ_FORWARDER,ZMQ_QUEUE -_jl_libzmq = dlopen("libzmq") - -_jl_zmq_version = dlsym(_jl_libzmq, :zmq_version) -_jl_zmq_errno = dlsym(_jl_libzmq, :zmq_errno) -_jl_zmq_strerror = dlsym(_jl_libzmq, :zmq_strerror) -_jl_zmq_socket = dlsym(_jl_libzmq, :zmq_socket) -_jl_zmq_close = dlsym(_jl_libzmq, :zmq_close) -_jl_zmq_getsockopt = dlsym(_jl_libzmq, :zmq_getsockopt) -_jl_zmq_setsockopt = dlsym(_jl_libzmq, :zmq_setsockopt) -_jl_zmq_bind = dlsym(_jl_libzmq, :zmq_bind) -_jl_zmq_connect = dlsym(_jl_libzmq, :zmq_connect) -_jl_zmq_msg_init_size = dlsym(_jl_libzmq, :zmq_msg_init_size) -_jl_zmq_msg_init = dlsym(_jl_libzmq, :zmq_msg_init) -_jl_zmq_msg_data = dlsym(_jl_libzmq, :zmq_msg_data) -_jl_zmq_msg_size = dlsym(_jl_libzmq, :zmq_msg_size) -_jl_zmq_msg_close = dlsym(_jl_libzmq, :zmq_msg_close) -_jl_zmq_send = dlsym(_jl_libzmq, :zmq_send) -_jl_zmq_recv = dlsym(_jl_libzmq, :zmq_recv) + const ZMQ_MAX_VSM_SIZE = 30 # This next should be replaced by a ccall, when packages can have C code const _jl_zmq_msg_t_size = ZMQ_MAX_VSM_SIZE + sizeof(Uint) + 2 @@ -47,8 +30,8 @@ show(io, thiserr::ZMQStateError) = print(io, "ZMQ: ", thiserr.msg) # Basic functions function jl_zmq_error_str() - errno = ccall(_jl_zmq_errno, Int32, ()) - c_strerror = ccall (_jl_zmq_strerror, Ptr{Uint8}, (Int32,), errno) + errno = ccall((:zmq_errno, :libzmq), Int32, ()) + c_strerror = ccall ((:zmq_strerror, :libzmq), Ptr{Uint8}, (Int32,), errno) if c_strerror != C_NULL strerror = bytestring(c_strerror) return strerror @@ -60,32 +43,14 @@ end let major = zeros(Int32, 1), minor = zeros(Int32, 1), patch = zeros(Int32, 1) global version function version() - ccall(_jl_zmq_version, Void, (Ptr{Int32}, Ptr{Int32}, Ptr{Int32}), major, minor, patch) + ccall((:zmq_version, :libzmq), Void, (Ptr{Int32}, Ptr{Int32}, Ptr{Int32}), major, minor, patch) return (major[1], minor[1], patch[1]) end end _zmq_major, _zmq_minor, _zmq_path = version() -# Version-dependent library symbols -if _zmq_major < 3 - # Version 2 context functions (now deprecated) - _jl_zmq_init = dlsym(_jl_libzmq, :zmq_init) - _jl_zmq_term = dlsym(_jl_libzmq, :zmq_term) -else - # Version 3 context functions - _jl_zmq_ctx_new = dlsym(_jl_libzmq, :zmq_ctx_new) - _jl_zmq_ctx_destroy = dlsym(_jl_libzmq, :zmq_ctx_destroy) - _jl_zmq_ctx_get = dlsym(_jl_libzmq, :zmq_ctx_get) - _jl_zmq_ctx_set = dlsym(_jl_libzmq, :zmq_ctx_set) - # Version 3 socket functions - _jl_zmq_disconnect = dlsym(_jl_libzmq, :zmq_disconnect) - _jl_zmq_unbind = dlsym(_jl_libzmq, :zmq_unbind) - # Version 3 message functions - _jl_zmq_msg_get = dlsym(_jl_libzmq, :zmq_msg_get) - _jl_zmq_msg_set = dlsym(_jl_libzmq, :zmq_msg_set) - _jl_zmq_msg_more = dlsym(_jl_libzmq, :zmq_msg_more) -end + ## Contexts ## # Provide the same constructor API for version 2 and version 3, even @@ -96,7 +61,7 @@ if _zmq_major < 3 data::Ptr{Void} function ZMQContext(n::Integer) - p = ccall(_jl_zmq_init, Ptr{Void}, (Int32,), n) + p = ccall((:zmq_init, :libzmq), Ptr{Void}, (Int32,), n) if p == C_NULL throw(ZMQStateError(jl_zmq_error_str())) end @@ -111,7 +76,7 @@ else # Versions 3 and higher data::Ptr{Void} function ZMQContext(n::Integer) - p = ccall(_jl_zmq_ctx_new, Ptr{Void}, ()) + p = ccall((:zmq_ctx_new, :libzmq), Ptr{Void}, ()) if p == C_NULL throw(ZMQStateError(jl_zmq_error_str())) end @@ -126,7 +91,7 @@ ZMQContext() = ZMQContext(1) if _zmq_major < 3 global close function close(ctx::ZMQContext) - rc = ccall(_jl_zmq_term, Int32, (Ptr{Void},), ctx.data) + rc = ccall((:zmq_term, :libzmq), Int32, (Ptr{Void},), ctx.data) if rc != 0 throw(ZMQStateError(jl_zmq_error_str())) end @@ -134,14 +99,14 @@ if _zmq_major < 3 else # Versions 3 and higher global close function close(ctx::ZMQContext) - rc = ccall(_jl_zmq_ctx_destroy, Int32, (Ptr{Void},), ctx.data) + rc = ccall((:zmq_ctx_destroy, :libzmq), Int32, (Ptr{Void},), ctx.data) if rc != 0 throw(ZMQStateError(jl_zmq_error_str())) end end global get function get(ctx::ZMQContext, option::Integer) - val = ccall(_jl_zmq_ctx_get, Int32, (Ptr{Void}, Int32), ctx.data, option) + val = ccall((:zmq_ctx_get, :libzmq), Int32, (Ptr{Void}, Int32), ctx.data, option) if val < 0 throw(ZMQStateError(jl_zmq_error_str())) end @@ -149,7 +114,7 @@ else # Versions 3 and higher end global set function set(ctx::ZMQContext, option::Integer, value::Integer) - rc = ccall(_jl_zmq_ctx_set, Int32, (Ptr{Void}, Int32, Int32), ctx.data, option, value) + rc = ccall((:zmq_ctx_set, :libzmq), Int32, (Ptr{Void}, Int32, Int32), ctx.data, option, value) if rc != 0 throw(ZMQStateError(jl_zmq_error_str())) end @@ -162,7 +127,7 @@ type ZMQSocket data::Ptr{Void} function ZMQSocket(ctx::ZMQContext, typ::Integer) - p = ccall(_jl_zmq_socket, Ptr{Void}, (Ptr{Void}, Int32), ctx.data, typ) + p = ccall((:zmq_socket, :libzmq), Ptr{Void}, (Ptr{Void}, Int32), ctx.data, typ) if p == C_NULL throw(ZMQStateError(jl_zmq_error_str())) end @@ -173,7 +138,7 @@ type ZMQSocket end function close(socket::ZMQSocket) - rc = ccall(_jl_zmq_close, Int32, (Ptr{Void},), socket.data) + rc = ccall((:zmq_close, :libzmq), Int32, (Ptr{Void},), socket.data) if rc != 0 throw(ZMQStateError(jl_zmq_error_str())) end @@ -238,7 +203,7 @@ for (fset, fget, k, p) in opslist @eval global ($fset) @eval function ($fset)(socket::ZMQSocket, option_val::Integer) ($p)[1] = option_val - rc = ccall(_jl_zmq_setsockopt, Int32, + rc = ccall((:zmq_setsockopt, :libzmq), Int32, (Ptr{Void}, Int32, Ptr{Void}, Uint), socket.data, $k, $p, sizeof(eltype($p))) if rc != 0 @@ -250,7 +215,7 @@ for (fset, fget, k, p) in opslist @eval global($fget) @eval function ($fget)(socket::ZMQSocket) ($sz)[1] = sizeof(eltype($p)) - rc = ccall(_jl_zmq_getsockopt, Int32, + rc = ccall((:zmq_getsockopt, :libzmq), Int32, (Ptr{Void}, Int32, Ptr{Void}, Ptr{Uint}), socket.data, $k, $p, $sz) if rc != 0 @@ -296,7 +261,7 @@ for (fset, fget, k) in opslist if length(option_val) > 255 throw(ZMQStateError("option value too large")) end - rc = ccall(_jl_zmq_setsockopt, Int32, + rc = ccall((:zmq_setsockopt, :libzmq), Int32, (Ptr{Void}, Int32, Ptr{Uint8}, Uint), socket.data, $k, option_val, length(option_val)) if rc != 0 @@ -308,7 +273,7 @@ for (fset, fget, k) in opslist @eval global ($fget) @eval function ($fget)(socket::ZMQSocket) ($sz)[1] = length($u8ap) - rc = ccall(_jl_zmq_getsockopt, Int32, + rc = ccall((:zmq_getsockopt, :libzmq), Int32, (Ptr{Void}, Int32, Ptr{Uint8}, Ptr{Uint}), socket.data, $k, $u8ap, $sz) if rc != 0 @@ -323,14 +288,14 @@ end # let function bind(socket::ZMQSocket, endpoint::String) - rc = ccall(_jl_zmq_bind, Int32, (Ptr{Void}, Ptr{Uint8}), socket.data, endpoint) + rc = ccall((:zmq_bind, :libzmq), Int32, (Ptr{Void}, Ptr{Uint8}), socket.data, endpoint) if rc != 0 throw(ZMQStateError(jl_zmq_error_str())) end end function connect(socket::ZMQSocket, endpoint::String) - rc=ccall(_jl_zmq_connect, Int32, (Ptr{Void}, Ptr{Uint8}), socket.data, endpoint) + rc=ccall((:zmq_connect, :libzmq), Int32, (Ptr{Void}, Ptr{Uint8}), socket.data, endpoint) if rc != 0 throw(ZMQStateError(jl_zmq_error_str())) end @@ -345,7 +310,7 @@ type ZMQMessage # Create an empty message (for receive) function ZMQMessage() obj = Array(Uint8, _jl_zmq_msg_t_size) - rc = ccall(_jl_zmq_msg_init, Int32, (Ptr{Void},), obj) + rc = ccall((:zmq_msg_init, :libzmq), Int32, (Ptr{Void},), obj) if rc != 0 throw(ZMQStateError(jl_zmq_error_str())) end @@ -356,7 +321,7 @@ type ZMQMessage # Create a message with a given buffer size (for send) function ZMQMessage(len::Integer) obj = Array(Uint8, _jl_zmq_msg_t_size) - rc = ccall(_jl_zmq_msg_init_size, Int32, (Ptr{Void}, Uint), obj, uint(len)) + rc = ccall((:zmq_msg_init_size, :libzmq), Int32, (Ptr{Void}, Uint), obj, uint(len)) if rc != 0 throw(ZMQStateError(jl_zmq_error_str())) end @@ -375,14 +340,7 @@ function ZMQMessage(data::ByteArray) msg_data(zmsg), data, len) return zmsg end -# Construct a message from a Array{Uint8} (including copying the data) -function ZMQMessage(data::Array{Uint8, 1}) - len = length(data) - zmsg = ZMQMessage(len) - ccall(:memcpy, Ptr{Void}, (Ptr{Uint8}, Ptr{Uint8}, Uint), - msg_data(zmsg), data, len) - return zmsg -end + # Convert message to array of Uint8 with Uint8[zmsg] # Copies the data function ref(::Type{Uint8}, zmsg::ZMQMessage) @@ -395,6 +353,7 @@ end # Convert message to string with ASCIIString[zmsg] # Copies the data ref(::Type{ASCIIString}, zmsg::ZMQMessage) = bytestring(msg_data(zmsg), msg_size(zmsg)) +bytestring(zmsg::ZMQMessage) = bytestring(msg_data(zmsg), msg_size(zmsg)) # Build an IOStream from a message # Copies the data function convert(::Type{IOStream}, zmsg::ZMQMessage) @@ -407,34 +366,34 @@ end # Close a message. You should not need to call this manually (let the # finalizer do it). function close(zmsg::ZMQMessage) - rc = ccall(_jl_zmq_msg_close, Int32, (Ptr{Uint8},), zmsg.obj) + rc = ccall((:zmq_msg_close, :libzmq), Int32, (Ptr{Uint8},), zmsg.obj) if rc != 0 throw(ZMQStateError(jl_zmq_error_str())) end end # Low-level functions # Extract a pointer to the ByteArray data in a message -msg_data(zmsg::ZMQMessage) = ccall(_jl_zmq_msg_data, Ptr{Uint8}, (Ptr{Uint8},), zmsg.obj) +msg_data(zmsg::ZMQMessage) = ccall((:zmq_msg_data, :libzmq), Ptr{Uint8}, (Ptr{Uint8},), zmsg.obj) # Determine the number of bytes in a message -msg_size(zmsg::ZMQMessage) = ccall(_jl_zmq_msg_size, Int, (Ptr{Uint8},) , zmsg.obj) +msg_size(zmsg::ZMQMessage) = ccall((:zmq_msg_size, :libzmq), Int, (Ptr{Uint8},) , zmsg.obj) + if _zmq_major > 2 - import Base.get + global get function get(zmsg::ZMQMessage, property::Integer) - val = ccall(_jl_zmq_msg_get, Int32, (Ptr{Void}, Int32), zmsg.data, property) + val = ccall((:zmq_msg_get, :libzmq), Int32, (Ptr{Void}, Int32), zmsg.data, property) if val < 0 throw(ZMQStateError(jl_zmq_error_str())) end end global set function set(zmsg::ZMQMessage, property::Integer, value::Integer) - rc = ccall(_jl_zmq_msg_set, Int32, (Ptr{Void}, Int32, Int32), zmsg.data, property, value) + rc = ccall((:zmq_msg_set, :libzmq), Int32, (Ptr{Void}, Int32, Int32), zmsg.data, property, value) if rc < 0 throw(ZMQStateError(jl_zmq_error_str())) end end end - ## Send/receive messages # # Julia defines two types of ZMQ messages: "raw" and "serialized". A "raw" @@ -450,11 +409,24 @@ function send(socket::ZMQSocket, zmsg::ZMQMessage, noblock::Bool, sndmore::Bool) if (sndmore) flag = flag | ZMQ_SNDMORE ; end send(socket, zmsg, flag) end -function send(socket::ZMQSocket, zmsg::ZMQMessage, flag::Integer) - rc = ccall(_jl_zmq_send, Int32, (Ptr{Void}, Ptr{Uint8}, Int32), - socket.data, zmsg.obj, flag) - if rc != 0 - throw(ZMQStateError(jl_zmq_error_str())) + +if _zmq_major == 2 + global send + function send(socket::ZMQSocket, zmsg::ZMQMessage, flag::Integer) + rc = ccall((:zmq_send, :libzmq), Int32, (Ptr{Void}, Ptr{Uint8}, Int32), + socket.data, zmsg.obj, flag) + if rc != 0 + throw(ZMQStateError(jl_zmq_error_str())) + end + end +elseif _zmq_major == 3 + global send + function send(socket::ZMQSocket, zmsg::ZMQMessage, flag::Integer) + rc = ccall((:zmq_msg_send, :libzmq), Int32, (Ptr{Void}, Ptr{Uint8}, Int32), + zmsg.obj, socket.data, flag) + if rc != 0 + throw(ZMQStateError(jl_zmq_error_str())) + end end end recv(socket::ZMQSocket) = recv(socket, int32(0)) @@ -463,14 +435,30 @@ function recv(socket::ZMQSocket, noblock::Bool) if (noblock) flag = flag | ZMQ_NOBLOCK ; end recv(socket, flag) end -function recv(socket::ZMQSocket, flag::Integer) - zmsg = ZMQMessage() - rc = ccall(_jl_zmq_recv, Int32, (Ptr{Void}, Ptr{Void}, Int32), - socket.data, zmsg.obj, flag) - if rc != 0 - throw(ZMQStateError(jl_zmq_error_str())) + +if _zmq_major == 2 + global recv + function recv(socket::ZMQSocket, flag::Integer) + zmsg = ZMQMessage() + rc = ccall((:zmq_recv, :libzmq), Int32, (Ptr{Void}, Ptr{Void}, Int32), + socket.data, zmsg.obj, flag) + if rc != 0 + throw(ZMQStateError(jl_zmq_error_str())) + end + return zmsg end - return zmsg +elseif _zmq_major == 3 + global recv + function recv(socket::ZMQSocket, flag::Integer) + zmsg = ZMQMessage() + rc = ccall((:zmq_msg_recv, :libzmq), Int32, (Ptr{Void}, Ptr{Void}, Int32), + zmsg.obj, socket.data, flag) + if rc != 0 + throw(ZMQStateError(jl_zmq_error_str())) + end + return zmsg + end + end @@ -508,6 +496,7 @@ const ZMQ_MORE = 1 #Send/Recv Options const ZMQ_NOBLOCK = 1 +const ZMQ_DONTWAIT = 1 const ZMQ_SNDMORE = 2 #IO Multiplexing diff --git a/test/ZMQ.jl b/test/ZMQ.jl index 4964f08..721ab8e 100644 --- a/test/ZMQ.jl +++ b/test/ZMQ.jl @@ -1,11 +1,11 @@ - -load ("ZMQ") -using Base +require("ZMQ") using ZMQ @assert length(ZMQ.version()) == 3 +major, minor, patch = ZMQ.version() + ctx=ZMQContext(1) @assert typeof(ctx) == ZMQContext @@ -42,15 +42,23 @@ end s1=ZMQSocket(ctx2, ZMQ_REP) -ZMQ.set_hwm(s1, 1000) +if major == 2 + ZMQ.set_hwm(s1, 1000) +else + ZMQ.set_sndhwm(s1, 1000) +end ZMQ.set_linger(s1, 1) ZMQ.set_identity(s1, "abcd") @assert ZMQ.get_identity(s1)::String == "abcd" -@assert ZMQ.get_hwm(s1)::Integer == 1000 +if major == 2 + @assert ZMQ.get_hwm(s1)::Integer == 1000 +else + @assert ZMQ.get_sndhwm(s1)::Integer == 1000 +end @assert ZMQ.get_linger(s1)::Integer == 1 -@assert ZMQ.get_rcvmore(s1) == false +@assert ZMQ.ismore(s1) == false s2=ZMQSocket(ctx2, ZMQ_REQ) @assert ZMQ.get_type(s1) == ZMQ_REP @@ -60,9 +68,9 @@ ZMQ.bind(s1, "tcp://*:5555") ZMQ.connect(s2, "tcp://localhost:5555") ZMQ.send(s2, ZMQMessage("test request")) -@assert (ASCIIString[ZMQ.recv(s1)] == "test request") +@assert (bytestring(ZMQ.recv(s1)) == "test request") ZMQ.send(s1, ZMQMessage("test response")) -@assert (ASCIIString[ZMQ.recv(s2)] == "test response") +@assert (bytestring(ZMQ.recv(s2)) == "test response") ZMQ.send(s2, ZMQMessage("another test request")) msg = ZMQ.recv(s1)