Skip to content

Commit

Permalink
修正batch.mlsql 进程不自动推出的问题
Browse files Browse the repository at this point in the history
  • Loading branch information
allwefantasy committed Jul 24, 2018
1 parent b6152d3 commit c05bd9f
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ object LocalSparkApp {
"-streaming.platform", "spark",
"-streaming.enableHiveSupport", "true",
"-streaming.spark.service", "false",
"-streaming.enableCarbonDataSupport", "true",
"-streaming.enableCarbonDataSupport", "false",
"-streaming.carbondata.store", "/data/carbon/store",
"-streaming.carbondata.meta", "/data/carbon/meta",
"-streaming.job.file.path", "classpath:///test/batch-carbondata.json"
"-streaming.job.file.path", "classpath:///test/batch-mlsql.json"
))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class MLSQLCompositor[T] extends Compositor[T] with CompositorHelper {
val context = new ScriptSQLExecListener(sparkSession(params), "", Map())
ScriptSQLExec.parse(_sql, context)
})

StreamingproJobManager.shutdown
if (middleResult == null) List() else middleResult
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ object StreamingproJobManager {
private[this] var _jobManager: StreamingproJobManager = _
private[this] val _executor = Executors.newFixedThreadPool(100)

def shutdown = {
_executor.shutdownNow()
_jobManager.shutdown
}

def init(sc: SparkContext, initialDelay: Long = 30, checkTimeInterval: Long = 5) = {
synchronized {
if (_jobManager == null) {
Expand Down Expand Up @@ -75,6 +80,7 @@ object StreamingproJobManager {
def killJob(groupId: String): Unit = {
_jobManager.cancelJobGroup(groupId)
}

private def handleJobDone(groupId: String): Unit = {
_jobManager.groupIdToStringproJobInfo.remove(groupId)

Expand Down Expand Up @@ -105,6 +111,10 @@ class StreamingproJobManager(sc: SparkContext, initialDelay: Long, checkTimeInte
sc.cancelJobGroup(groupId)
groupIdToStringproJobInfo.remove(groupId)
}

def shutdown = {
executor.shutdownNow()
}
}

case object StreamingproJobType {
Expand All @@ -114,10 +124,10 @@ case object StreamingproJobType {
}

case class StreamingproJobInfo(
owner: String,
jobType: String,
jobName: String,
jobContent: String,
groupId: String,
startTime: Long,
timeout: Long)
owner: String,
jobType: String,
jobName: String,
jobContent: String,
groupId: String,
startTime: Long,
timeout: Long)

0 comments on commit c05bd9f

Please sign in to comment.