Merge pull request #11 from Max-Meldrum/dev
Add cgroups prototype, connect kompact-ext and add logging for executors
Max-Meldrum authored Sep 11, 2018
2 parents d215688 + 82c8bb8 commit a37820d
Showing 55 changed files with 1,234 additions and 333 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -19,4 +19,5 @@ native/
 build/
 .idea/
 weldrunner
+executor
 hadoop*
4 changes: 2 additions & 2 deletions build.sbt
@@ -30,7 +30,7 @@ lazy val root = (project in file("."))


 lazy val statemanager = (project in file("runtime/statemanager"))
-  .dependsOn(runtimeProtobuf, runtimeCommon % "test->test; compile->compile")
+  .dependsOn(runtimeProtobuf, runtimeCommon, kompactExtension % "test->test; compile->compile")
   .settings(runtimeSettings: _*)
   .settings(Dependencies.statemanager)
   .settings(moduleName("runtime.statemanager"))
@@ -93,7 +93,7 @@ lazy val clusterManagerCommon = (project in file("cluster-manager-common"))
   .settings(moduleName("clustermanager.common"))

 lazy val standalone = (project in file("cluster-manager/standalone"))
-  .dependsOn(runtimeProtobuf, runtimeCommon, clusterManagerCommon % "test->test; compile->compile")
+  .dependsOn(runtimeProtobuf, runtimeCommon, clusterManagerCommon, kompactExtension % "test->test; compile->compile")
   .settings(runtimeSettings: _*)
   .settings(Dependencies.standalone)
   .settings(moduleName("clustermanager.standalone"))
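Both statemanager and standalone now depend on kompactExtension, whose own project definition sits among the 55 changed files but outside this excerpt. Purely as a hedged sketch following the conventions of the modules above (the file path and module name here are guesses, not the actual definition):

// Hypothetical sketch only -- the real definition is not part of this excerpt.
lazy val kompactExtension = (project in file("runtime/kompact-extension"))
  .dependsOn(runtimeProtobuf, runtimeCommon % "test->test; compile->compile")
  .settings(runtimeSettings: _*)
  .settings(moduleName("runtime.kompactextension"))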
18 changes: 18 additions & 0 deletions cgroups.sh
@@ -0,0 +1,18 @@
#!/bin/bash

# TaskManager
sudo mkdir -p /sys/fs/cgroup/cpu/taskmanager
sudo mkdir -p /sys/fs/cgroup/memory/taskmanager

# Containers
sudo mkdir -p /sys/fs/cgroup/cpu/containers
sudo mkdir -p /sys/fs/cgroup/memory/containers


# Set User rights to the directories

sudo chown -R ${USER}: /sys/fs/cgroup/cpu/taskmanager
sudo chown -R ${USER}: /sys/fs/cgroup/cpu/containers

sudo chown -R ${USER}: /sys/fs/cgroup/memory/taskmanager
sudo chown -R ${USER}: /sys/fs/cgroup/memory/containers
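The script only creates the cgroup hierarchies and hands them to the current user; limits still have to be written into the standard cgroups-v1 control files. A minimal Scala sketch of that step, assuming the paths created above (memory.limit_in_bytes and cgroup.procs are standard cgroups-v1 control files; the repository's own cgroups code lives in files not shown here):

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

object CgroupSketch {
  private val containersMem = "/sys/fs/cgroup/memory/containers"

  // Write a memory limit (in bytes) into the containers cgroup created by cgroups.sh.
  def setMemoryLimitBytes(limit: Long): Unit =
    Files.write(Paths.get(s"$containersMem/memory.limit_in_bytes"),
      limit.toString.getBytes(StandardCharsets.UTF_8))

  // Attach a running process to the cgroup by writing its pid to cgroup.procs.
  def addProcess(pid: Long): Unit =
    Files.write(Paths.get(s"$containersMem/cgroup.procs"),
      pid.toString.getBytes(StandardCharsets.UTF_8))
}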
@@ -1,7 +1,7 @@
 package clustermanager.common.executor

 import java.nio.file.attribute.{PosixFilePermission, PosixFilePermissions}
-import java.nio.file.{Files, Paths}
+import java.nio.file.{FileAlreadyExistsException, Files, Path, Paths}
 import java.util.Comparator
 import java.util.stream.Collectors
@@ -127,10 +127,33 @@ private[clustermanager] class ExecutionEnvironment(jobId: String) extends LazyLogging
     ))
   }

-  def getJobPath: String =
-    OperatingSystem.get() match {
-      case Linux => LINUX_JOB_PATH
-      case Mac => MAC_OS_JOB_PATH
-      case _ => ""
+  def getJobPath: String = OperatingSystem.get() match {
+    case Linux => LINUX_JOB_PATH
+    case Mac => MAC_OS_JOB_PATH
+    case _ => ""
+  }
+
+  def createLogForTask(taskName: String): Boolean = {
+    try {
+      val path = Paths.get(getJobPath + "/" + taskName.concat(".log"))
+      Files.createFile(path)
+      true
+    } catch {
+      case err: FileAlreadyExistsException =>
+        true
+      case err: Exception =>
+        logger.error(err.toString)
+        false
+    }
+  }
+
+  def getLogFile(taskName: String): Option[Path] = {
+    val name = taskName.concat(".log")
+    val log = Paths.get(getJobPath + "/" + name)
+    if (Files.exists(log))
+      Some(log)
+    else
+      None
+  }

 }
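A short usage sketch for the two new helpers (hypothetical caller; the job and task names are made up):

val env = new ExecutionEnvironment("job-1")

// Create the log file up front; createLogForTask also returns true if it already exists.
if (env.createLogForTask("task-1")) {
  env.getLogFile("task-1") match {
    case Some(path) => println(s"task log at $path")
    case None       => println("log file missing")
  }
}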
12 changes: 9 additions & 3 deletions cluster-manager/standalone/src/main/resources/taskmanager.conf
@@ -8,10 +8,12 @@ akka {
     port = 0
   }

-
   // Cluster Metrics
-  extensions = [ "akka.cluster.metrics.ClusterMetricsExtension" ]
+  extensions = [ "akka.cluster.metrics.ClusterMetricsExtension", "runtime.kompact.KompactExtension"]
+
+  // Settings for the Kompact Proxy
+  kompact.port = 3000
+  kompact.host = "localhost"

   actor {
     provider = "cluster"
@@ -61,8 +63,12 @@

 taskmanager {
   sliceUpdateTick = 500
-  slices = 4
+  taskMasterTimeout = 1000 // Within this time, the TM should get a keep-alive msg from an AppMaster
+  taskExecutorHealthCheck = 1000
   hostname = "localhost"
+
+  isolation = "cgroups"
+  cgroups-path = "/sys/fs/cgroup"
+  // Amount that will be allocated for the containers. The rest is for other services such as the TaskManager
+  resource-limit = 70
 }
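For illustration, these keys can be read with Typesafe Config, which Akka already pulls in; a minimal sketch (the lookups are standard Typesafe Config API, but the repository's actual config-loading code is not part of this diff):

import com.typesafe.config.ConfigFactory

object TaskManagerConfigSketch {
  private val config = ConfigFactory.load("taskmanager")

  // Key names match the taskmanager block above.
  val isolation: String   = config.getString("taskmanager.isolation")
  val cgroupsPath: String = config.getString("taskmanager.cgroups-path")
  val resourceLimit: Int  = config.getInt("taskmanager.resource-limit") // percent reserved for containers
  val healthCheckMs: Long = config.getLong("taskmanager.taskExecutorHealthCheck")
}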
@@ -16,8 +16,8 @@ class ClusterListener extends Actor with ActorLogging {

   import ClusterListener._

-  val cluster = Cluster(context.system)
-  val resourceManager = context.actorOf(ResourceManager(), Identifiers.RESOURCE_MANAGER)
+  private val cluster = Cluster(context.system)
+  private val resourceManager = context.actorOf(ResourceManager(), Identifiers.RESOURCE_MANAGER)

   override def preStart(): Unit =
     cluster.subscribe(self, classOf[MemberUp], classOf[UnreachableMember], classOf[MemberRemoved])
@@ -4,7 +4,7 @@ import java.util.UUID

 import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Address, Cancellable, Props}
 import clustermanager.standalone.resourcemanager.actors.ResourceManager.ResourceRequest
-import runtime.common.ActorPaths
+import runtime.common.{ActorPaths, IdGenerator}
 import runtime.protobuf.messages.SliceState.ALLOCATED
 import runtime.protobuf.messages._

@@ -33,10 +33,10 @@ private[resourcemanager] abstract class SliceManager extends Actor with ActorLogging {
       // in order to make sure that the TaskManager is initialized
       target ! TaskManagerInit()
     case TaskManagerRemoved(tm) =>
-      log.info("TaskManager Removed")
+      log.info(s"TaskManager Removed: $tm")
       cleanTaskManager(tm)
     case UnreachableTaskManager(tm) =>
-      log.info("TaskManager Unreachable")
+      log.info(s"TaskManager Unreachable: $tm")
       cleanTaskManager(tm)
     case SliceUpdate(_slices) =>
       slices.put(sender().path.address, _slices)
@@ -46,6 +46,14 @@
   private def cleanTaskManager(tm: Address): Unit = {
     Try {
       taskManagers = taskManagers.filterNot(_ == tm)
+      val slicesOpt = slices.get(tm)
+      slicesOpt match {
+        case Some(s) =>
+          // If they exist in offeredSlices, then remove them
+          offeredSlices.remove(s)
+        case None => // Ignore
+      }
+      // Finish cleanup
       slices.remove(tm)
     } match {
       case Success(_) => // ignore
@@ -122,7 +130,7 @@ private[resourcemanager] class RoundRobinScheduler extends Scheduler {
       }
     case SlicesAllocated(_slices) =>
       if (offeredSlices.remove(_slices))
-        log.info("Offered slices have now been allocated, removing")
+        log.debug("Offered slices have now been allocated, removing")
       else
         log.error("Could not remove the offered slices")

@@ -170,7 +178,8 @@ private[resourcemanager] class RoundRobinScheduler extends Scheduler {
       fetchSlots(freeSlices, job) match {
         case Some(chosen) =>
           import runtime.protobuf.ProtoConversions.Address._
-          val c = Container(job.id, job.appMasterRef.get, taskManagers(roundNumber), chosen, job.tasks)
+          val c = Container(IdGenerator.container(), job.id, job.appMasterRef.get,
+            taskManagers(roundNumber), chosen, job.tasks)
           Some(Seq(c))
         case None =>
           roundNumber += 1
@@ -0,0 +1,45 @@
package clustermanager.standalone.taskmanager.actors

import akka.actor.{Actor, ActorLogging, Cancellable, Props}
import clustermanager.standalone.taskmanager.isolation.ContainerMetrics
import runtime.protobuf.messages.Container

import scala.concurrent.duration._

object MetricsReporter {
  case object Tick
  case object StartReporting
  def apply(container: Container, metrics: ContainerMetrics): Props =
    Props(new MetricsReporter(container, metrics))
}

class MetricsReporter(container: Container, metrics: ContainerMetrics) extends Actor with ActorLogging {
  import MetricsReporter._

  private var ticker: Option[Cancellable] = None
  private implicit val ec = context.dispatcher

  override def postStop(): Unit =
    ticker.map(_.cancel())

  def receive = {
    case Tick =>
      log.info(s"Logging metrics for container ${container.jobId}")
      log.info(s"Memory fail count: " + metrics.getContainerMemFailcount)
      log.info("Memory limit for whole container: " + metrics.getMemoryLimit(container.jobId))
      log.info("Memory Usage for whole container: " + metrics.getContainerMemoryUsage)
    case StartReporting =>
      ticker = startTicker()
    case _ =>
  }

  private def startTicker(): Option[Cancellable] = {
    Some(context.system.scheduler.schedule(
      500.milliseconds,
      500.milliseconds,
      self,
      Tick
    ))
  }
}
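A minimal usage sketch, assuming a parent actor that already holds a Container and a ContainerMetrics instance (how the TaskManager actually wires this up is outside this excerpt):

// Inside some parent actor:
val reporter = context.actorOf(MetricsReporter(container, metrics), "metrics-reporter")
reporter ! MetricsReporter.StartReporting  // starts the 500 ms Tick schedule
// ...
context.stop(reporter)  // postStop cancels the ticker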