Skip to content

Commit

Permalink
Merge pull request akka#1127 from akka/wip-3035-io-exceptions-rich
Browse files Browse the repository at this point in the history
IOManager connect and accept failure handling. Fixes #3035
  • Loading branch information
richdougherty committed Feb 13, 2013
2 parents 24bff5e + acadb9e commit a3b0e93
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 20 deletions.
22 changes: 22 additions & 0 deletions akka-actor-tests/src/test/scala/akka/actor/IOActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,28 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout {
if (!s.isClosed) s.close()
}
}

"fail when listening on an invalid address" in {
implicit val self = testActor
val address = new InetSocketAddress("irate.elephant", 9999)
IOManager(system).listen(address)
expectMsgType[Status.Failure](1 seconds)
}

"fail when listening on a privileged port" in {
implicit val self = testActor
val address = new InetSocketAddress("localhost", 80) // Assumes test not run as root
IOManager(system).listen(address)
expectMsgType[Status.Failure](1 seconds)
}

"fail when connecting to an invalid address" in {
implicit val self = testActor
val address = new InetSocketAddress("irate.elephant", 80)
IOManager(system).connect(address)
expectMsgType[Status.Failure](1 seconds)
}

}

}
52 changes: 32 additions & 20 deletions akka-actor/src/main/scala/akka/actor/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1022,31 +1022,43 @@ final class IOManagerActor(val settings: Settings) extends Actor with ActorLoggi

case IO.Listen(server, address, options)
val channel = ServerSocketChannel open ()
channel configureBlocking false

var backlog = defaultBacklog
val sock = channel.socket
options foreach {
case IO.ReceiveBufferSize(size) forwardFailure(sock.setReceiveBufferSize(size))
case IO.ReuseAddress(on) forwardFailure(sock.setReuseAddress(on))
case IO.PerformancePreferences(connTime, latency, bandwidth)
forwardFailure(sock.setPerformancePreferences(connTime, latency, bandwidth))
case IO.Backlog(number) backlog = number
try {
channel configureBlocking false
var backlog = defaultBacklog
val sock = channel.socket
options foreach {
case IO.ReceiveBufferSize(size) forwardFailure(sock.setReceiveBufferSize(size))
case IO.ReuseAddress(on) forwardFailure(sock.setReuseAddress(on))
case IO.PerformancePreferences(connTime, latency, bandwidth)
forwardFailure(sock.setPerformancePreferences(connTime, latency, bandwidth))
case IO.Backlog(number) backlog = number
}
sock bind (address, backlog)
channels update (server, channel)
channel register (selector, OP_ACCEPT, server)
server.owner ! IO.Listening(server, sock.getLocalSocketAddress())
} catch {
case NonFatal(e) {
channel close ()
sender ! Status.Failure(e)
}
}

channel.socket bind (address, backlog)
channels update (server, channel)
channel register (selector, OP_ACCEPT, server)
server.owner ! IO.Listening(server, sock.getLocalSocketAddress())
run()

case IO.Connect(socket, address, options)
val channel = SocketChannel open ()
channel configureBlocking false
channel connect address
setSocketOptions(channel.socket, options)
channels update (socket, channel)
channel register (selector, OP_CONNECT | OP_READ, socket)
try {
channel configureBlocking false
channel connect address
setSocketOptions(channel.socket, options)
channels update (socket, channel)
channel register (selector, OP_CONNECT | OP_READ, socket)
} catch {
case NonFatal(e) {
channel close ()
sender ! Status.Failure(e)
}
}
run()

case IO.Accept(socket, server, options)
Expand Down

0 comments on commit a3b0e93

Please sign in to comment.