Skip to content

Commit

Permalink
Use FDWatcher instead of _FDWatcher
Browse files Browse the repository at this point in the history
This has a slightly more polished API.
  • Loading branch information
JamesWrigley committed May 15, 2024
1 parent 975376d commit aaa3a68
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/ZMQ.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module ZMQ
using ZeroMQ_jll

using Base.Libc: EAGAIN
using FileWatching: UV_READABLE, uv_pollcb, _FDWatcher
using FileWatching: UV_READABLE, uv_pollcb, FDWatcher
import Sockets
using Sockets: connect, bind, send, recv
import Base.GC: @preserve
Expand Down
4 changes: 2 additions & 2 deletions src/comm.jl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ function _send(socket::Socket, zmsg, more::Bool=false)
wait(socket)
end
else
notify_is_expensive = !isempty(getfield(socket,:pollfd).notify.waitq)
notify_is_expensive = !isempty(getfield(socket,:pollfd).watcher.notify.waitq)
if notify_is_expensive
socket.events != 0 && notify(socket)
end
Expand Down Expand Up @@ -80,7 +80,7 @@ function _recv!(socket::Socket, zmsg)
wait(socket)
end
else
notify_is_expensive = !isempty(getfield(socket,:pollfd).notify.waitq)
notify_is_expensive = !isempty(getfield(socket,:pollfd).watcher.notify.waitq)
if notify_is_expensive
socket.events != 0 && notify(socket)
end
Expand Down
10 changes: 5 additions & 5 deletions src/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Do-block constructor.
"""
mutable struct Socket
data::Ptr{Cvoid}
pollfd::_FDWatcher
pollfd::FDWatcher

"""
Socket(ctx::Context, typ::Integer)
Expand All @@ -32,7 +32,7 @@ mutable struct Socket
throw(StateError(jl_zmq_error_str()))
end
socket = new(p)
setfield!(socket, :pollfd, _FDWatcher(fd(socket), #=readable=#true, #=writable=#false))
setfield!(socket, :pollfd, FDWatcher(fd(socket), #=readable=#true, #=writable=#false))
finalizer(close, socket)
push!(getfield(ctx, :sockets), WeakRef(socket))
return socket
Expand Down Expand Up @@ -72,7 +72,7 @@ Base.isopen(socket::Socket) = getfield(socket, :data) != C_NULL
"""
function Base.close(socket::Socket)
if isopen(socket)
close(getfield(socket, :pollfd), #=readable=#true, #=writable=#false)
close(getfield(socket, :pollfd))
rc = ccall((:zmq_close, libzmq), Cint, (Ptr{Cvoid},), socket)
setfield!(socket, :data, C_NULL)
if rc != 0
Expand All @@ -90,8 +90,8 @@ if Sys.iswindows()
Base.fd(socket::Socket) = WindowsRawSocket(convert(Ptr{Cvoid}, socket.fd))
end

Base.wait(socket::Socket) = wait(getfield(socket, :pollfd), readable=true, writable=false)
Base.notify(socket::Socket) = @preserve socket uv_pollcb(getfield(socket, :pollfd).handle, Int32(0), Int32(UV_READABLE))
Base.wait(socket::Socket) = wait(getfield(socket, :pollfd))
Base.notify(socket::Socket) = @preserve socket uv_pollcb(getfield(socket, :pollfd).watcher.handle, Int32(0), Int32(UV_READABLE))

"""
Sockets.bind(socket::Socket, endpoint::AbstractString)
Expand Down

0 comments on commit aaa3a68

Please sign in to comment.