diff --git a/src/main/scala/com/signalcollect/util/IteratorConcatenator.scala b/src/main/scala/com/signalcollect/util/IteratorConcatenator.scala new file mode 100644 index 00000000..d648b6ca --- /dev/null +++ b/src/main/scala/com/signalcollect/util/IteratorConcatenator.scala @@ -0,0 +1,59 @@ +/* + * @author Philip Stutz + * + * Copyright 2014 University of Zurich + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.signalcollect.util + +import scala.annotation.tailrec +import scala.collection.mutable.Queue + +/** + * To avoid https://issues.scala-lang.org/browse/SI-8428, which is not really fixed. + * + * Unfortunately I could not reproduce the problem outside of a large and complex TripleRush evaluation. + */ +final class IteratorConcatenator[U] extends Iterator[U] { + + val iterators = new Queue[Iterator[U]]() + + def clear { + iterators.clear + } + + def appendIterator(i: Iterator[U]) { + iterators.enqueue(i) + } + + def next: U = { + iterators.head.next + } + + @tailrec def hasNext: Boolean = { + if (iterators.isEmpty) { + false + } else { + val headHasNext = iterators.head.hasNext + if (!headHasNext) { + iterators.dequeue + hasNext + } else { + true + } + } + } +} diff --git a/src/main/scala/com/signalcollect/worker/WorkerImplementation.scala b/src/main/scala/com/signalcollect/worker/WorkerImplementation.scala index 9adaacdc..b5cecc09 100755 --- a/src/main/scala/com/signalcollect/worker/WorkerImplementation.scala +++ b/src/main/scala/com/signalcollect/worker/WorkerImplementation.scala @@ -26,11 +26,11 @@ import java.io.File import java.io.FileInputStream import java.io.FileOutputStream import java.lang.management.ManagementFactory - import scala.annotation.elidable import scala.annotation.elidable.ASSERTION +import scala.annotation.tailrec import scala.util.Random - +import scala.collection.mutable.Queue import com.signalcollect.Edge import com.signalcollect.GraphEditor import com.signalcollect.Vertex @@ -55,9 +55,9 @@ import com.signalcollect.interfaces.WorkerStatistics import com.signalcollect.interfaces.WorkerStatus import com.signalcollect.serialization.DefaultSerializer import com.sun.management.OperatingSystemMXBean - import akka.actor.ActorRef import akka.event.LoggingAdapter +import com.signalcollect.util.IteratorConcatenator /** * Main implementation of the WorkerApi interface. @@ -80,6 +80,7 @@ class WorkerImplementation[@specialized(Int, Long) Id, Signal]( var collectThreshold: Double) extends Worker[Id, Signal] { + val pendingModifications = new IteratorConcatenator[GraphEditor[Id, Signal] => Unit]() val workersPerNode = numberOfWorkers / numberOfNodes // Assumes that there is the same number of workers on all nodes. val nodeId = getNodeId(workerId) val pingPongSchedulingIntervalInMilliseconds = 4 // schedule pingpong exchange every 8ms @@ -98,7 +99,6 @@ class WorkerImplementation[@specialized(Int, Long) Id, Signal]( var allWorkDoneWhenContinueSent: Boolean = _ var lastStatusUpdate: Long = _ var vertexStore: Storage[Id, Signal] = _ - var pendingModifications: Iterator[GraphEditor[Id, Signal] => Unit] = _ var pingSentTimestamp: Long = _ var pingPongScheduled: Boolean = _ var waitingForPong: Boolean = _ @@ -118,7 +118,7 @@ class WorkerImplementation[@specialized(Int, Long) Id, Signal]( allWorkDoneWhenContinueSent = false lastStatusUpdate = System.currentTimeMillis vertexStore = storageFactory.createInstance - pendingModifications = Iterator.empty + pendingModifications.clear pingSentTimestamp = 0 pingPongScheduled = false waitingForPong = false @@ -364,7 +364,7 @@ class WorkerImplementation[@specialized(Int, Long) Id, Signal]( } override def loadGraph(graphModifications: Iterator[GraphEditor[Id, Signal] => Unit], vertexIdHint: Option[Id]) { - pendingModifications = new IteratorConcatenator(pendingModifications, graphModifications) // To avoid https://issues.scala-lang.org/browse/SI-8428, which is not really fixed. + pendingModifications.appendIterator(graphModifications) // To avoid https://issues.scala-lang.org/browse/SI-8428, which is not really fixed. } override def setSignalThreshold(st: Double) { @@ -724,54 +724,3 @@ trait WorkerInterceptor[Id, Signal] extends WorkerApi[Id, Signal] { super.deleteSnapshot } } - -class IteratorConcatenator[U](private var a: Iterator[U], private var b: Iterator[U]) extends Iterator[U] { // To avoid https://issues.scala-lang.org/browse/SI-8428, which is not really fixed. - - a = simplify(a) - b = simplify(b) - - def simplify(i: Iterator[U]): Iterator[U] = if (i.isInstanceOf[IteratorConcatenator[U]]) { - val iCast = i.asInstanceOf[IteratorConcatenator[U]] - if (iCast.a == null || !iCast.a.hasNext) { - if (iCast.b == null || !iCast.b.hasNext) { - null.asInstanceOf[Iterator[U]] - } else { - iCast.b - } - } else if (iCast.b == null || !iCast.b.hasNext) { - iCast.a - } else { - iCast - } - } else { - i - } - - def next: U = if (a != null) { - a.next - } else { - b.next - } - - def hasNext: Boolean = { - if (a != null) { - val aGotMore = a.hasNext - if (!aGotMore) { - a = null - hasNext - } else { - true - } - } else if (b != null) { - val bGotMore = b.hasNext - if (!bGotMore) { - b = null - false - } else { - true - } - } else { - false - } - } -} diff --git a/src/test/scala/com/signalcollect/util/IteratorConcatenatorSpec.scala b/src/test/scala/com/signalcollect/util/IteratorConcatenatorSpec.scala new file mode 100755 index 00000000..7d89e587 --- /dev/null +++ b/src/test/scala/com/signalcollect/util/IteratorConcatenatorSpec.scala @@ -0,0 +1,54 @@ +/* + * @author Philip Stutz + * + * Copyright 2014 University of Zurich + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.signalcollect.util + +import org.scalacheck.Gen +import org.scalacheck.Gen._ +import org.scalacheck.Arbitrary._ +import org.scalatest.FlatSpec +import org.scalatest.ShouldMatchers +import org.scalatest.prop.Checkers +import java.io.DataOutputStream +import java.io.ByteArrayOutputStream +import org.scalacheck.Arbitrary + +class IteratorConcatenatorSpec extends FlatSpec with ShouldMatchers with Checkers with TestAnnouncements { + + "IteratorConcatenator" should "correctly concatenate multiple iterators" in { + val c = new IteratorConcatenator[Int] + for (i <- 1 to 1000 by 10) { + c.appendIterator((i until i + 10).iterator) + } + c.appendIterator(Seq(1001, 1002, 1003).iterator) + c.appendIterator(Set(1004).iterator) + assert(c.toList == (1 to 1004).toList) + } + + it should "should work with random appends" in { + check( + (is: List[List[Int]]) => { + val c = new IteratorConcatenator[Int] + is.foreach { list => c.appendIterator(list.iterator) } + assert(c.toList == is.flatMap(_.iterator).toList) + true + }, + minSuccessful(100)) + } + +}