diff --git a/src/ZMQ.jl b/src/ZMQ.jl index e7e777e..91d345d 100644 --- a/src/ZMQ.jl +++ b/src/ZMQ.jl @@ -5,7 +5,7 @@ module ZMQ using BinDeps @BinDeps.load_dependencies -import Base: convert, ref, get, bytestring, length, size, stride, similar, getindex, setindex! +import Base: convert, ref, get, bytestring, length, size, stride, similar, getindex, setindex!, fd, wait export #Types @@ -136,10 +136,10 @@ end # end v3only # Getting and setting socket options # Socket options of integer type -let u64p = zeros(Uint64, 1), i64p = zeros(Int64, 1), ip = zeros(Cint, 1), u32p = zeros(Uint32, 1), sz = zeros(Uint, 1) +let u64p = zeros(Uint64, 1), i64p = zeros(Int64, 1), ip = zeros(Cint, 1), u32p = zeros(Uint32, 1), sz = zeros(Uint, 1), + pp = zeros(Ptr{Void},1) opslist = { (:set_affinity, :get_affinity, 4, u64p) - (nothing, :get_fd, 14, ip) (:set_type, :get_type, 16, ip) (:set_linger, :get_linger, 17, ip) (:set_reconnect_ivl, :get_reconnect_ivl, 18, ip) @@ -147,6 +147,9 @@ opslist = { (:set_reconnect_ivl_max, :get_reconnect_ivl_max, 21, ip) } +@unix_only opslist = vcat(opslist, (nothing, :get_fd, 14, ip)) +@windows_only opslist = vcat(opslist, (nothing, :get_fd, 14, pp)) + if version.major == 2 opslist = vcat(opslist, { (:set_hwm, :get_hwm, 1, u64p) @@ -228,6 +231,12 @@ get_rcvmore(socket::Socket) = bool(_zmq_getsockopt_rcvmore(socket)) ismore(socket::Socket) = get_rcvmore(socket) +# Raw FD access +@unix_only fd(socket::Socket) = RawFD(get_fd(socket)) +@windows_only fd(socket::Socket) = WindowsRawSocket(get_fd(socket)) +wait(socket::Socket; readable=false, writeable=false) = wait(fd(socket); readable=readable, writeable=writeable) + + # Socket options of string type let u8ap = zeros(Uint8, 255), sz = zeros(Uint, 1) opslist = { @@ -412,6 +421,9 @@ end # end v2only @v3only begin function send(socket::Socket, zmsg::Message, flag=int32(0)) + if (get_events(socket) & POLLOUT) == 0 + wait(socket; writeable = true) + end rc = ccall((:zmq_msg_send, zmq), Cint, (Ptr{Void}, Ptr{Message}, Cint), &zmsg, socket.data, flag) if rc == -1 @@ -419,6 +431,9 @@ function send(socket::Socket, zmsg::Message, flag=int32(0)) end end function send{T}(socket::Socket, msg::Ptr{T}, len, flag=int32(0)) + if (get_events(socket) & POLLOUT) == 0 + wait(socket; writeable = true) + end rc = ccall((:zmq_send, zmq), Cint, (Ptr{Void}, Ptr{T}, Csize_t, Cint), socket.data, msg, len * sizeof(T), flag) @@ -433,6 +448,9 @@ end # end v3only @v2only begin function recv(socket::Socket, flag=int32(0)) zmsg = Message() + if (get_events(socket) & POLLIN) == 0 + wait(socket; readable = true) + end rc = ccall((:zmq_recv, zmq), Cint, (Ptr{Void}, Ptr{Message}, Cint), socket.data, &zmsg, flag) if rc != 0 @@ -445,6 +463,9 @@ end # end v2only @v3only begin function recv(socket::Socket, flag=int32(0)) zmsg = Message() + if (get_events(socket) & POLLIN) == 0 + wait(socket; readable = true) + end rc = ccall((:zmq_msg_recv, zmq), Cint, (Ptr{Message}, Ptr{Void}, Cint), &zmsg, socket.data, flag) if rc == -1 diff --git a/test/ZMQ.jl b/test/ZMQ.jl index 1b178cd..22a8136 100644 --- a/test/ZMQ.jl +++ b/test/ZMQ.jl @@ -62,6 +62,25 @@ ZMQ.send(s2, Message("test request")) ZMQ.send(s1, Message("test response")) @assert (bytestring(ZMQ.recv(s2)) == "test response") +# Test task-blocking behavior +c = Base.Condition() +msg_sent = false +@async begin + global msg_sent + sleep(0.5) + msg_sent = true + ZMQ.send(s2, Message("test request")) + @assert (bytestring(ZMQ.recv(s2)) == "test response") + notify(c) +end + +# This will hang forver if ZMQ blocks the entire process since +# we'll never switch to the other task +@assert (bytestring(ZMQ.recv(s1)) == "test request") +@assert msg_sent == true +ZMQ.send(s1, Message("test response")) +wait(c) + ZMQ.send(s2, Message("another test request")) msg = ZMQ.recv(s1) o=convert(IOStream, msg)