Skip to content

Commit

Permalink
Use FDWatcher instead of _FDWatcher
Browse files Browse the repository at this point in the history
This has a slightly more polished API.
  • Loading branch information
JamesWrigley committed May 28, 2022
1 parent 044d957 commit 6fe5c70
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/ZMQ.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module ZMQ
using ZeroMQ_jll

using Base.Libc: EAGAIN
using FileWatching: UV_READABLE, uv_pollcb, _FDWatcher
using FileWatching: UV_READABLE, uv_pollcb, FDWatcher
import Sockets
using Sockets: connect, bind, send, recv
import Base.GC: @preserve
Expand Down
4 changes: 2 additions & 2 deletions src/comm.jl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ function _send(socket::Socket, zmsg, more::Bool=false)
wait(socket)
end
else
notify_is_expensive = !isempty(getfield(socket,:pollfd).notify.waitq)
notify_is_expensive = !isempty(getfield(socket,:pollfd).watcher.notify.waitq)
if notify_is_expensive
socket.events != 0 && notify(socket)
end
Expand Down Expand Up @@ -75,7 +75,7 @@ function _recv!(socket::Socket, zmsg)
wait(socket)
end
else
notify_is_expensive = !isempty(getfield(socket,:pollfd).notify.waitq)
notify_is_expensive = !isempty(getfield(socket,:pollfd).watcher.notify.waitq)
if notify_is_expensive
socket.events != 0 && notify(socket)
end
Expand Down
10 changes: 5 additions & 5 deletions src/socket.jl
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
## Sockets ##
mutable struct Socket
data::Ptr{Cvoid}
pollfd::_FDWatcher
pollfd::FDWatcher

function Socket(ctx::Context, typ::Integer)
p = ccall((:zmq_socket, libzmq), Ptr{Cvoid}, (Ptr{Cvoid}, Cint), ctx, typ)
if p == C_NULL
throw(StateError(jl_zmq_error_str()))
end
socket = new(p)
setfield!(socket, :pollfd, _FDWatcher(fd(socket), #=readable=#true, #=writable=#false))
setfield!(socket, :pollfd, FDWatcher(fd(socket), #=readable=#true, #=writable=#false))
finalizer(close, socket)
push!(getfield(ctx, :sockets), WeakRef(socket))
return socket
Expand All @@ -31,7 +31,7 @@ Base.unsafe_convert(::Type{Ptr{Cvoid}}, s::Socket) = getfield(s, :data)
Base.isopen(socket::Socket) = getfield(socket, :data) != C_NULL
function Base.close(socket::Socket)
if isopen(socket)
close(getfield(socket, :pollfd), #=readable=#true, #=writable=#false)
close(getfield(socket, :pollfd))
rc = ccall((:zmq_close, libzmq), Cint, (Ptr{Cvoid},), socket)
setfield!(socket, :data, C_NULL)
if rc != 0
Expand All @@ -49,8 +49,8 @@ if Sys.iswindows()
Base.fd(socket::Socket) = WindowsRawSocket(convert(Ptr{Cvoid}, socket.fd))
end

Base.wait(socket::Socket) = wait(getfield(socket, :pollfd), readable=true, writable=false)
Base.notify(socket::Socket) = @preserve socket uv_pollcb(getfield(socket, :pollfd).handle, Int32(0), Int32(UV_READABLE))
Base.wait(socket::Socket) = wait(getfield(socket, :pollfd))
Base.notify(socket::Socket) = @preserve socket uv_pollcb(getfield(socket, :pollfd).watcher.handle, Int32(0), Int32(UV_READABLE))

function Sockets.bind(socket::Socket, endpoint::AbstractString)
rc = ccall((:zmq_bind, libzmq), Cint, (Ptr{Cvoid}, Ptr{UInt8}), socket, endpoint)
Expand Down

0 comments on commit 6fe5c70

Please sign in to comment.