Skip to content

Commit

Permalink
ZMQ event loop integration
Browse files Browse the repository at this point in the history
  • Loading branch information
Keno committed Jul 23, 2013
1 parent b66dd93 commit 035e73a
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 3 deletions.
27 changes: 24 additions & 3 deletions src/ZMQ.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module ZMQ
using BinDeps
@BinDeps.load_dependencies

import Base: convert, ref, get, bytestring, length, size, stride, similar, getindex, setindex!
import Base: convert, ref, get, bytestring, length, size, stride, similar, getindex, setindex!, fd, wait

export
#Types
Expand Down Expand Up @@ -136,17 +136,20 @@ end # end v3only

# Getting and setting socket options
# Socket options of integer type
let u64p = zeros(Uint64, 1), i64p = zeros(Int64, 1), ip = zeros(Cint, 1), u32p = zeros(Uint32, 1), sz = zeros(Uint, 1)
let u64p = zeros(Uint64, 1), i64p = zeros(Int64, 1), ip = zeros(Cint, 1), u32p = zeros(Uint32, 1), sz = zeros(Uint, 1),
pp = zeros(Ptr{Void},1)
opslist = {
(:set_affinity, :get_affinity, 4, u64p)
(nothing, :get_fd, 14, ip)
(:set_type, :get_type, 16, ip)
(:set_linger, :get_linger, 17, ip)
(:set_reconnect_ivl, :get_reconnect_ivl, 18, ip)
(:set_backlog, :get_backlog, 19, ip)
(:set_reconnect_ivl_max, :get_reconnect_ivl_max, 21, ip)
}

@unix_only opslist = vcat(opslist, (nothing, :get_fd, 14, ip))
@windows_only opslist = vcat(opslist, (nothing, :get_fd, 14, pp))

if version.major == 2
opslist = vcat(opslist, {
(:set_hwm, :get_hwm, 1, u64p)
Expand Down Expand Up @@ -228,6 +231,12 @@ get_rcvmore(socket::Socket) = bool(_zmq_getsockopt_rcvmore(socket))
ismore(socket::Socket) = get_rcvmore(socket)


# Raw FD access
@unix_only fd(socket::Socket) = RawFD(get_fd(socket))
@windows_only fd(socket::Socket) = WindowsRawSocket(get_fd(socket))
wait(socket::Socket; readable=false, writeable=false) = wait(fd(socket); readable=readable, writeable=writeable)


# Socket options of string type
let u8ap = zeros(Uint8, 255), sz = zeros(Uint, 1)
opslist = {
Expand Down Expand Up @@ -412,13 +421,19 @@ end # end v2only

@v3only begin
function send(socket::Socket, zmsg::Message, flag=int32(0))
if (get_events(socket) & POLLOUT) == 0
wait(socket; writeable = true)
end
rc = ccall((:zmq_msg_send, zmq), Cint, (Ptr{Void}, Ptr{Message}, Cint),
&zmsg, socket.data, flag)
if rc == -1
throw(StateError(jl_zmq_error_str()))
end
end
function send{T}(socket::Socket, msg::Ptr{T}, len, flag=int32(0))
if (get_events(socket) & POLLOUT) == 0
wait(socket; writeable = true)
end
rc = ccall((:zmq_send, zmq), Cint,
(Ptr{Void}, Ptr{T}, Csize_t, Cint),
socket.data, msg, len * sizeof(T), flag)
Expand All @@ -433,6 +448,9 @@ end # end v3only
@v2only begin
function recv(socket::Socket, flag=int32(0))
zmsg = Message()
if (get_events(socket) & POLLIN) == 0
wait(socket; readable = true)
end
rc = ccall((:zmq_recv, zmq), Cint, (Ptr{Void}, Ptr{Message}, Cint),
socket.data, &zmsg, flag)
if rc != 0
Expand All @@ -445,6 +463,9 @@ end # end v2only
@v3only begin
function recv(socket::Socket, flag=int32(0))
zmsg = Message()
if (get_events(socket) & POLLIN) == 0
wait(socket; readable = true)
end
rc = ccall((:zmq_msg_recv, zmq), Cint, (Ptr{Message}, Ptr{Void}, Cint),
&zmsg, socket.data, flag)
if rc == -1
Expand Down
19 changes: 19 additions & 0 deletions test/ZMQ.jl
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,25 @@ ZMQ.send(s2, Message("test request"))
ZMQ.send(s1, Message("test response"))
@assert (bytestring(ZMQ.recv(s2)) == "test response")

# Test task-blocking behavior
c = Base.Condition()
msg_sent = false
@async begin
global msg_sent
sleep(0.5)
msg_sent = true
ZMQ.send(s2, Message("test request"))
@assert (bytestring(ZMQ.recv(s2)) == "test response")
notify(c)
end

# This will hang forver if ZMQ blocks the entire process since
# we'll never switch to the other task
@assert (bytestring(ZMQ.recv(s1)) == "test request")
@assert msg_sent == true
ZMQ.send(s1, Message("test response"))
wait(c)

ZMQ.send(s2, Message("another test request"))
msg = ZMQ.recv(s1)
o=convert(IOStream, msg)
Expand Down

0 comments on commit 035e73a

Please sign in to comment.