diff --git a/src/ZMQ.jl b/src/ZMQ.jl index 3bf8118..b1f08f3 100644 --- a/src/ZMQ.jl +++ b/src/ZMQ.jl @@ -1,7 +1,6 @@ # Support for ZeroMQ, a network and interprocess communication library module ZMQ -import ZeroMQ_jll: libzmq using Base.Libc: EAGAIN using FileWatching: UV_READABLE, uv_pollcb, FDWatcher @@ -19,7 +18,7 @@ export #Sockets connect, bind, send, recv - +include("bindings.jl") include("constants.jl") include("optutil.jl") include("error.jl") @@ -27,13 +26,14 @@ include("context.jl") include("socket.jl") include("sockopts.jl") include("message.jl") +include("msg_bindings.jl") include("comm.jl") function __init__() major = Ref{Cint}() minor = Ref{Cint}() patch = Ref{Cint}() - ccall((:zmq_version, libzmq), Cvoid, (Ptr{Cint}, Ptr{Cint}, Ptr{Cint}), major, minor, patch) + lib.zmq_version(major, minor, patch) global version = VersionNumber(major[], minor[], patch[]) if version < v"3" error("ZMQ version $version < 3 is not supported") diff --git a/src/_message.jl b/src/_message.jl index 1f2b240..c4e5981 100644 --- a/src/_message.jl +++ b/src/_message.jl @@ -3,34 +3,34 @@ # Low-level message type, matching the declaration of # zmq_msg_t in the header: char _[64]; -primitive type _Message 64 * 8 end +const _Message = lib.zmq_msg_t const _MessageOrRef = Union{_Message,Base.RefValue{_Message}} function msg_init() zmsg = Ref{_Message}() - rc = ccall((:zmq_msg_init, libzmq), Cint, (Ref{_Message},), zmsg) + rc = lib.zmq_msg_init(zmsg) rc != 0 && throw(StateError(jl_zmq_error_str())) return zmsg end function msg_init(nbytes::Int) zmsg = Ref{_Message}() - rc = ccall((:zmq_msg_init_size, libzmq), Cint, (Ref{_Message}, Csize_t), zmsg, nbytes % Csize_t) + rc = lib.zmq_msg_init_size(zmsg, nbytes % Csize_t) rc != 0 && throw(StateError(jl_zmq_error_str())) return zmsg end # note: no finalizer for _Message, so we need to call close manually! function Base.close(zmsg::_MessageOrRef) - rc = ccall((:zmq_msg_close, libzmq), Cint, (Ref{_Message},), zmsg) + rc = lib.zmq_msg_close(zmsg) rc != 0 && throw(StateError(jl_zmq_error_str())) return nothing end -Base.length(zmsg::_MessageOrRef) = ccall((:zmq_msg_size, libzmq), Csize_t, (Ref{_Message},), zmsg) % Int -Base.unsafe_convert(::Type{Ptr{UInt8}}, zmsg::_MessageOrRef) = - ccall((:zmq_msg_data, libzmq), Ptr{UInt8}, (Ref{_Message},), zmsg) +Base.length(zmsg::_MessageOrRef) = lib.zmq_msg_size(zmsg) % Int +Base.unsafe_convert(::Type{Ptr{UInt8}}, zmsg::_Message) = Ptr{UInt8}(lib.zmq_msg_data(Ref(zmsg))) +Base.unsafe_convert(::Type{Ptr{UInt8}}, zmsg::Base.RefValue{_Message}) = Ptr{UInt8}(lib.zmq_msg_data(zmsg)) # isbits data, vectors thereof, and strings can be converted to/from _Message diff --git a/src/comm.jl b/src/comm.jl index 00529e5..f5676df 100644 --- a/src/comm.jl +++ b/src/comm.jl @@ -3,15 +3,10 @@ ############################################################################ -msg_send(socket::Socket, zmsg::_MessageOrRef, flags::Integer) = - ccall((:zmq_msg_send, libzmq), Cint, (Ref{_Message}, Ptr{Cvoid}, Cint), zmsg, socket, flags) -msg_send(socket::Socket, zmsg::Message, flags::Integer) = - ccall((:zmq_msg_send, libzmq), Cint, (Ref{Message}, Ptr{Cvoid}, Cint), zmsg, socket, flags) - function _send(socket::Socket, zmsg, more::Bool=false) while true - if -1 == msg_send(socket, zmsg, (ZMQ_SNDMORE*more) | ZMQ_DONTWAIT) - zmq_errno() == EAGAIN || throw(StateError(jl_zmq_error_str())) + if -1 == lib.zmq_msg_send(zmsg, socket, (ZMQ_SNDMORE*more) | ZMQ_DONTWAIT) + lib.zmq_errno() == EAGAIN || throw(StateError(jl_zmq_error_str())) while (socket.events & POLLOUT) == 0 wait(socket) end @@ -67,15 +62,10 @@ end ############################################################################ -msg_recv(socket::Socket, zmsg::_MessageOrRef, flags::Integer) = - ccall((:zmq_msg_recv, libzmq), Cint, (Ref{_Message}, Ptr{Cvoid}, Cint), zmsg, socket, flags) -msg_recv(socket::Socket, zmsg::Message, flags::Integer) = - ccall((:zmq_msg_recv, libzmq), Cint, (Ref{Message}, Ptr{Cvoid}, Cint), zmsg, socket, flags) - function _recv!(socket::Socket, zmsg) while true - if -1 == msg_recv(socket, zmsg, ZMQ_DONTWAIT) - zmq_errno() == EAGAIN || throw(StateError(jl_zmq_error_str())) + if -1 == lib.zmq_msg_recv(zmsg, socket, ZMQ_DONTWAIT) + lib.zmq_errno() == EAGAIN || throw(StateError(jl_zmq_error_str())) while socket.events & POLLIN== 0 wait(socket) end diff --git a/src/constants.jl b/src/constants.jl index 324b004..fe429f4 100644 --- a/src/constants.jl +++ b/src/constants.jl @@ -2,32 +2,32 @@ ## Constants # Context options -const IO_THREADS = 1 -const MAX_SOCKETS = 2 -const IPV6 = 42 +const IO_THREADS = lib.ZMQ_IO_THREADS +const MAX_SOCKETS = lib.ZMQ_MAX_SOCKETS +const IPV6 = lib.ZMQ_IPV6 "[PAIR](https://zeromq.org/socket-api/#pair-socket) socket." -const PAIR = 0 +const PAIR = lib.ZMQ_PAIR "[PUB](https://zeromq.org/socket-api/#pub-socket) socket." -const PUB = 1 +const PUB = lib.ZMQ_PUB "[SUB](https://zeromq.org/socket-api/#sub-socket) socket." -const SUB = 2 +const SUB = lib.ZMQ_SUB "[REQ](https://zeromq.org/socket-api/#req-socket) socket." -const REQ = 3 +const REQ = lib.ZMQ_REQ "[REP](https://zeromq.org/socket-api/#rep-socket) socket." -const REP = 4 +const REP = lib.ZMQ_REP "[DEALER](https://zeromq.org/socket-api/#dealer-socket) socket." -const DEALER = 5 +const DEALER = lib.ZMQ_DEALER "[ROUTER](https://zeromq.org/socket-api/#router-socket) socket." -const ROUTER = 6 +const ROUTER = lib.ZMQ_ROUTER "[PULL](https://zeromq.org/socket-api/#pull-socket) socket." -const PULL = 7 +const PULL = lib.ZMQ_PULL "[PUSH](https://zeromq.org/socket-api/#push-socket) socket." -const PUSH = 8 +const PUSH = lib.ZMQ_PUSH "[XPUB](https://zeromq.org/socket-api/#xpub-socket) socket." -const XPUB = 9 +const XPUB = lib.ZMQ_XPUB "[XSUB](https://zeromq.org/socket-api/#xsub-socket) socket." -const XSUB = 10 +const XSUB = lib.ZMQ_XSUB """ [XREQ](https://zeromq.org/socket-api/#dealer-socket) socket. @@ -61,20 +61,20 @@ const UPSTREAM = PULL const DOWNSTREAM = PUSH #Message options -const MORE = 1 +const MORE = lib.ZMQ_MORE const SNDMORE = true #IO Multiplexing -const POLLIN = 1 -const POLLOUT = 2 -const POLLERR = 4 +const POLLIN = lib.ZMQ_POLLIN +const POLLOUT = lib.ZMQ_POLLOUT +const POLLERR = lib.ZMQ_POLLERR #Built in devices -const STREAMER = 1 -const FORWARDER = 2 -const QUEUE = 3 +const STREAMER = lib.ZMQ_STREAMER +const FORWARDER = lib.ZMQ_FORWARDER +const QUEUE = lib.ZMQ_QUEUE #Send/Recv Options -const ZMQ_DONTWAIT = 1 -const ZMQ_SNDMORE = 2 +const ZMQ_DONTWAIT = lib.ZMQ_DONTWAIT +const ZMQ_SNDMORE = lib.ZMQ_SNDMORE diff --git a/src/context.jl b/src/context.jl index 9ba4aae..fd4c0a2 100644 --- a/src/context.jl +++ b/src/context.jl @@ -3,7 +3,7 @@ # the low-level context constructor function _ctx_new() - p = ccall((:zmq_ctx_new, libzmq), Ptr{Cvoid}, ()) + p = lib.zmq_ctx_new() if p == C_NULL throw(StateError(jl_zmq_error_str())) end @@ -73,7 +73,7 @@ function Base.close(ctx::Context) end end empty!(getfield(ctx, :sockets)) - rc = ccall((:zmq_ctx_destroy, libzmq), Cint, (Ptr{Cvoid},), ctx) + rc = lib.zmq_ctx_destroy(ctx) setfield!(ctx, :data, C_NULL) if rc != 0 throw(StateError(jl_zmq_error_str())) @@ -83,14 +83,14 @@ end @deprecate term(ctx::Context) close(ctx) function _get(ctx::Context, option::Integer) - val = ccall((:zmq_ctx_get, libzmq), Cint, (Ptr{Cvoid}, Cint), ctx, option) + val = lib.zmq_ctx_get(ctx, option) if val < 0 throw(StateError(jl_zmq_error_str())) end return val end function _set(ctx::Context, option::Integer, value::Integer) - rc = ccall((:zmq_ctx_set, libzmq), Cint, (Ptr{Cvoid}, Cint, Cint), ctx, option, value) + rc = lib.zmq_ctx_set(ctx, option, value) if rc != 0 throw(StateError(jl_zmq_error_str())) end diff --git a/src/error.jl b/src/error.jl index a9fda9d..2d0ef6d 100644 --- a/src/error.jl +++ b/src/error.jl @@ -8,14 +8,15 @@ end show(io, thiserr::StateError) = print(io, "ZMQ: ", thiserr.msg) # Basic functions -zmq_errno() = ccall((:zmq_errno, libzmq), Cint, ()) + function jl_zmq_error_str() - errno = zmq_errno() - c_strerror = ccall((:zmq_strerror, libzmq), Ptr{UInt8}, (Cint,), errno) + errno = lib.zmq_errno() + c_strerror = lib.zmq_strerror(errno) + if c_strerror != C_NULL strerror = unsafe_string(c_strerror) return strerror else return "Unknown error" end -end \ No newline at end of file +end diff --git a/src/message.jl b/src/message.jl index c6f4060..8e9be3d 100644 --- a/src/message.jl +++ b/src/message.jl @@ -38,7 +38,7 @@ mutable struct Message <: AbstractArray{UInt8,1} function Message() zmsg = new() setfield!(zmsg, :handle, C_NULL) - rc = ccall((:zmq_msg_init, libzmq), Cint, (Ref{Message},), zmsg) + rc = lib.zmq_msg_init(zmsg) if rc != 0 throw(StateError(jl_zmq_error_str())) end @@ -54,7 +54,7 @@ mutable struct Message <: AbstractArray{UInt8,1} function Message(len::Integer) zmsg = new() setfield!(zmsg, :handle, C_NULL) - rc = ccall((:zmq_msg_init_size, libzmq), Cint, (Ref{Message}, Csize_t), zmsg, len) + rc = lib.zmq_msg_init_size(zmsg, len) if rc != 0 throw(StateError(jl_zmq_error_str())) end @@ -74,8 +74,7 @@ mutable struct Message <: AbstractArray{UInt8,1} zmsg = new() setfield!(zmsg, :handle, gc_protect_handle(origin)) gc_free_fn_c = @cfunction(gc_free_fn, Cint, (Ptr{Cvoid}, Ptr{Cvoid})) - rc = ccall((:zmq_msg_init_data, libzmq), Cint, (Ref{Message}, Ptr{T}, Csize_t, Ptr{Cvoid}, Ptr{Cvoid}), - zmsg, m, len, gc_free_fn_c, getfield(zmsg, :handle)) + rc = lib.zmq_msg_init_data(zmsg, m, len, gc_free_fn_c, getfield(zmsg, :handle)) if rc != 0 gc_free_fn(C_NULL, getfield(zmsg, :handle)) # don't leak memory on error throw(StateError(jl_zmq_error_str())) @@ -132,9 +131,9 @@ isfreed(m::Message) = haskey(gc_protect, getfield(m, :handle)) # AbstractArray behaviors: Base.similar(a::Message, ::Type{T}, dims::Dims) where {T} = Array{T}(undef, dims) # ? -Base.length(zmsg::Message) = Int(ccall((:zmq_msg_size, libzmq), Csize_t, (Ref{Message},), zmsg)) +Base.length(zmsg::Message) = Int(lib.zmq_msg_size(zmsg)) Base.size(zmsg::Message) = (length(zmsg),) -Base.unsafe_convert(::Type{Ptr{UInt8}}, zmsg::Message) = ccall((:zmq_msg_data, libzmq), Ptr{UInt8}, (Ref{Message},), zmsg) +Base.unsafe_convert(::Type{Ptr{UInt8}}, zmsg::Message) = Ptr{UInt8}(lib.zmq_msg_data(zmsg)) function Base.getindex(a::Message, i::Integer) @boundscheck if i < 1 || i > length(a) throw(BoundsError()) @@ -166,7 +165,7 @@ end # Close a message. You should not need to call this manually (let the # finalizer do it). function Base.close(zmsg::Message) - rc = ccall((:zmq_msg_close, libzmq), Cint, (Ref{Message},), zmsg) + rc = lib.zmq_msg_close(zmsg) if rc != 0 throw(StateError(jl_zmq_error_str())) end @@ -174,14 +173,14 @@ function Base.close(zmsg::Message) end function _get(zmsg::Message, property::Integer) - val = ccall((:zmq_msg_get, libzmq), Cint, (Ref{Message}, Cint), zmsg, property) + val = lib.zmq_msg_get(zmsg, property) if val < 0 throw(StateError(jl_zmq_error_str())) end val end function _set(zmsg::Message, property::Integer, value::Integer) - rc = ccall((:zmq_msg_set, libzmq), Cint, (Ref{Message}, Cint, Cint), zmsg, property, value) + rc = lib.zmq_msg_set(zmsg, property, value) if rc < 0 throw(StateError(jl_zmq_error_str())) end diff --git a/src/socket.jl b/src/socket.jl index 58a9db5..926d5a1 100644 --- a/src/socket.jl +++ b/src/socket.jl @@ -12,7 +12,7 @@ mutable struct Socket Create a socket in a given context. """ function Socket(ctx::Context, typ::Integer) - p = ccall((:zmq_socket, libzmq), Ptr{Cvoid}, (Ptr{Cvoid}, Cint), ctx, typ) + p = lib.zmq_socket(ctx, typ) if p == C_NULL throw(StateError(jl_zmq_error_str())) end @@ -58,7 +58,7 @@ Base.isopen(socket::Socket) = getfield(socket, :data) != C_NULL function Base.close(socket::Socket) if isopen(socket) close(getfield(socket, :pollfd)) - rc = ccall((:zmq_close, libzmq), Cint, (Ptr{Cvoid},), socket) + rc = lib.zmq_close(socket) setfield!(socket, :data, C_NULL) if rc != 0 throw(StateError(jl_zmq_error_str())) @@ -86,7 +86,7 @@ described [here](http://api.zeromq.org/4-3:zmq-bind). e.g. `tcp://127.0.0.1:42000`. """ function Sockets.bind(socket::Socket, endpoint::AbstractString) - rc = ccall((:zmq_bind, libzmq), Cint, (Ptr{Cvoid}, Ptr{UInt8}), socket, endpoint) + rc = lib.zmq_bind(socket, endpoint) if rc != 0 throw(StateError(jl_zmq_error_str())) end @@ -98,7 +98,7 @@ end Connect the socket to an endpoint. """ function Sockets.connect(socket::Socket, endpoint::AbstractString) - rc=ccall((:zmq_connect, libzmq), Cint, (Ptr{Cvoid}, Ptr{UInt8}), socket, endpoint) + rc = lib.zmq_connect(socket, endpoint) if rc != 0 throw(StateError(jl_zmq_error_str())) end diff --git a/src/sockopts.jl b/src/sockopts.jl index 67e1b0f..cc49a17 100644 --- a/src/sockopts.jl +++ b/src/sockopts.jl @@ -30,9 +30,7 @@ for (fset, fget, k, T) in [ ] if fset != nothing @eval function $(Symbol("_",fset))(socket::Socket, option_val::Integer) - rc = ccall((:zmq_setsockopt, libzmq), Cint, - (Ptr{Cvoid}, Cint, Ref{$T}, Csize_t), - socket, $k, option_val, sizeof($T)) + rc = lib.zmq_setsockopt(socket, $k, Ref{$T}(option_val), sizeof($T)) if rc != 0 throw(StateError(jl_zmq_error_str())) end @@ -43,9 +41,7 @@ for (fset, fget, k, T) in [ if fget != nothing @eval function $(Symbol("_",fget))(socket::Socket) val = Ref{$T}() - rc = ccall((:zmq_getsockopt, libzmq), Cint, - (Ptr{Cvoid}, Cint, Ref{$T}, Ref{Csize_t}), - socket, $k, val, sizeof($T)) + rc = lib.zmq_getsockopt(socket, $k, val, Ref(Csize_t(sizeof($T)))) if rc != 0 throw(StateError(jl_zmq_error_str())) end @@ -67,9 +63,7 @@ for (f,k) in ((:subscribe,6), (:unsubscribe,7)) f_ = Symbol(f, "_") @eval begin function $f_(socket::Socket, filter::Ptr{T}, len::Integer) where {T} - rc = ccall((:zmq_setsockopt, libzmq), Cint, - (Ptr{Cvoid}, Cint, Ptr{T}, Csize_t), - socket, $k, filter, len) + rc = lib.zmq_setsockopt(socket, $k, filter, len) if rc != 0 throw(StateError(jl_zmq_error_str())) end @@ -92,9 +86,11 @@ for (fset, fget, k) in [ if sizeof(option_val) > 255 throw(StateError("option value too large")) end - rc = ccall((:zmq_setsockopt, libzmq), Cint, - (Ptr{Cvoid}, Cint, Ptr{UInt8}, Csize_t), - socket, $k, option_val, sizeof(option_val)) + + GC.@preserve option_val begin + string_ptr = Base.unsafe_convert(Ptr{UInt8}, option_val) + rc = lib.zmq_setsockopt(socket, $k, string_ptr, sizeof(option_val)) + end if rc != 0 throw(StateError(jl_zmq_error_str())) end @@ -105,9 +101,7 @@ for (fset, fget, k) in [ @eval function ($fget)(socket::Socket) buf = Base.StringVector(255) len = Ref{Csize_t}(sizeof(buf)) - rc = ccall((:zmq_getsockopt, libzmq), Cint, - (Ptr{Cvoid}, Cint, Ptr{UInt8}, Ref{Csize_t}), - socket, $k, buf, len) + rc = lib.zmq_getsockopt(socket, $k, buf, len) if rc != 0 throw(StateError(jl_zmq_error_str())) end