diff --git a/src/ZMQ.jl b/src/ZMQ.jl index 348a2a2..48f9aae 100644 --- a/src/ZMQ.jl +++ b/src/ZMQ.jl @@ -4,7 +4,7 @@ module ZMQ using ZeroMQ_jll using Base.Libc: EAGAIN -using FileWatching: UV_READABLE, uv_pollcb, _FDWatcher +using FileWatching: UV_READABLE, uv_pollcb, FDWatcher import Sockets using Sockets: connect, bind, send, recv import Base.GC: @preserve diff --git a/src/comm.jl b/src/comm.jl index 8d00128..00529e5 100644 --- a/src/comm.jl +++ b/src/comm.jl @@ -16,7 +16,7 @@ function _send(socket::Socket, zmsg, more::Bool=false) wait(socket) end else - notify_is_expensive = !isempty(getfield(socket,:pollfd).notify.waitq) + notify_is_expensive = !isempty(getfield(socket,:pollfd).watcher.notify.waitq) if notify_is_expensive socket.events != 0 && notify(socket) end @@ -80,7 +80,7 @@ function _recv!(socket::Socket, zmsg) wait(socket) end else - notify_is_expensive = !isempty(getfield(socket,:pollfd).notify.waitq) + notify_is_expensive = !isempty(getfield(socket,:pollfd).watcher.notify.waitq) if notify_is_expensive socket.events != 0 && notify(socket) end diff --git a/src/socket.jl b/src/socket.jl index e1fe96c..4680b16 100644 --- a/src/socket.jl +++ b/src/socket.jl @@ -19,7 +19,7 @@ Do-block constructor. """ mutable struct Socket data::Ptr{Cvoid} - pollfd::_FDWatcher + pollfd::FDWatcher """ Socket(ctx::Context, typ::Integer) @@ -32,7 +32,7 @@ mutable struct Socket throw(StateError(jl_zmq_error_str())) end socket = new(p) - setfield!(socket, :pollfd, _FDWatcher(fd(socket), #=readable=#true, #=writable=#false)) + setfield!(socket, :pollfd, FDWatcher(fd(socket), #=readable=#true, #=writable=#false)) finalizer(close, socket) push!(getfield(ctx, :sockets), WeakRef(socket)) return socket @@ -72,7 +72,7 @@ Base.isopen(socket::Socket) = getfield(socket, :data) != C_NULL """ function Base.close(socket::Socket) if isopen(socket) - close(getfield(socket, :pollfd), #=readable=#true, #=writable=#false) + close(getfield(socket, :pollfd)) rc = ccall((:zmq_close, libzmq), Cint, (Ptr{Cvoid},), socket) setfield!(socket, :data, C_NULL) if rc != 0 @@ -90,8 +90,8 @@ if Sys.iswindows() Base.fd(socket::Socket) = WindowsRawSocket(convert(Ptr{Cvoid}, socket.fd)) end -Base.wait(socket::Socket) = wait(getfield(socket, :pollfd), readable=true, writable=false) -Base.notify(socket::Socket) = @preserve socket uv_pollcb(getfield(socket, :pollfd).handle, Int32(0), Int32(UV_READABLE)) +Base.wait(socket::Socket) = wait(getfield(socket, :pollfd)) +Base.notify(socket::Socket) = @preserve socket uv_pollcb(getfield(socket, :pollfd).watcher.handle, Int32(0), Int32(UV_READABLE)) """ Sockets.bind(socket::Socket, endpoint::AbstractString)