Skip to content

Commit

Permalink
akka#22463 Check for wired ports before wiring in traversal builder (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
2m authored and drewhk committed Mar 8, 2017
1 parent 2ae8c53 commit 45ccd4f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,23 @@ class GraphMergePreferredSpec extends TwoStreamsSetup {
}).getMessage should include("[MergePreferred.preferred] is already connected")
}

"disallow multiple outputs" in {
val s = Source(0 to 3)

(the[IllegalArgumentException] thrownBy {
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b
val merge = b.add(MergePreferred[Int](1))

s ~> merge.preferred
s ~> merge.in(0)

merge.out ~> Sink.head[Int]
merge.out ~> Sink.head[Int]
ClosedShape
})
}).getMessage should include("[MergePreferred.out] is already connected")
}

}

}
13 changes: 11 additions & 2 deletions akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import scala.annotation.unchecked.uncheckedVariance
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.Promise
import scala.util.control.NoStackTrace
import scala.util.control.{ NoStackTrace, NonFatal }

object Merge {
/**
Expand Down Expand Up @@ -996,7 +996,16 @@ object GraphDSL extends GraphApply {
* INTERNAL API
*/
private[GraphDSL] def addEdge[T, U >: T](from: Outlet[T], to: Inlet[U]): Unit =
traversalBuilderInProgress = traversalBuilderInProgress.wire(from, to)
try {
traversalBuilderInProgress = traversalBuilderInProgress.wire(from, to)
} catch {
case NonFatal(ex)
if (!traversalBuilderInProgress.isUnwired(from))
throw new IllegalArgumentException(s"[${from.s}] is already connected")
else if (!traversalBuilderInProgress.isUnwired(to))
throw new IllegalArgumentException(s"[${to.s}] is already connected")
else throw ex
}

/**
* Import a graph into this module, performing a deep copy, discarding its
Expand Down

0 comments on commit 45ccd4f

Please sign in to comment.