Skip to content

Commit

Permalink
proper implementation of mergeRemoveDelta, akka#22188
Browse files Browse the repository at this point in the history
  • Loading branch information
gosubpl authored and patriknw committed Feb 22, 2017
1 parent b700b84 commit 3d51111
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -432,17 +432,20 @@ final class ORSet[A] private[akka] (
private def mergeRemoveDelta(thatDelta: ORSet.RemoveDeltaOp[A]): ORSet[A] = {
val that = thatDelta.underlying
val (elem, thatDot) = that.elementsMap.head
def deleteDots = that.vvector.versionsIterator
def deleteDotsNodes = deleteDots.map { case (dotNode, _) dotNode }
val newElementsMap =
if (that.vvector > vvector || that.vvector == vvector)
elementsMap - elem
else {
if (deleteDots.forall { case (dotNode, dotV) this.vvector.versionAt(dotNode) <= dotV }) {
elementsMap.get(elem) match {
case Some(thisDot)
if (thatDot == thisDot || thatDot > thisDot) elementsMap - elem
if (thisDot.versionsIterator.forall { case (thisDotNode, _) deleteDotsNodes.contains(thisDotNode) })
elementsMap - elem
else elementsMap
case None
elementsMap
}
} else {
elementsMap
}
clearAncestor()
val newVvector = vvector.merge(that.vvector)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class ORSetSpec extends WordSpec with Matchers {

val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1L)
val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2L)
val node3 = UniqueAddress(node1.address.copy(port = Some(2553)), 3L)

val nodeA = UniqueAddress(Address("akka.tcp", "Sys", "a", 2552), 1L)
val nodeB = UniqueAddress(nodeA.address.copy(host = Some("b")), 2L)
Expand Down Expand Up @@ -374,6 +375,50 @@ class ORSetSpec extends WordSpec with Matchers {
s9.elements should ===(Set("a", "z"))
}

"handle a mixed add/remove scenario 2" in {
val s1 = ORSet.empty[String]
val s2 = s1.resetDelta.add(node1, "a")
val s3 = s2.resetDelta.add(node1, "b")
val s4 = s3.resetDelta.add(node2, "a")
val s5 = s4.resetDelta.remove(node1, "a")

s5.elements should ===(Set("b"))

val delta1 = s2.delta.get merge s3.delta.get
val delta2 = s4.delta.get

val t1 = ORSet.empty[String]
val t2 = t1.mergeDelta(delta1).mergeDelta(delta2)
t2.elements should ===(Set("a", "b"))
val t3 = t2.resetDelta.add(node3, "z")

val t4 = t3.mergeDelta(s5.delta.get)

t4.elements should ===(Set("b", "z"))
}

"handle a mixed add/remove scenario 3" in {
val s1 = ORSet.empty[String]
val s2 = s1.resetDelta.add(node1, "a")
val s3 = s2.resetDelta.add(node1, "b")
val s4 = s3.resetDelta.add(node2, "a")
val s5 = s4.resetDelta.remove(node1, "a")

s5.elements should ===(Set("b"))

val delta1 = s2.delta.get merge s3.delta.get

val t1 = ORSet.empty[String]

val t2 = t1.mergeDelta(delta1)
t2.elements should ===(Set("a", "b"))
val t3 = t2.resetDelta.add(node3, "a")

val t4 = t3.mergeDelta(s5.delta.get)

t4.elements should ===(Set("b", "a"))
}

"require causal delivery of deltas" in {
// This test illustrates why we need causal delivery of deltas.
// Otherwise the following could happen.
Expand Down

0 comments on commit 3d51111

Please sign in to comment.