Skip to content

Commit

Permalink
Replace explicit ccall's with calls to the generated bindings
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesWrigley committed May 31, 2024
1 parent c3ed094 commit f1be2c0
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 83 deletions.
6 changes: 3 additions & 3 deletions src/ZMQ.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Support for ZeroMQ, a network and interprocess communication library

module ZMQ
import ZeroMQ_jll: libzmq

using Base.Libc: EAGAIN
using FileWatching: UV_READABLE, uv_pollcb, FDWatcher
Expand All @@ -19,21 +18,22 @@ export
#Sockets
connect, bind, send, recv


include("bindings.jl")
include("constants.jl")
include("optutil.jl")
include("error.jl")
include("context.jl")
include("socket.jl")
include("sockopts.jl")
include("message.jl")
include("msg_bindings.jl")
include("comm.jl")

function __init__()
major = Ref{Cint}()
minor = Ref{Cint}()
patch = Ref{Cint}()
ccall((:zmq_version, libzmq), Cvoid, (Ptr{Cint}, Ptr{Cint}, Ptr{Cint}), major, minor, patch)
lib.zmq_version(major, minor, patch)
global version = VersionNumber(major[], minor[], patch[])
if version < v"3"
error("ZMQ version $version < 3 is not supported")
Expand Down
14 changes: 7 additions & 7 deletions src/_message.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,34 @@

# Low-level message type, matching the declaration of
# zmq_msg_t in the header: char _[64];
primitive type _Message 64 * 8 end
const _Message = lib.zmq_msg_t

const _MessageOrRef = Union{_Message,Base.RefValue{_Message}}

function msg_init()
zmsg = Ref{_Message}()
rc = ccall((:zmq_msg_init, libzmq), Cint, (Ref{_Message},), zmsg)
rc = lib.zmq_msg_init(zmsg)
rc != 0 && throw(StateError(jl_zmq_error_str()))
return zmsg
end

function msg_init(nbytes::Int)
zmsg = Ref{_Message}()
rc = ccall((:zmq_msg_init_size, libzmq), Cint, (Ref{_Message}, Csize_t), zmsg, nbytes % Csize_t)
rc = lib.zmq_msg_init_size(zmsg, nbytes % Csize_t)
rc != 0 && throw(StateError(jl_zmq_error_str()))
return zmsg
end

# note: no finalizer for _Message, so we need to call close manually!
function Base.close(zmsg::_MessageOrRef)
rc = ccall((:zmq_msg_close, libzmq), Cint, (Ref{_Message},), zmsg)
rc = lib.zmq_msg_close(zmsg)
rc != 0 && throw(StateError(jl_zmq_error_str()))
return nothing
end

Base.length(zmsg::_MessageOrRef) = ccall((:zmq_msg_size, libzmq), Csize_t, (Ref{_Message},), zmsg) % Int
Base.unsafe_convert(::Type{Ptr{UInt8}}, zmsg::_MessageOrRef) =
ccall((:zmq_msg_data, libzmq), Ptr{UInt8}, (Ref{_Message},), zmsg)
Base.length(zmsg::_MessageOrRef) = lib.zmq_msg_size(zmsg) % Int
Base.unsafe_convert(::Type{Ptr{UInt8}}, zmsg::_Message) = Ptr{UInt8}(lib.zmq_msg_data(Ref(zmsg)))
Base.unsafe_convert(::Type{Ptr{UInt8}}, zmsg::Base.RefValue{_Message}) = Ptr{UInt8}(lib.zmq_msg_data(zmsg))

# isbits data, vectors thereof, and strings can be converted to/from _Message

Expand Down
18 changes: 4 additions & 14 deletions src/comm.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,10 @@

############################################################################

msg_send(socket::Socket, zmsg::_MessageOrRef, flags::Integer) =
ccall((:zmq_msg_send, libzmq), Cint, (Ref{_Message}, Ptr{Cvoid}, Cint), zmsg, socket, flags)
msg_send(socket::Socket, zmsg::Message, flags::Integer) =
ccall((:zmq_msg_send, libzmq), Cint, (Ref{Message}, Ptr{Cvoid}, Cint), zmsg, socket, flags)

