Skip to content

Commit

Permalink
Reimplemented the iterator concatenator.
Browse files Browse the repository at this point in the history
No longer accesses an iterator unless all previously added ones have
been exhausted. No recurses on a tree (could trigger a stack overflow
exception).
  • Loading branch information
pstutz committed Nov 10, 2014
1 parent b7fff17 commit a7abee5
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 57 deletions.
59 changes: 59 additions & 0 deletions src/main/scala/com/signalcollect/util/IteratorConcatenator.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* @author Philip Stutz
*
* Copyright 2014 University of Zurich
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.signalcollect.util

import scala.annotation.tailrec
import scala.collection.mutable.Queue

/**
* To avoid https://issues.scala-lang.org/browse/SI-8428, which is not really fixed.
*
* Unfortunately I could not reproduce the problem outside of a large and complex TripleRush evaluation.
*/
final class IteratorConcatenator[U] extends Iterator[U] {

val iterators = new Queue[Iterator[U]]()

def clear {
iterators.clear
}

def appendIterator(i: Iterator[U]) {
iterators.enqueue(i)
}

def next: U = {
iterators.head.next
}

@tailrec def hasNext: Boolean = {
if (iterators.isEmpty) {
false
} else {
val headHasNext = iterators.head.hasNext
if (!headHasNext) {
iterators.dequeue
hasNext
} else {
true
}
}
}
}
63 changes: 6 additions & 57 deletions src/main/scala/com/signalcollect/worker/WorkerImplementation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.lang.management.ManagementFactory

import scala.annotation.elidable
import scala.annotation.elidable.ASSERTION
import scala.annotation.tailrec
import scala.util.Random

import scala.collection.mutable.Queue
import com.signalcollect.Edge
import com.signalcollect.GraphEditor
import com.signalcollect.Vertex
Expand All @@ -55,9 +55,9 @@ import com.signalcollect.interfaces.WorkerStatistics
import com.signalcollect.interfaces.WorkerStatus
import com.signalcollect.serialization.DefaultSerializer
import com.sun.management.OperatingSystemMXBean

import akka.actor.ActorRef
import akka.event.LoggingAdapter
import com.signalcollect.util.IteratorConcatenator

/**
* Main implementation of the WorkerApi interface.
Expand All @@ -80,6 +80,7 @@ class WorkerImplementation[@specialized(Int, Long) Id, Signal](
var collectThreshold: Double)
extends Worker[Id, Signal] {

val pendingModifications = new IteratorConcatenator[GraphEditor[Id, Signal] => Unit]()
val workersPerNode = numberOfWorkers / numberOfNodes // Assumes that there is the same number of workers on all nodes.
val nodeId = getNodeId(workerId)
val pingPongSchedulingIntervalInMilliseconds = 4 // schedule pingpong exchange every 8ms
Expand All @@ -98,7 +99,6 @@ class WorkerImplementation[@specialized(Int, Long) Id, Signal](
var allWorkDoneWhenContinueSent: Boolean = _
var lastStatusUpdate: Long = _
var vertexStore: Storage[Id, Signal] = _
var pendingModifications: Iterator[GraphEditor[Id, Signal] => Unit] = _
var pingSentTimestamp: Long = _
var pingPongScheduled: Boolean = _
var waitingForPong: Boolean = _
Expand All @@ -118,7 +118,7 @@ class WorkerImplementation[@specialized(Int, Long) Id, Signal](
allWorkDoneWhenContinueSent = false
lastStatusUpdate = System.currentTimeMillis
vertexStore = storageFactory.createInstance
pendingModifications = Iterator.empty
pendingModifications.clear
pingSentTimestamp = 0
pingPongScheduled = false
waitingForPong = false
Expand Down Expand Up @@ -364,7 +364,7 @@ class WorkerImplementation[@specialized(Int, Long) Id, Signal](
}

override def loadGraph(graphModifications: Iterator[GraphEditor[Id, Signal] => Unit], vertexIdHint: Option[Id]) {
pendingModifications = new IteratorConcatenator(pendingModifications, graphModifications) // To avoid https://issues.scala-lang.org/browse/SI-8428, which is not really fixed.
pendingModifications.appendIterator(graphModifications) // To avoid https://issues.scala-lang.org/browse/SI-8428, which is not really fixed.
}

override def setSignalThreshold(st: Double) {
Expand Down Expand Up @@ -724,54 +724,3 @@ trait WorkerInterceptor[Id, Signal] extends WorkerApi[Id, Signal] {
super.deleteSnapshot
}
}

class IteratorConcatenator[U](private var a: Iterator[U], private var b: Iterator[U]) extends Iterator[U] { // To avoid https://issues.scala-lang.org/browse/SI-8428, which is not really fixed.

a = simplify(a)
b = simplify(b)

def simplify(i: Iterator[U]): Iterator[U] = if (i.isInstanceOf[IteratorConcatenator[U]]) {
val iCast = i.asInstanceOf[IteratorConcatenator[U]]
if (iCast.a == null || !iCast.a.hasNext) {
if (iCast.b == null || !iCast.b.hasNext) {
null.asInstanceOf[Iterator[U]]
} else {
iCast.b
}
} else if (iCast.b == null || !iCast.b.hasNext) {
iCast.a
} else {
iCast
}
} else {
i
}

def next: U = if (a != null) {
a.next
} else {
b.next
}

def hasNext: Boolean = {
if (a != null) {
val aGotMore = a.hasNext
if (!aGotMore) {
a = null
hasNext
} else {
true
}
} else if (b != null) {
val bGotMore = b.hasNext
if (!bGotMore) {
b = null
false
} else {
true
}
} else {
false
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* @author Philip Stutz
*
* Copyright 2014 University of Zurich
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.signalcollect.util

import org.scalacheck.Gen
import org.scalacheck.Gen._
import org.scalacheck.Arbitrary._
import org.scalatest.FlatSpec
import org.scalatest.ShouldMatchers
import org.scalatest.prop.Checkers
import java.io.DataOutputStream
import java.io.ByteArrayOutputStream
import org.scalacheck.Arbitrary

class IteratorConcatenatorSpec extends FlatSpec with ShouldMatchers with Checkers with TestAnnouncements {

"IteratorConcatenator" should "correctly concatenate multiple iterators" in {
val c = new IteratorConcatenator[Int]
for (i <- 1 to 1000 by 10) {
c.appendIterator((i until i + 10).iterator)
}
c.appendIterator(Seq(1001, 1002, 1003).iterator)
c.appendIterator(Set(1004).iterator)
assert(c.toList == (1 to 1004).toList)
}

it should "should work with random appends" in {
check(
(is: List[List[Int]]) => {
val c = new IteratorConcatenator[Int]
is.foreach { list => c.appendIterator(list.iterator) }
assert(c.toList == is.flatMap(_.iterator).toList)
true
},
minSuccessful(100))
}

}

0 comments on commit a7abee5

Please sign in to comment.