Skip to content

Commit

Permalink
simplify Selection API in Cleanable
Browse files Browse the repository at this point in the history
minor refactoring
  • Loading branch information
tribbloid committed Oct 24, 2023
1 parent 698b295 commit ca2a76d
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ object PyRef {
object ROOT extends PyRef {}

def sanityCheck(): Unit = {
val subs = Cleanable.All.typed[PyBinding]
val refSubs = Cleanable.All.typed[PyRef].map(_.bindings)
val subs = Cleanable.All.typed[PyBinding].selected
val refSubs = Cleanable.All.typed[PyRef].selected.map(_.bindings)
assert(
subs.intersect(refSubs).size <= refSubs.size,
"INTERNAL ERROR: dangling tree!"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ object ConflictDetection {

def conflicts: Seq[Try[Unit]] = {

val allObj = Cleanable.All.typed[ConflictDetection]
val allObj = Cleanable.All.typed[ConflictDetection].selected

val allResourceIDs: Map[String, Seq[Any]] = allObj
.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.tribbloids.spookystuff.utils.lifespan.{Cleanable, LeafType, LocalClea
import com.tribbloids.spookystuff.utils.serialization.AssertSerializable
import org.apache.spark.{HashPartitioner, SparkException, TaskContext}

import java.util.concurrent.atomic.AtomicInteger
import scala.util.Random

// TODO: move to previous module
Expand All @@ -21,10 +22,12 @@ class CleanableSuite extends SpookyBaseSpec {
override def beforeEach(): Unit = {
super.beforeEach()
sc.runEverywhere(alsoOnDriver = false) { _ =>
Cleanable.All.cleanSweep {
case _: DummyCleanable => true
case _ => false
}
Cleanable.All
.filter {
case _: DummyCleanable => true
case _ => false
}
.cleanSweep()
}
}

Expand Down Expand Up @@ -141,39 +144,80 @@ class CleanableSuite extends SpookyBaseSpec {

private def verify(getDummy: Int => Unit) = {
val ss = 1 to 10
for (_ <- 1 to 10) {
sc.parallelize(ss).foreach {
getDummy

val inc1 = sc
.uuidSeed()
.mapOncePerWorker { _ =>
counter.get()
}
.reduce(_ + _)

for (j <- 1 to 10) {
sc.parallelize(ss).foreach { i =>
getDummy(i)
}
}

val i2 = sc
val inc2 = sc
.uuidSeed()
.mapOncePerWorker { _ =>
Cleanable.All.typed[DummyCleanable].cleanSweep()
counter.get()
}
.reduce(_ + _)

sc
.uuidSeed()
.mapOncePerWorker { _ =>
Cleanable.All.typed[DummyCleanable].map(_.index)
Cleanable.All.typed[DummyCleanable].cleanSweep()
counter.get()
}
.flatMap(identity)
.collect()
.toSeq
.reduce(_ + _)

assert(i2.size == ss.size * 10)
assert(i2.distinct.sorted == ss)
assert(inc2 - inc1 == ss.size * 10)
// assert(i2.distinct.sorted == ss)
}

it("can get all created Cleanables even their hashcodes may overlap") {

verify(i => DummyCleanable(i, None))
}

}

object CleanableSuite {

val counter: AtomicInteger = new AtomicInteger(0)

val doInc: () => Unit = () => counter.incrementAndGet()

def incOf(fn: => Unit): Int = {
val c1 = counter.get()

fn

System.gc()

Thread.sleep(1000)
val c2 = counter.get()

c2 - c1
}

def assertInc(fn: => Unit, expected: Int = 1): Unit = {
val inc = incOf(fn)

assert(inc == expected)
}

case class DummyCleanable(
index: Int,
id: Option[Long] = Some(Random.nextLong())
) extends LocalCleanable {

override protected def cleanImpl(): Unit = {}
override protected def cleanImpl(): Unit = {
counter.incrementAndGet()
}
}

def lifespanIsSerializable(v: Lifespan): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@ import com.tribbloids.spookystuff.session.DriverLike
import com.tribbloids.spookystuff.utils.lifespan.Cleanable
import com.tribbloids.spookystuff.utils.lifespan.Cleanable.Lifespan
import com.tribbloids.spookystuff.utils.{CommonConst, CommonUtils, Retry, TreeThrowable}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.jutils.jprocesses.JProcesses
import org.jutils.jprocesses.model.ProcessInfo
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries}
import org.scalatest.{BeforeAndAfterEach, Outcome, Retries}

import scala.language.implicitConversions
import scala.util.Try
Expand All @@ -38,7 +36,7 @@ object SpookyBaseSpec {

Cleanable.uncleaned
.foreach { tuple =>
val taskCleanable = tuple._2.values
val taskCleanable = tuple._2.active.values
.filter { v =>
val isOfTask = v.lifespan.leaves.exists { ll =>
ll._type == Lifespan.Task
Expand Down Expand Up @@ -67,12 +65,13 @@ object SpookyBaseSpec {

if (cleanSweepDrivers) {
// this is necessary as each suite won't automatically cleanup drivers NOT in task when finished
Cleanable.All.cleanSweep(
condition = {
Cleanable.All
.filter {

case _: DriverLike => true
case _ => false
}
)
.cleanSweep()
}

conditions.foreach { condition =>
Expand All @@ -86,45 +85,12 @@ object SpookyBaseSpec {
}
}

trait EnvBase {

def sc: SparkContext = TestHelper.TestSC
def sql: SQLContext = TestHelper.TestSQL

var _spooky: SpookyContext = _
def spooky: SpookyContext = {
Option(_spooky)
.getOrElse {
val result: SpookyContext = reloadSpooky
result
}
}

def reloadSpooky: SpookyContext = {
val sql = this.sql
val result = SpookyContext(sql, SpookyConf.default)
_spooky = result
result
}

def spookyConf: SpookyConf = spooky.getConf(Core)
}

}

abstract class SpookyBaseSpec
extends FunSpecx
with SpookyBaseSpec.EnvBase
with RemoteDocsFixture
with BeforeAndAfterEach
with BeforeAndAfterAll
with Retries
with SparkUISupport {
abstract class SpookyBaseSpec extends SpookyEnvSpec with RemoteDocsFixture with BeforeAndAfterEach with Retries {

val exitingPIDs: Set[String] = SpookyBaseSpec.getProcesses.map(_.getPid).toSet

def parallelism: Int = sc.defaultParallelism

lazy val defaultEC: SpookyExecutionContext = SpookyExecutionContext(spooky)
lazy val defaultSchema: SpookySchema = SpookySchema(defaultEC)

Expand All @@ -151,9 +117,9 @@ abstract class SpookyBaseSpec

import com.tribbloids.spookystuff.utils.SpookyViews._

def _processNames: Seq[String] = Seq("phantomjs", s"${PythonDriverFactory.python3} -iu")
def _externalProcessNames: Seq[String] = Seq("phantomjs", s"${PythonDriverFactory.python3} -iu")
final lazy val conditions: Seq[ProcessInfo => Boolean] = {
val _processNames = this._processNames
val _processNames = this._externalProcessNames
val exitingPIDs = this.exitingPIDs
_processNames.map {
name =>
Expand Down
Loading

0 comments on commit ca2a76d

Please sign in to comment.