Skip to content

Commit

Permalink
Merge pull request JuliaLang#35521 from JuliaLang/sf/udp_multicast
Browse files Browse the repository at this point in the history
  • Loading branch information
staticfloat authored May 4, 2020
2 parents 44f3bb3 + e1d8da1 commit 6418be9
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 11 deletions.
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ Standard library changes


#### Sockets
* Joining and leaving UDP multicast groups on a `UDPSocket` is now supported
through `join_multicast_group()` and `leave_multicast_group()` ([#35521]).

#### Distributed
* `launch_on_machine` now supports and parses ipv6 square-bracket notation ([#34430])
Expand Down
52 changes: 52 additions & 0 deletions stdlib/Sockets/src/Sockets.jl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ export
recv,
recvfrom,
send,
join_multicast_group,
leave_multicast_group,
TCPSocket,
UDPSocket,
@ip_str,
Expand Down Expand Up @@ -727,6 +729,56 @@ end

listenany(default_port) = listenany(localhost, default_port)

function udp_set_membership(sock::UDPSocket, group_addr::String,
interface_addr::Union{Nothing, String}, operation)
if interface_addr === nothing
interface_addr = C_NULL
end
r = ccall(:uv_udp_set_membership, Cint,
(Ptr{Cvoid}, Cstring, Cstring, Cint),
sock.handle, group_addr, interface_addr, operation)
uv_error("uv_udp_set_membership", r)
return
end

"""
join_multicast_group(sock::UDPSocket, group_addr, interface_addr = nothing)
Join a socket to a particular multicast group defined by `group_addr`.
If `interface_addr` is given, specifies a particular interface for multi-homed
systems. Use `leave_multicast_group()` to disable reception of a group.
"""
function join_multicast_group(sock::UDPSocket, group_addr::String,
interface_addr::Union{Nothing, String} = nothing)
return udp_set_membership(sock, group_addr, interface_addr, 1)
end
function join_multicast_group(sock::UDPSocket, group_addr::IPAddr,
interface_addr::Union{Nothing, IPAddr} = nothing)
if interface_addr !== nothing
interface_addr = string(interface_addr)
end
return join_multicast_group(sock, string(group_addr), interface_addr)
end

"""
leave_multicast_group(sock::UDPSocket, group_addr, interface_addr = nothing)
Remove a socket from a particular multicast group defined by `group_addr`.
If `interface_addr` is given, specifies a particular interface for multi-homed
systems. Use `join_multicast_group()` to enable reception of a group.
"""
function leave_multicast_group(sock::UDPSocket, group_addr::String,
interface_addr::Union{Nothing, String} = nothing)
return udp_set_membership(sock, group_addr, interface_addr, 0)
end
function leave_multicast_group(sock::UDPSocket, group_addr::IPAddr,
interface_addr::Union{Nothing, IPAddr} = nothing)
if interface_addr !== nothing
interface_addr = string(interface_addr)
end
return leave_multicast_group(sock, string(group_addr), interface_addr)
end

"""
getsockname(sock::Union{TCPServer, TCPSocket}) -> (IPAddr, UInt16)
Expand Down
54 changes: 43 additions & 11 deletions stdlib/Sockets/test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -391,17 +391,17 @@ end

@testset "Local-machine broadcast" begin
let a, b, c
# (Mac OS X's loopback interface doesn't support broadcasts)
# Apple does not support broadcasting on 127.255.255.255
bcastdst = Sys.isapple() ? ip"255.255.255.255" : ip"127.255.255.255"

function create_socket()
function create_socket(addr::IPAddr, port)
s = UDPSocket()
bind(s, ip"0.0.0.0", 2000, reuseaddr = true, enable_broadcast = true)
s
bind(s, addr, port, reuseaddr = true, enable_broadcast = true)
return s
end

function wait_with_timeout(recvs)
TIMEOUT_VAL = 3*1e9 # nanoseconds
# Wait for futures to finish with a given timeout
function wait_with_timeout(recvs, TIMEOUT_VAL = 3*1e9)
t0 = time_ns()
recvs_check = copy(recvs)
while ((length(filter!(t->!istaskdone(t), recvs_check)) > 0)
Expand All @@ -412,23 +412,55 @@ end
map(wait, recvs)
end

a, b, c = [create_socket() for i = 1:3]
# First, test IPv4 broadcast
port = 2000
a, b, c = [create_socket(ip"0.0.0.0", port) for i in 1:3]
try
# bsd family do not allow broadcasting to ip"255.255.255.255"
# or ip"127.255.255.255"
# bsd family do not allow broadcasting on loopbacks
@static if !Sys.isbsd() || Sys.isapple()
send(c, bcastdst, 2000, "hello")
send(c, bcastdst, port, "hello")
recvs = [@async @test String(recv(s)) == "hello" for s in (a, b)]
wait_with_timeout(recvs)
end
catch e
if isa(e, Base.IOError) && Base.uverrorname(e.code) == "EPERM"
@warn "UDP broadcast test skipped (permission denied upon send, restrictive firewall?)"
@warn "UDP IPv4 broadcast test skipped (permission denied upon send, restrictive firewall?)"
else
rethrow()
end
end
[close(s) for s in [a, b, c]]

# Test ipv6 broadcast groups
a, b, c = [create_socket(ip"::", port) for i in 1:3]
try
# Exemplary Interface-local ipv6 multicast group, if we wanted this to actually be routed
# to other computers, we should use a link-local or larger address scope group
# bsd family and darwin do not allow broadcasting on loopbacks
@static if !Sys.isbsd() && !Sys.isapple()
group = ip"ff11::6a75:6c69:61"
join_multicast_group(a, group)
join_multicast_group(b, group)

send(c, group, port, "hello")
recvs = [@async @test String(recv(s)) == "hello" for s in (a, b)]
wait_with_timeout(recvs)

leave_multicast_group(a, group)
leave_multicast_group(b, group)

send(c, group, port, "hello")
recvs = [@async @test String(recv(s)) == "hello" for s in (a, b)]
# We only wait 200ms since we're pretty sure this is going to time out
@test_throws ErrorException wait_with_timeout(recvs, 2e8)
end
catch e
if isa(e, Base.IOError) && Base.uverrorname(e.code) == "EPERM"
@warn "UDP IPv6 broadcast test skipped (permission denied upon send, restrictive firewall?)"
else
rethrow()
end
end
end
end

Expand Down

0 comments on commit 6418be9

Please sign in to comment.