function _send(socket::Socket, zmsg, more::Bool=false)
while true
if -1 == msg_send(socket, zmsg, (ZMQ_SNDMORE*more) | ZMQ_DONTWAIT)
zmq_errno() == EAGAIN || throw(StateError(jl_zmq_error_str()))
if -1 == lib.zmq_msg_send(zmsg, socket, (ZMQ_SNDMORE*more) | ZMQ_DONTWAIT)
lib.zmq_errno() == EAGAIN || throw(StateError(jl_zmq_error_str()))
while (socket.events & POLLOUT) == 0
wait(socket)
end
Expand Down Expand Up @@ -67,15 +62,10 @@ end

############################################################################

msg_recv(socket::Socket, zmsg::_MessageOrRef, flags::Integer) =
ccall((:zmq_msg_recv, libzmq), Cint, (Ref{_Message}, Ptr{Cvoid}, Cint), zmsg, socket, flags)
msg_recv(socket::Socket, zmsg::Message, flags::Integer) =
ccall((:zmq_msg_recv, libzmq), Cint, (Ref{Message}, Ptr{Cvoid}, Cint), zmsg, socket, flags)

function _recv!(socket::Socket, zmsg)
while true
if -1 == msg_recv(socket, zmsg, ZMQ_DONTWAIT)
zmq_errno() == EAGAIN || throw(StateError(jl_zmq_error_str()))
if -1 == lib.zmq_msg_recv(zmsg, socket, ZMQ_DONTWAIT)
lib.zmq_errno() == EAGAIN || throw(StateError(jl_zmq_error_str()))
while socket.events & POLLIN== 0
wait(socket)
end
Expand Down
46 changes: 23 additions & 23 deletions src/constants.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,32 @@
## Constants

# Context options
const IO_THREADS = 1
const MAX_SOCKETS = 2
const IPV6 = 42
const IO_THREADS = lib.ZMQ_IO_THREADS
const MAX_SOCKETS = lib.ZMQ_MAX_SOCKETS
const IPV6 = lib.ZMQ_IPV6

