[add] Add job-related information (covering both async and sync jobs)
cfmcgrady committed Mar 8, 2018
1 parent c5d6106 commit d11eb50
Showing 6 changed files with 155 additions and 120 deletions.
@@ -1,20 +1,18 @@
package streaming.core.strategy.platform

import java.lang.reflect.Modifier
import java.util.{Map => JMap}
import java.util.concurrent.atomic.AtomicReference
import java.util.logging.Logger
import java.util.{Map => JMap}

import org.apache.spark.ps.cluster.{PSDriverBackend, PSExecutorBackend}
import scala.collection.JavaConversions._

import org.apache.spark.{SparkConf, SparkRuntimeOperator}
import org.apache.spark.sql.SparkSession
import org.apache.spark.ps.cluster.PSDriverBackend
import org.apache.spark.ps.local.LocalPSSchedulerBackend
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
import org.apache.spark.util.Utils
import streaming.core.JobCanceller
import streaming.dsl.mmlib.algs.SQLDL4J

import scala.collection.JavaConversions._
import streaming.core.StreamingproJobManager

/**
* Created by allwefantasy on 30/3/2017.
@@ -86,7 +84,7 @@ class SparkRuntime(_params: JMap[Any, Any]) extends StreamingRuntime with Platfo
}

if (params.containsKey("streaming.job.cancel") && params.get("streaming.job.cancel").toString.toBoolean) {
JobCanceller.init(ss.sparkContext)
StreamingproJobManager.init(ss.sparkContext)
}

if (params.containsKey("streaming.ps.enable") && params.get("streaming.ps.enable").toString.toBoolean) {
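For orientation, the hunk above swaps JobCanceller.init for the new StreamingproJobManager.init whenever the streaming.job.cancel flag is set. A minimal sketch of that startup path, assuming a hand-built params map in place of SparkRuntime's _params:

import java.util.{HashMap => JHashMap}

import org.apache.spark.sql.SparkSession
import streaming.core.StreamingproJobManager

object InitSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical params map; in SparkRuntime these values come from _params.
    val params = new JHashMap[Any, Any]()
    params.put("streaming.job.cancel", "true")

    val ss = SparkSession.builder().master("local[*]").appName("init-sketch").getOrCreate()

    // Same guard as in the diff: start the cancel timer only when the flag is enabled.
    if (params.containsKey("streaming.job.cancel") &&
      params.get("streaming.job.cancel").toString.toBoolean) {
      StreamingproJobManager.init(ss.sparkContext)
    }

    ss.stop()
  }
}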
@@ -2,6 +2,9 @@ package streaming.rest

import java.lang.reflect.Modifier

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

import net.csdn.annotation.rest.At
import net.csdn.common.collections.WowCollections
import net.csdn.common.path.Url
@@ -11,12 +14,10 @@ import net.csdn.modules.transport.HttpTransportService
import org.apache.spark.ps.cluster.Message
import org.apache.spark.sql.{DataFrameWriter, Row, SaveMode}
import streaming.common.JarUtil
import streaming.core.{AsyncJobRunner, DownloadRunner, JobCanceller}
import streaming.core._
import streaming.core.strategy.platform.{PlatformManager, SparkRuntime}
import streaming.dsl.{ScriptSQLExec, ScriptSQLExecListener}

import scala.collection.JavaConversions._

/**
* Created by allwefantasy on 28/3/2017.
*/
@@ -63,14 +64,18 @@ class RestController extends ApplicationController {

@At(path = Array("/run/script"), types = Array(GET, POST))
def script = {
val sparkSession = runtime.asInstanceOf[SparkRuntime].sparkSession
val sparkSession= runtime.asInstanceOf[SparkRuntime].sparkSession
val htp = findService(classOf[HttpTransportService])
if (paramAsBoolean("async", false) && !params().containsKey("callback")) {
render(400, "when async is set true ,then you should set callback url")
}
try {
val jobInfo = StreamingproJobManager.getStreamingproJobInfo(
param("owner"), StreamingproJobType.SQL, param("jobName"), param("sql"),
paramAsLong("timeout", 30000)
)
if (paramAsBoolean("async", false)) {
AsyncJobRunner.run(() => {
StreamingproJobManager.asyncRun(sparkSession, jobInfo, () => {
try {
ScriptSQLExec.parse(param("sql"), new ScriptSQLExecListener(sparkSession, param("prefixPath")))
htp.get(new Url(param("callback")), Map("stat" -> s"""success"""))
@@ -81,7 +86,9 @@ }
}
})
} else {
ScriptSQLExec.parse(param("sql"), new ScriptSQLExecListener(sparkSession, param("prefixPath")))
StreamingproJobManager.asyncRun(sparkSession, jobInfo, () => {
ScriptSQLExec.parse(param("sql"), new ScriptSQLExecListener(sparkSession, param("prefixPath")))
})
}

} catch {
@@ -95,15 +102,21 @@ }

@At(path = Array("/run/sql"), types = Array(GET, POST))
def ddlSql = {
val sparkSession = runtime.asInstanceOf[SparkRuntime].sparkSession
val sparkRuntime = runtime.asInstanceOf[SparkRuntime]
val sparkSession = sparkRuntime.sparkSession
val path = param("path", "-")
if (paramAsBoolean("async", false) && path == "-") {
render(s"""path should not be empty""")
}

val jobInfo = StreamingproJobManager.getStreamingproJobInfo(
param("owner"), StreamingproJobType.SQL, param("jobName"), param("sql"),
paramAsLong("timeout", 30000)
)

paramAsBoolean("async", false).toString match {
case "true" if hasParam("callback") =>
AsyncJobRunner.run(() => {
StreamingproJobManager.run(sparkSession, jobInfo, () => {
val dfWriter = sparkSession.sql(param("sql")).write.mode(SaveMode.Overwrite)
_save(dfWriter)
val htp = findService(classOf[HttpTransportService])
@@ -113,14 +126,14 @@ }
render("[]")

case "false" if param("resultType", "") == "file" =>
JobCanceller.runWithGroup(sparkSession.sparkContext, paramAsLong("timeout", 30000), () => {
StreamingproJobManager.run(sparkSession, jobInfo, () => {
val dfWriter = sparkSession.sql(param("sql")).write.mode(SaveMode.Overwrite)
_save(dfWriter)
render(s"""/download?fileType=raw&file_suffix=${param("format", "csv")}&paths=$path""")
})

case "false" if param("resultType", "") != "file" =>
JobCanceller.runWithGroup(sparkSession.sparkContext, paramAsLong("timeout", 30000), () => {
StreamingproJobManager.run(sparkSession, jobInfo, () => {
var res = ""
try {
res = sparkSession.sql(param("sql")).toJSON.collect().mkString(",")
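The pattern the controller changes above apply everywhere is the same: build a StreamingproJobInfo from the request parameters, then hand the actual SQL work to StreamingproJobManager.run (synchronous, inside a cancellable job group) or StreamingproJobManager.asyncRun (executed on the manager's thread pool). A condensed sketch of that flow, with placeholder values standing in for the HTTP params:

import org.apache.spark.sql.SparkSession
import streaming.core.{StreamingproJobManager, StreamingproJobType}

object ControllerFlowSketch {
  // Assumes StreamingproJobManager.init(...) has already been called (see SparkRuntime).
  def handle(sparkSession: SparkSession, async: Boolean): Unit = {
    // Placeholder values; in RestController these come from param(...) / paramAsLong(...).
    val jobInfo = StreamingproJobManager.getStreamingproJobInfo(
      "admin", StreamingproJobType.SQL, "example-job", "select 1 as a", 30000L)

    val work = () => {
      val res = sparkSession.sql("select 1 as a").toJSON.collect().mkString(",")
      println(res)
    }

    if (async) {
      // Fire-and-forget on the manager's fixed thread pool; results go to a callback elsewhere.
      StreamingproJobManager.asyncRun(sparkSession, jobInfo, work)
    } else {
      // Runs inline inside a job group, so the timer can cancel it after jobInfo.timeout.
      StreamingproJobManager.run(sparkSession, jobInfo, work)
    }
  }
}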

This file was deleted.

@@ -19,7 +19,7 @@
*/
public class DownloadRunner {

private static Logger logger = Logger.getLogger(JobCanceller.class);
private static Logger logger = Logger.getLogger(DownloadRunner.class);

public static int getTarFileByPath(HttpServletResponse res, String pathStr) {
String[] paths = pathStr.split(",");

This file was deleted.

@@ -0,0 +1,124 @@
package streaming.core

import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

import org.apache.log4j.Logger
import org.apache.spark.SparkContext
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.SparkSession

/**
* Created by allwefantasy on 15/5/2017.
*/
object StreamingproJobManager {
val logger = Logger.getLogger(classOf[StreamingproJobManager])
private[this] var _jobManager: StreamingproJobManager = _
private[this] val _executor = Executors.newFixedThreadPool(100)

def init(sc: SparkContext, initialDelay: Long = 30, checkTimeInterval: Long = 5) = {
synchronized {
if (_jobManager == null) {
logger.info(s"JobCanceller Timer started with initialDelay=${initialDelay} checkTimeInterval=${checkTimeInterval}")
_jobManager = new StreamingproJobManager(sc, initialDelay, checkTimeInterval)
_jobManager.run
}
}
}

// def runWithGroup(sc: SparkContext, timeout: Long, f: () => Unit) = {
// if (_jobManager == null) {
// f()
// } else {
// val groupId = _jobManager.nextGroupId.incrementAndGet().toString
// sc.setJobGroup(groupId, "", true)
// try {
// _jobManager.groupIdToTime.put(groupId, JobTime(System.currentTimeMillis(), timeout))
// f()
// }
// finally {
// sc.clearJobGroup()
// }
// }
// }

def run(session: SparkSession, job: StreamingproJobInfo, f: () => Unit): Unit = {
if (_jobManager == null) {
f()
} else {
session.sparkContext.setJobGroup(job.groupId, job.jobName, true)
try {
_jobManager.groupIdToStringproJobInfo.put(job.groupId, job)
f()
}
finally {
session.sparkContext.clearJobGroup()
}
}
}

def asyncRun(session: SparkSession, job: StreamingproJobInfo, f: () => Unit) = {
_executor.execute(new Runnable {
override def run(): Unit = {
_jobManager.groupIdToStringproJobInfo.put(job.groupId, job)
session.sparkContext.setJobGroup(job.groupId, job.jobName, true)
f()
}
})
}

def getStreamingproJobInfo(owner: String,
jobType: String,
jobName: String,
jobContent: String,
timeout: Long): StreamingproJobInfo = {
val startTime = System.currentTimeMillis()
val groupId = _jobManager.nextGroupId.incrementAndGet().toString
StreamingproJobInfo(owner, jobType, jobName, jobContent, groupId, startTime, timeout)
}
}

class StreamingproJobManager(sc: SparkContext, initialDelay: Long, checkTimeInterval: Long) {
val groupIdToStringproJobInfo = new ConcurrentHashMap[String, StreamingproJobInfo]()
val nextGroupId = new AtomicInteger(0)
val logger = Logger.getLogger(classOf[StreamingproJobManager])
val executor = Executors.newSingleThreadScheduledExecutor()

def run = {
executor.scheduleWithFixedDelay(new Runnable {
override def run(): Unit = {
val items = new ArrayBuffer[String]()
groupIdToStringproJobInfo.foreach { f =>
val elapseTime = System.currentTimeMillis() - f._2.startTime
if (elapseTime >= f._2.timeout) {
items += f._1
cancelJobGroup(f._1)
}
}

items.foreach(f => groupIdToStringproJobInfo.remove(f))
}
}, initialDelay, checkTimeInterval, TimeUnit.SECONDS)
}

def cancelJobGroup(groupId: String): Unit = {
logger.info("JobCanceller Timer cancel job group " + groupId)
sc.cancelJobGroup(groupId)
}
}

case object StreamingproJobType {
val SCRIPT = "script"
val SQL = "sql"
}

case class StreamingproJobInfo(
owner: String,
jobType: String,
jobName: String,
jobContent: String,
groupId: String,
startTime: Long,
timeout: Long)
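As a rough illustration of the cancel behaviour introduced here: the single-threaded scheduler walks groupIdToStringproJobInfo every checkTimeInterval seconds and calls cancelJobGroup on any entry whose elapsed time exceeds its timeout. A hedged sketch of that path, using a local master and deliberately short, illustrative delays:

import org.apache.spark.sql.SparkSession
import streaming.core.{StreamingproJobManager, StreamingproJobType}

object TimeoutSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("timeout-sketch").getOrCreate()

    // Start the cancel timer almost immediately and re-check every second (defaults are 30s / 5s).
    StreamingproJobManager.init(spark.sparkContext, initialDelay = 1, checkTimeInterval = 1)

    // Register a job with a 2-second timeout; the Spark action below sleeps far longer.
    val jobInfo = StreamingproJobManager.getStreamingproJobInfo(
      "admin", StreamingproJobType.SQL, "slow-job", "<long running work>", timeout = 2000L)

    StreamingproJobManager.run(spark, jobInfo, () => {
      try {
        spark.sparkContext.parallelize(1 to 4, 4).foreach(_ => Thread.sleep(60 * 1000))
      } catch {
        case e: Exception =>
          // Expected: the timer calls cancelJobGroup, so the action fails with a cancellation error.
          println("job cancelled: " + e.getMessage)
      }
    })

    spark.stop()
  }
}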
