Skip to content

Commit

Permalink
Merge pull request #75 from JuliaLang/sjk/fix
Browse files Browse the repository at this point in the history
Fix for 0.4
  • Loading branch information
simonster committed Mar 20, 2015
2 parents 42104f1 + 6459822 commit 02e4afb
Showing 1 changed file with 36 additions and 26 deletions.
62 changes: 36 additions & 26 deletions src/ZMQ.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

module ZMQ
using Compat
if VERSION >= v"0.4.0-dev+3710"
import Base.unsafe_convert
else
const unsafe_convert = Base.convert
end
if VERSION >= v"0.4.0-dev+3844"
using Base.Libdl, Base.Libc
else
using Base: EAGAIN
end

if isfile(joinpath(dirname(@__FILE__),"..","deps","deps.jl"))
include("../deps/deps.jl")
Expand Down Expand Up @@ -44,7 +54,7 @@ show(io, thiserr::StateError) = print(io, "ZMQ: ", thiserr.msg)
# Basic functions
function jl_zmq_error_str()
errno = ccall((:zmq_errno, zmq), Cint, ())
c_strerror = ccall ((:zmq_strerror, zmq), Ptr{Uint8}, (Cint,), errno)
c_strerror = ccall ((:zmq_strerror, zmq), Ptr{UInt8}, (Cint,), errno)
if c_strerror != C_NULL
strerror = bytestring(c_strerror)
return strerror
Expand Down Expand Up @@ -153,7 +163,7 @@ end # end v3only