"[PAIR](https://zeromq.org/socket-api/#pair-socket) socket."
const PAIR = 0
const PAIR = lib.ZMQ_PAIR
"[PUB](https://zeromq.org/socket-api/#pub-socket) socket."
const PUB = 1
const PUB = lib.ZMQ_PUB
"[SUB](https://zeromq.org/socket-api/#sub-socket) socket."
const SUB = 2
const SUB = lib.ZMQ_SUB
"[REQ](https://zeromq.org/socket-api/#req-socket) socket."
const REQ = 3
const REQ = lib.ZMQ_REQ
"[REP](https://zeromq.org/socket-api/#rep-socket) socket."
const REP = 4
const REP = lib.ZMQ_REP
"[DEALER](https://zeromq.org/socket-api/#dealer-socket) socket."
const DEALER = 5
const DEALER = lib.ZMQ_DEALER
"[ROUTER](https://zeromq.org/socket-api/#router-socket) socket."
const ROUTER = 6
const ROUTER = lib.ZMQ_ROUTER
"[PULL](https://zeromq.org/socket-api/#pull-socket) socket."
const PULL = 7
const PULL = lib.ZMQ_PULL
"[PUSH](https://zeromq.org/socket-api/#push-socket) socket."
const PUSH = 8
const PUSH = lib.ZMQ_PUSH
"[XPUB](https://zeromq.org/socket-api/#xpub-socket) socket."
const XPUB = 9
const XPUB = lib.ZMQ_XPUB
"[XSUB](https://zeromq.org/socket-api/#xsub-socket) socket."
const XSUB = 10
const XSUB = lib.ZMQ_XSUB
"""
[XREQ](https://zeromq.org/socket-api/#dealer-socket) socket.
Expand Down Expand Up @@ -61,20 +61,20 @@ const UPSTREAM = PULL
const DOWNSTREAM = PUSH

#Message options
const MORE = 1
const MORE = lib.ZMQ_MORE
const SNDMORE = true

#IO Multiplexing
const POLLIN = 1
const POLLOUT = 2
const POLLERR = 4
const POLLIN = lib.ZMQ_POLLIN
const POLLOUT = lib.ZMQ_POLLOUT
const POLLERR = lib.ZMQ_POLLERR

#Built in devices
const STREAMER = 1
const FORWARDER = 2
const QUEUE = 3
const STREAMER = lib.ZMQ_STREAMER
const FORWARDER = lib.ZMQ_FORWARDER
const QUEUE = lib.ZMQ_QUEUE


#Send/Recv Options
const ZMQ_DONTWAIT = 1
const ZMQ_SNDMORE = 2
const ZMQ_DONTWAIT = lib.ZMQ_DONTWAIT
const ZMQ_SNDMORE = lib.ZMQ_SNDMORE
8 changes: 4 additions & 4 deletions src/context.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

# the low-level context constructor
function _ctx_new()
p = ccall((:zmq_ctx_new, libzmq), Ptr{Cvoid}, ())
p = lib.zmq_ctx_new()
if p == C_NULL
throw(StateError(jl_zmq_error_str()))
end
Expand Down Expand Up @@ -73,7 +73,7 @@ function Base.close(ctx::Context)
end
end
empty!(getfield(ctx, :sockets))
rc = ccall((:zmq_ctx_destroy, libzmq), Cint, (Ptr{Cvoid},), ctx)
rc = lib.zmq_ctx_destroy(ctx)
setfield!(ctx, :data, C_NULL)
if rc != 0
throw(StateError(jl_zmq_error_str()))
Expand All @@ -83,14 +83,14 @@ end
@deprecate term(ctx::Context) close(ctx)

function _get(ctx::Context, option::Integer)
val = ccall((:zmq_ctx_get, libzmq), Cint, (Ptr{Cvoid}, Cint), ctx, option)
val = lib.zmq_ctx_get(ctx, option)
if val < 0
throw(StateError(jl_zmq_error_str()))
end
return val
end
function _set(ctx::Context, option::Integer, value::Integer)
rc = ccall((:zmq_ctx_set, libzmq), Cint, (Ptr{Cvoid}, Cint, Cint), ctx, option, value)
rc = lib.zmq_ctx_set(ctx, option, value)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
Expand Down
9 changes: 5 additions & 4 deletions src/error.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ end
show(io, thiserr::StateError) = print(io, "ZMQ: ", thiserr.msg)

# Basic functions
zmq_errno() = ccall((:zmq_errno, libzmq), Cint, ())

function jl_zmq_error_str()
errno = zmq_errno()
c_strerror = ccall((:zmq_strerror, libzmq), Ptr{UInt8}, (Cint,), errno)
errno = lib.zmq_errno()
c_strerror = lib.zmq_strerror(errno)

if c_strerror != C_NULL
strerror = unsafe_string(c_strerror)
return strerror
else
return "Unknown error"
end
end
end
17 changes: 8 additions & 9 deletions src/message.jl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ mutable struct Message <: AbstractArray{UInt8,1}
function Message()
zmsg = new()
setfield!(zmsg, :handle, C_NULL)
rc = ccall((:zmq_msg_init, libzmq), Cint, (Ref{Message},), zmsg)
rc = lib.zmq_msg_init(zmsg)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
Expand All @@ -54,7 +54,7 @@ mutable struct Message <: AbstractArray{UInt8,1}
function Message(len::Integer)
zmsg = new()
setfield!(zmsg, :handle, C_NULL)
rc = ccall((:zmq_msg_init_size, libzmq), Cint, (Ref{Message}, Csize_t), zmsg, len)
rc = lib.zmq_msg_init_size(zmsg, len)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
Expand All @@ -74,8 +74,7 @@ mutable struct Message <: AbstractArray{UInt8,1}
zmsg = new()
setfield!(zmsg, :handle, gc_protect_handle(origin))
gc_free_fn_c = @cfunction(gc_free_fn, Cint, (Ptr{Cvoid}, Ptr{Cvoid}))
rc = ccall((:zmq_msg_init_data, libzmq), Cint, (Ref{Message}, Ptr{T}, Csize_t, Ptr{Cvoid}, Ptr{Cvoid}),
zmsg, m, len, gc_free_fn_c, getfield(zmsg, :handle))
rc = lib.zmq_msg_init_data(zmsg, m, len, gc_free_fn_c, getfield(zmsg, :handle))
if rc != 0
gc_free_fn(C_NULL, getfield(zmsg, :handle)) # don't leak memory on error
throw(StateError(jl_zmq_error_str()))
Expand Down Expand Up @@ -132,9 +131,9 @@ isfreed(m::Message) = haskey(gc_protect, getfield(m, :handle))

# AbstractArray behaviors:
Base.similar(a::Message, ::Type{T}, dims::Dims) where {T} = Array{T}(undef, dims) # ?
Base.length(zmsg::Message) = Int(ccall((:zmq_msg_size, libzmq), Csize_t, (Ref{Message},), zmsg))
Base.length(zmsg::Message) = Int(lib.zmq_msg_size(zmsg))
Base.size(zmsg::Message) = (length(zmsg),)
Base.unsafe_convert(::Type{Ptr{UInt8}}, zmsg::Message) = ccall((:zmq_msg_data, libzmq), Ptr{UInt8}, (Ref{Message},), zmsg)
Base.unsafe_convert(::Type{Ptr{UInt8}}, zmsg::Message) = Ptr{UInt8}(lib.zmq_msg_data(zmsg))
function Base.getindex(a::Message, i::Integer)
@boundscheck if i < 1 || i > length(a)
throw(BoundsError())
Expand Down Expand Up @@ -166,22 +165,22 @@ end
# Close a message. You should not need to call this manually (let the
# finalizer do it).
function Base.close(zmsg::Message)
rc = ccall((:zmq_msg_close, libzmq), Cint, (Ref{Message},), zmsg)
rc = lib.zmq_msg_close(zmsg)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
return nothing
end

function _get(zmsg::Message, property::Integer)
val = ccall((:zmq_msg_get, libzmq), Cint, (Ref{Message}, Cint), zmsg, property)
val = lib.zmq_msg_get(zmsg, property)
if val < 0
throw(StateError(jl_zmq_error_str()))
end
val
end
function _set(zmsg::Message, property::Integer, value::Integer)
rc = ccall((:zmq_msg_set, libzmq), Cint, (Ref{Message}, Cint, Cint), zmsg, property, value)
rc = lib.zmq_msg_set(zmsg, property, value)
if rc < 0
throw(StateError(jl_zmq_error_str()))
end
Expand Down
8 changes: 4 additions & 4 deletions src/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mutable struct Socket
Create a socket in a given context.
"""
function Socket(ctx::Context, typ::Integer)
p = ccall((:zmq_socket, libzmq), Ptr{Cvoid}, (Ptr{Cvoid}, Cint), ctx, typ)
p = lib.zmq_socket(ctx, typ)
if p == C_NULL
throw(StateError(jl_zmq_error_str()))
end
Expand Down Expand Up @@ -58,7 +58,7 @@ Base.isopen(socket::Socket) = getfield(socket, :data) != C_NULL
function Base.close(socket::Socket)
if isopen(socket)
close(getfield(socket, :pollfd))
rc = ccall((:zmq_close, libzmq), Cint, (Ptr{Cvoid},), socket)
rc = lib.zmq_close(socket)
setfield!(socket, :data, C_NULL)
if rc != 0
throw(StateError(jl_zmq_error_str()))
Expand Down Expand Up @@ -86,7 +86,7 @@ described
[here](http://api.zeromq.org/4-3:zmq-bind). e.g. `tcp://127.0.0.1:42000`.
"""
function Sockets.bind(socket::Socket, endpoint::AbstractString)
rc = ccall((:zmq_bind, libzmq), Cint, (Ptr{Cvoid}, Ptr{UInt8}), socket, endpoint)
rc = lib.zmq_bind(socket, endpoint)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
Expand All @@ -98,7 +98,7 @@ end
Connect the socket to an endpoint.
"""
function Sockets.connect(socket::Socket, endpoint::AbstractString)
rc=ccall((:zmq_connect, libzmq), Cint, (Ptr{Cvoid}, Ptr{UInt8}), socket, endpoint)
rc = lib.zmq_connect(socket, endpoint)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
Expand Down
Loading

0 comments on commit f1be2c0

Please sign in to comment.