diff --git a/src/ZMQ.jl b/src/ZMQ.jl index f57be26..6ef642e 100644 --- a/src/ZMQ.jl +++ b/src/ZMQ.jl @@ -4,7 +4,7 @@ module ZMQ using ZeroMQ_jll using Base.Libc: EAGAIN -using FileWatching: UV_READABLE, uv_pollcb, _FDWatcher +using FileWatching: UV_READABLE, uv_pollcb, FDWatcher import Sockets using Sockets: connect, bind, send, recv import Base.GC: @preserve diff --git a/src/comm.jl b/src/comm.jl index 8c8fd25..a8b6b16 100644 --- a/src/comm.jl +++ b/src/comm.jl @@ -16,7 +16,7 @@ function _send(socket::Socket, zmsg, more::Bool=false) wait(socket) end else - notify_is_expensive = !isempty(getfield(socket,:pollfd).notify.waitq) + notify_is_expensive = !isempty(getfield(socket,:pollfd).watcher.notify.waitq) if notify_is_expensive socket.events != 0 && notify(socket) end @@ -75,7 +75,7 @@ function _recv!(socket::Socket, zmsg) wait(socket) end else - notify_is_expensive = !isempty(getfield(socket,:pollfd).notify.waitq) + notify_is_expensive = !isempty(getfield(socket,:pollfd).watcher.notify.waitq) if notify_is_expensive socket.events != 0 && notify(socket) end diff --git a/src/socket.jl b/src/socket.jl index 2757d75..cc61457 100644 --- a/src/socket.jl +++ b/src/socket.jl @@ -1,7 +1,7 @@ ## Sockets ## mutable struct Socket data::Ptr{Cvoid} - pollfd::_FDWatcher + pollfd::FDWatcher function Socket(ctx::Context, typ::Integer) p = ccall((:zmq_socket, libzmq), Ptr{Cvoid}, (Ptr{Cvoid}, Cint), ctx, typ) @@ -9,7 +9,7 @@ mutable struct Socket throw(StateError(jl_zmq_error_str())) end socket = new(p) - setfield!(socket, :pollfd, _FDWatcher(fd(socket), #=readable=#true, #=writable=#false)) + setfield!(socket, :pollfd, FDWatcher(fd(socket), #=readable=#true, #=writable=#false)) finalizer(close, socket) push!(getfield(ctx, :sockets), WeakRef(socket)) return socket @@ -31,7 +31,7 @@ Base.unsafe_convert(::Type{Ptr{Cvoid}}, s::Socket) = getfield(s, :data) Base.isopen(socket::Socket) = getfield(socket, :data) != C_NULL function Base.close(socket::Socket) if isopen(socket) - close(getfield(socket, :pollfd), #=readable=#true, #=writable=#false) + close(getfield(socket, :pollfd)) rc = ccall((:zmq_close, libzmq), Cint, (Ptr{Cvoid},), socket) setfield!(socket, :data, C_NULL) if rc != 0 @@ -49,8 +49,8 @@ if Sys.iswindows() Base.fd(socket::Socket) = WindowsRawSocket(convert(Ptr{Cvoid}, socket.fd)) end -Base.wait(socket::Socket) = wait(getfield(socket, :pollfd), readable=true, writable=false) -Base.notify(socket::Socket) = @preserve socket uv_pollcb(getfield(socket, :pollfd).handle, Int32(0), Int32(UV_READABLE)) +Base.wait(socket::Socket) = wait(getfield(socket, :pollfd)) +Base.notify(socket::Socket) = @preserve socket uv_pollcb(getfield(socket, :pollfd).watcher.handle, Int32(0), Int32(UV_READABLE)) function Sockets.bind(socket::Socket, endpoint::AbstractString) rc = ccall((:zmq_bind, libzmq), Cint, (Ptr{Cvoid}, Ptr{UInt8}), socket, endpoint)