# Getting and setting socket options
# Socket options of integer type
let u64p = zeros(Uint64, 1), i64p = zeros(Int64, 1), ip = zeros(Cint, 1), u32p = zeros(Uint32, 1), sz = zeros(Uint, 1),
let u64p = zeros(UInt64, 1), i64p = zeros(Int64, 1), ip = zeros(Cint, 1), u32p = zeros(UInt32, 1), sz = zeros(UInt, 1),
pp = fill(C_NULL, 1)
opslist = [
(:set_affinity, :get_affinity, 4, u64p)
Expand Down Expand Up @@ -212,7 +222,7 @@ for (fset, fget, k, p) in opslist
@eval function ($fset)(socket::Socket, option_val::Integer)
($p)[1] = option_val
rc = ccall((:zmq_setsockopt, zmq), Cint,
(Ptr{Void}, Cint, Ptr{Void}, Uint),
(Ptr{Void}, Cint, Ptr{Void}, UInt),
socket.data, $k, $p, sizeof(eltype($p)))
if rc != 0
throw(StateError(jl_zmq_error_str()))
Expand All @@ -224,12 +234,12 @@ for (fset, fget, k, p) in opslist
@eval function ($fget)(socket::Socket)
($sz)[1] = sizeof(eltype($p))
rc = ccall((:zmq_getsockopt, zmq), Cint,
(Ptr{Void}, Cint, Ptr{Void}, Ptr{Uint}),
(Ptr{Void}, Cint, Ptr{Void}, Ptr{UInt}),
socket.data, $k, $p, $sz)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
return int(($p)[1])
return @compat Int(($p)[1])
end
end
end
Expand All @@ -239,11 +249,11 @@ if version.major == 2
global set_mcast_loop
set_mcast_loop(socket::Socket, val::Bool) = _zmq_setsockopt_mcast_loop(socket, val)
global get_mcast_loop
get_mcast_loop(socket::Socket) = bool(_zmq_getsockopt_mcast_loop(socket))
get_mcast_loop(socket::Socket) = @compat Bool(_zmq_getsockopt_mcast_loop(socket))
end
end # let
# More functions with boolean prototypes
get_rcvmore(socket::Socket) = bool(_zmq_getsockopt_rcvmore(socket))
get_rcvmore(socket::Socket) = @compat Bool(_zmq_getsockopt_rcvmore(socket))
# And a convenience function
ismore(socket::Socket) = get_rcvmore(socket)

Expand All @@ -253,7 +263,7 @@ for (f,k) in ((:subscribe,6), (:unsubscribe,7))
@eval begin
function $f_{T}(socket::Socket, filter::Ptr{T}, len::Integer)
rc = ccall((:zmq_setsockopt, zmq), Cint,
(Ptr{Void}, Cint, Ptr{T}, Uint),
(Ptr{Void}, Cint, Ptr{T}, UInt),
socket.data, $k, filter, len)
if rc != 0
throw(StateError(jl_zmq_error_str()))
Expand All @@ -272,7 +282,7 @@ wait(socket::Socket; readable=false, writable=false) = wait(fd(socket); readable


# Socket options of string type
let u8ap = zeros(Uint8, 255), sz = zeros(Uint, 1)
let u8ap = zeros(UInt8, 255), sz = zeros(UInt, 1)
opslist = [
(:set_identity, :get_identity, 5)
(:set_subscribe, nothing, 6)
Expand All @@ -292,7 +302,7 @@ for (fset, fget, k) in opslist
throw(StateError("option value too large"))
end
rc = ccall((:zmq_setsockopt, zmq), Cint,
(Ptr{Void}, Cint, Ptr{Uint8}, Uint),
(Ptr{Void}, Cint, Ptr{UInt8}, UInt),
socket.data, $k, option_val, length(option_val))
if rc != 0
throw(StateError(jl_zmq_error_str()))
Expand All @@ -304,12 +314,12 @@ for (fset, fget, k) in opslist
@eval function ($fget)(socket::Socket)
($sz)[1] = length($u8ap)
rc = ccall((:zmq_getsockopt, zmq), Cint,
(Ptr{Void}, Cint, Ptr{Uint8}, Ptr{Uint}),
(Ptr{Void}, Cint, Ptr{UInt8}, Ptr{UInt}),
socket.data, $k, $u8ap, $sz)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
return bytestring(convert(Ptr{Uint8}, $u8ap), int(($sz)[1]))
return bytestring(unsafe_convert(Ptr{UInt8}, $u8ap), @compat Int(($sz)[1]))
end
end
end
Expand All @@ -318,14 +328,14 @@ end # let


function bind(socket::Socket, endpoint::AbstractString)
rc = ccall((:zmq_bind, zmq), Cint, (Ptr{Void}, Ptr{Uint8}), socket.data, endpoint)
rc = ccall((:zmq_bind, zmq), Cint, (Ptr{Void}, Ptr{UInt8}), socket.data, endpoint)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
end

function connect(socket::Socket, endpoint::AbstractString)
rc=ccall((:zmq_connect, zmq), Cint, (Ptr{Void}, Ptr{Uint8}), socket.data, endpoint)
rc=ccall((:zmq_connect, zmq), Cint, (Ptr{Void}, Ptr{UInt8}), socket.data, endpoint)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
Expand Down Expand Up @@ -354,7 +364,7 @@ end
const gc_free_fn_c = cfunction(gc_free_fn, Cint, (Ptr{Void}, Ptr{Void}))

## Messages ##
type Message <: AbstractArray{Uint8,1}
type Message <: AbstractArray{UInt8,1}
# 32 bytes (for v3) + a pointer (for v2)
w0::Int64
w1::Int64
Expand Down Expand Up @@ -405,9 +415,9 @@ type Message <: AbstractArray{Uint8,1}
# (note: now "owns" the buffer ... the Array must not be resized,
# or even written to after the message is sent!)
if VERSION <= v"0.4.0-dev+3703"
Message(m::ByteString) = Message(m, convert(Ptr{Uint8}, m), sizeof(m))
Message(m::ByteString) = Message(m, convert(Ptr{UInt8}, m), sizeof(m))
else
Message(m::ByteString) = Message(m, Base.unsafe_convert(Ptr{Uint8}, pointer(m)), sizeof(m))
Message(m::ByteString) = Message(m, Base.unsafe_convert(Ptr{UInt8}, pointer(m)), sizeof(m))
end
Message{T<:ByteString}(p::SubString{T}) =
Message(p, pointer(p.string.data)+p.offset, sizeof(p))
Expand All @@ -426,9 +436,9 @@ isfreed(m::Message) = haskey(gc_protect, m.handle)

# AbstractArray behaviors:
similar(a::Message, T, dims::Dims) = Array(T, dims) # ?
length(zmsg::Message) = int(ccall((:zmq_msg_size, zmq), Csize_t, (Ptr{Message},), &zmsg))
length(zmsg::Message) = @compat Int(ccall((:zmq_msg_size, zmq), Csize_t, (Ptr{Message},), &zmsg))
size(zmsg::Message) = (length(zmsg),)
convert(::Type{Ptr{Uint8}}, zmsg::Message) = ccall((:zmq_msg_data, zmq), Ptr{Uint8}, (Ptr{Message},), &zmsg)
unsafe_convert(::Type{Ptr{UInt8}}, zmsg::Message) = ccall((:zmq_msg_data, zmq), Ptr{UInt8}, (Ptr{Message},), &zmsg)
function getindex(a::Message, i::Integer)
if i < 1 || i > length(a)
throw(BoundsError())
Expand Down Expand Up @@ -491,7 +501,7 @@ const DONTWAIT = 1
const SNDMORE = 2

@v2only begin
function send(socket::Socket, zmsg::Message, flag=int32(0))
function send(socket::Socket, zmsg::Message, flag=@compat(Int32(0)))
rc = ccall((:zmq_send, zmq), Cint, (Ptr{Void}, Ptr{Message}, Cint),
socket.data, &zmsg, flag)
if rc != 0
Expand All @@ -505,7 +515,7 @@ if VERSION <= v"0.4.0-dev+3703"
immutable Ref{T} end
end

function send(socket::Socket, zmsg::Message, flag=int32(0))
function send(socket::Socket, zmsg::Message, flag=@compat(Int32(0)))
if (get_events(socket) & POLLOUT) == 0
wait(socket; writable = true)
end
Expand All @@ -523,14 +533,14 @@ end
end # end v3only

# strings are immutable, so we can send them zero-copy by default
send(socket::Socket, msg::AbstractString, flag=int32(0)) = send(socket, Message(msg), flag)
send(socket::Socket, msg::AbstractString, flag=@compat(Int32(0))) = send(socket, Message(msg), flag)

# Make a copy of arrays before sending, by default, since it is too
# dangerous to require that the array not change until ZMQ is done with it.
# For zero-copy array messages, construct a Message explicitly.
send(socket::Socket, msg::AbstractArray, flag=int32(0)) = send(socket, Message(copy(msg)), flag)
send(socket::Socket, msg::AbstractArray, flag=@compat(Int32(0))) = send(socket, Message(copy(msg)), flag)

function send(f::Function, socket::Socket, flag=int32(0))
function send(f::Function, socket::Socket, flag=@compat(Int32(0)))
io = IOBuffer()
f(io)
send(socket, Message(io), flag)
Expand All @@ -543,7 +553,7 @@ function recv(socket::Socket)
rc = ccall((:zmq_recv, zmq), Cint, (Ptr{Void}, Ptr{Message}, Cint),
socket.data, &zmsg, NOBLOCK)
if rc != 0
if errno() == Base.EAGAIN
if errno() == EAGAIN
while (get_events(socket) & POLLIN) == 0
wait(socket; readable = true)
end
Expand All @@ -564,7 +574,7 @@ function recv(socket::Socket)
rc = ccall((:zmq_msg_recv, zmq), Cint, (Ptr{Message}, Ptr{Void}, Cint),
&zmsg, socket.data, NOBLOCK)
if rc == -1
if errno() == Base.EAGAIN
if errno() == EAGAIN
while (get_events(socket) & POLLIN) == 0
wait(socket; readable = true)
end
Expand Down

0 comments on commit 02e4afb

Please sign in to comment.