Skip to content

Commit

Permalink
Merge pull request akka#22668 from gosubpl/wip/22648-fix-mergeDelta-o…
Browse files Browse the repository at this point in the history
…rset-ormap

fixes to ORSet mergeRemoveDelta and ORMap deltaMerge (akka#22648)
  • Loading branch information
patriknw authored Apr 4, 2017
2 parents d545922 + 3a8eef4 commit 6c2a304
Show file tree
Hide file tree
Showing 11 changed files with 421 additions and 150 deletions.
123 changes: 64 additions & 59 deletions akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,8 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
@InternalApi private[akka] def remove(node: UniqueAddress, key: A): ORMap[A, B] = {
// for removals the delta values map emitted will be empty
val newKeys = keys.resetDelta.remove(node, key)
// FIXME use full state for removals, until issue #22648 is fixed
// val removeDeltaOp = RemoveDeltaOp(newKeys.delta.get, zeroTag)
// new ORMap(newKeys, values - key, zeroTag, Some(newDelta(removeDeltaOp)))
new ORMap(newKeys, values - key, zeroTag, delta = None)

val removeDeltaOp = RemoveDeltaOp(newKeys.delta.get, zeroTag)
new ORMap(newKeys, values - key, zeroTag, Some(newDelta(removeDeltaOp)))
}

/**
Expand All @@ -312,10 +309,8 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
*/
@InternalApi private[akka] def removeKey(node: UniqueAddress, key: A): ORMap[A, B] = {
val newKeys = keys.resetDelta.remove(node, key)
// FIXME use full state for removals, until issue #22648 is fixed
// val removeKeyDeltaOp = RemoveKeyDeltaOp(newKeys.delta.get, key, zeroTag)
// new ORMap(newKeys, values, zeroTag, Some(newDelta(removeKeyDeltaOp)))
new ORMap(newKeys, values, zeroTag, delta = None)
val removeKeyDeltaOp = RemoveKeyDeltaOp(newKeys.delta.get, key, zeroTag)
new ORMap(newKeys, values, zeroTag, Some(newDelta(removeKeyDeltaOp)))
}

private def dryMerge(that: ORMap[A, B], mergedKeys: ORSet[A], valueKeysIterator: Iterator[A]): ORMap[A, B] = {
Expand Down Expand Up @@ -360,43 +355,58 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
if (delta.isEmpty) this
else new ORMap[A, B](keys.resetDelta, values, zeroTag = zeroTag)

override def mergeDelta(thatDelta: ORMap.DeltaOp): ORMap[A, B] = {
// helper function to simplify folds below
def foldValues(values: List[(A, ReplicatedData)], initial: B) =
values.foldLeft(initial) {
case (acc: DeltaReplicatedData, (_, value: ReplicatedDelta))
acc.mergeDelta(value.asInstanceOf[acc.D]).asInstanceOf[B]
case (acc, (_, value))
acc.merge(value.asInstanceOf[acc.T]).asInstanceOf[B]
private def dryMergeDelta(thatDelta: ORMap.DeltaOp, withValueDeltas: Boolean = false): ORMap[A, B] = {
def mergeValue(lvalue: ReplicatedData, rvalue: ReplicatedData): B =
(lvalue, rvalue) match {
case (v: DeltaReplicatedData, delta: ReplicatedDelta)
v.mergeDelta(delta.asInstanceOf[v.D]).asInstanceOf[B]
case _
lvalue.merge(rvalue.asInstanceOf[lvalue.T]).asInstanceOf[B]
}

val mergedKeys: ORSet[A] = thatDelta match {
case d: AtomicDeltaOp[A, B] keys.mergeDelta(d.underlying)
case ORMap.DeltaGroup(ops)
ops.foldLeft(keys)((acc, op) acc.mergeDelta(op.asInstanceOf[AtomicDeltaOp[A, B]].underlying))
}

var mergedValues = Map.empty[A, B]
var tombstonedVals = Set.empty[A]
var thatValueDeltas: Map[A, List[(A, ReplicatedData)]] = Map.empty
var mergedKeys: ORSet[A] = this.keys
var (mergedValues, tombstonedVals): (Map[A, B], Map[A, B]) = this.values.partition { case (k, _) this.keys.contains(k) }

val processDelta: PartialFunction[ORMap.DeltaOp, Unit] = {
case putOp: PutDeltaOp[A, B]
val key = putOp.value._1
thatValueDeltas += (key (putOp.value :: Nil)) // put is destructive!
case _: RemoveDeltaOp[A, B]
// remove delta is only for the side effect of key being removed
// please note that if it is not preceded by update clearing the value
// anomalies will result
val keyDelta = putOp.underlying
mergedKeys = mergedKeys.mergeDelta(keyDelta)
mergedValues = mergedValues + putOp.value // put is destructive and propagates only full values of B!
case removeOp: RemoveDeltaOp[A, B]
val removedKey = removeOp.underlying match {
// if op is RemoveDeltaOp then it must have exactly one element in the elements
case op: ORSet.RemoveDeltaOp[_] op.underlying.elements.head.asInstanceOf[A]
case _ throw new IllegalArgumentException("ORMap.RemoveDeltaOp must contain ORSet.RemoveDeltaOp inside")
}
mergedValues = mergedValues - removedKey
mergedKeys = mergedKeys.mergeDelta(removeOp.underlying)
// please note that if RemoveDeltaOp is not preceded by update clearing the value
// anomalies may result
case removeKeyOp: RemoveKeyDeltaOp[A, B]
tombstonedVals = tombstonedVals + removeKeyOp.removedKey
// removeKeyOp tombstones values for later use
if (mergedValues.contains(removeKeyOp.removedKey)) {
tombstonedVals = tombstonedVals + (removeKeyOp.removedKey mergedValues(removeKeyOp.removedKey))
}
mergedValues = mergedValues - removeKeyOp.removedKey
mergedKeys = mergedKeys.mergeDelta(removeKeyOp.underlying)
case updateOp: UpdateDeltaOp[A, _]
mergedKeys = mergedKeys.mergeDelta(updateOp.underlying)
updateOp.values.foreach {
case (key, value)
if (thatValueDeltas.contains(key))
thatValueDeltas = thatValueDeltas + (key (thatValueDeltas(key) :+ (key value)))
else
thatValueDeltas += (key ((key, value) :: Nil))
if (mergedKeys.contains(key)) {
if (mergedValues.contains(key)) {
mergedValues = mergedValues + (key mergeValue(mergedValues(key), value))
} else if (tombstonedVals.contains(key)) {
mergedValues = mergedValues + (key mergeValue(tombstonedVals(key), value))
} else {
value match {
case _: ReplicatedDelta
mergedValues = mergedValues + (key mergeValue(value.asInstanceOf[ReplicatedDelta].zero, value))
case _
mergedValues = mergedValues + (key value.asInstanceOf[B])
}
}
}
}
}

Expand All @@ -412,30 +422,25 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (

(processDelta orElse processNestedDelta)(thatDelta)

val aggregateValuesForKey: (A Unit) = { key
(this.values.get(key), thatValueDeltas.get(key)) match {
case (Some(thisValue), Some(thatValues))
val mergedValue = foldValues(thatValues, thisValue)
mergedValues = mergedValues.updated(key, mergedValue)
case (Some(thisValue), None)
mergedValues = mergedValues.updated(key, thisValue)
case (None, Some(thatValues))
val (_, initialValue) = thatValues.head
val mergedValue = initialValue match {
case _: ReplicatedDelta
foldValues(thatValues, initialValue.asInstanceOf[ReplicatedDelta].zero.asInstanceOf[B])
case _
foldValues(thatValues.tail, initialValue.asInstanceOf[B])
}
mergedValues = mergedValues.updated(key, mergedValue)
case (None, None) throw new IllegalStateException(s"missing value for $key")
}
}
if (withValueDeltas)
new ORMap[A, B](mergedKeys, tombstonedVals ++ mergedValues, zeroTag = zeroTag)
else
new ORMap[A, B](mergedKeys, mergedValues, zeroTag = zeroTag)
}

mergedKeys.elementsMap.keysIterator.foreach { aggregateValuesForKey }
tombstonedVals.foreach { aggregateValuesForKey }
override def mergeDelta(thatDelta: ORMap.DeltaOp): ORMap[A, B] = {
val thisWithDeltas = dryMergeDelta(thatDelta)
this.merge(thisWithDeltas)
}

new ORMap[A, B](mergedKeys, mergedValues, zeroTag = zeroTag)
/**
* INTERNAL API
* This function is only to be used by derived maps that avoid remove anomalies
* by keeping the vvector (in form of key -> value pair) for deleted keys
*/
@InternalApi private[akka] def mergeDeltaRetainingDeletedValues(thatDelta: ORMap.DeltaOp): ORMap[A, B] = {
val thisWithDeltas = dryMergeDelta(thatDelta, true)
this.mergeRetainingDeletedValues(thisWithDeltas)
}

private def newDelta(deltaOp: ORMap.DeltaOp) = delta match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,10 @@ final class ORMultiMap[A, B] private[akka] (
override def delta: Option[D] = underlying.delta

override def mergeDelta(thatDelta: D): ORMultiMap[A, B] =
new ORMultiMap(underlying.mergeDelta(thatDelta), withValueDeltas)
if (withValueDeltas)
new ORMultiMap(underlying.mergeDeltaRetainingDeletedValues(thatDelta), withValueDeltas)
else
new ORMultiMap(underlying.mergeDelta(thatDelta), withValueDeltas)

override def modifiedByNodes: Set[UniqueAddress] =
underlying.modifiedByNodes
Expand Down
34 changes: 20 additions & 14 deletions akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -345,15 +345,13 @@ final class ORSet[A] private[akka] (
* INTERNAL API
*/
@InternalApi private[akka] def remove(node: UniqueAddress, element: A): ORSet[A] = {
// FIXME use full state for removals, until issue #22648 is fixed
// val deltaDot = VersionVector(node, vvector.versionAt(node))
// val rmOp = ORSet.RemoveDeltaOp(new ORSet(Map(element → deltaDot), vvector))
// val newDelta = delta match {
// case None ⇒ rmOp
// case Some(d) ⇒ d.merge(rmOp)
// }
// assignAncestor(copy(elementsMap = elementsMap - element, delta = Some(newDelta)))
assignAncestor(copy(elementsMap = elementsMap - element, delta = None))
val deltaDot = VersionVector(node, vvector.versionAt(node))
val rmOp = ORSet.RemoveDeltaOp(new ORSet(Map(element deltaDot), vvector))
val newDelta = delta match {
case None rmOp
case Some(d) d.merge(rmOp)
}
assignAncestor(copy(elementsMap = elementsMap - element, delta = Some(newDelta)))
}

/**
Expand Down Expand Up @@ -439,19 +437,27 @@ final class ORSet[A] private[akka] (
val (elem, thatDot) = that.elementsMap.head
def deleteDots = that.vvector.versionsIterator
def deleteDotsNodes = deleteDots.map { case (dotNode, _) dotNode }
val newElementsMap =
if (deleteDots.forall { case (dotNode, dotV) this.vvector.versionAt(dotNode) <= dotV }) {
elementsMap.get(elem) match {
val newElementsMap = {
val thisDotOption = this.elementsMap.get(elem)
val deleteDotsAreGreater = deleteDots.forall {
case (dotNode, dotV)
thisDotOption match {
case Some(thisDot) thisDot.versionAt(dotNode) <= dotV
case None false
}
}
if (deleteDotsAreGreater) {
thisDotOption match {
case Some(thisDot)
if (thisDot.versionsIterator.forall { case (thisDotNode, _) deleteDotsNodes.contains(thisDotNode) })
elementsMap - elem
else elementsMap
case None
elementsMap
}
} else {
} else
elementsMap
}
}
clearAncestor()
val newVvector = vvector.merge(that.vvector)
new ORSet(newElementsMap, newVvector)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,11 @@ object ReplicatorDeltaSpec extends MultiNodeConfig {
case 3
// ORSet
val key = rndOrSetkey()
// FIXME use full state for removals, until issue #22648 is fixed
// // only removals for KeyF on node first
// if (key == KeyF && onNode == first && rnd.nextBoolean())
// Remove(key, rndRemoveElement(), consistency())
// else
Add(key, rndAddElement(), consistency())
// only removals for KeyF on node first
if (key == KeyF && onNode == first && rnd.nextBoolean())
Remove(key, rndRemoveElement(), consistency())
else
Add(key, rndAddElement(), consistency())
}
}.toVector
}
Expand Down
Loading

0 comments on commit 6c2a304

Please sign in to comment.