Skip to content

Commit

Permalink
[add] 任务正常处理完成后,清除相关信息
Browse files Browse the repository at this point in the history
  • Loading branch information
cfmcgrady committed Mar 8, 2018
1 parent a47ea8f commit 3418b40
Showing 1 changed file with 34 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package streaming.core

import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import java.util.concurrent.{Callable, ConcurrentHashMap, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

import org.apache.log4j.Logger
import org.apache.spark.SparkContext
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.log4j.Logger
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

/**
Expand Down Expand Up @@ -46,27 +45,32 @@ object StreamingproJobManager {
// }

def run(session: SparkSession, job: StreamingproJobInfo, f: () => Unit): Unit = {
if (_jobManager == null) {
f()
} else {
session.sparkContext.setJobGroup(job.groupId, job.jobName, true)
try {
try {
if (_jobManager == null) {
f()
} else {
session.sparkContext.setJobGroup(job.groupId, job.jobName, true)
_jobManager.groupIdToStringproJobInfo.put(job.groupId, job)
f()
}
finally {
session.sparkContext.clearJobGroup()
}
} finally {
handleJobDone(job.groupId)
session.sparkContext.clearJobGroup()
}
}

def asyncRun(session: SparkSession, job: StreamingproJobInfo, f: () => Unit) = {
// TODO: (fchen) 改成callback
_executor.execute(new Runnable {
override def run(): Unit = {
_jobManager.groupIdToStringproJobInfo.put(job.groupId, job)
session.sparkContext.setJobGroup(job.groupId, job.jobName, true)
f()
try {
_jobManager.groupIdToStringproJobInfo.put(job.groupId, job)
session.sparkContext.setJobGroup(job.groupId, job.jobName, true)
f()
} finally {
handleJobDone(job.groupId)
session.sparkContext.clearJobGroup()
}
}
})
}
Expand All @@ -87,6 +91,21 @@ object StreamingproJobManager {
def killJob(groupId: String): Unit = {
_jobManager.cancelJobGroup(groupId)
}
private def handleJobDone(groupId: String): Unit = {
_jobManager.groupIdToStringproJobInfo.remove(groupId)

}

// private class ASyncJobRunner(sparkSession: SparkSession,
// jobInfo: StreamingproJobInfo,
// function: () => Unit) extends Callable[String] {
// override def call(): String = {
// _jobManager.groupIdToStringproJobInfo.put(jobInfo.groupId, jobInfo)
// sparkSession.sparkContext.setJobGroup(jobInfo.groupId, jobInfo.jobName, true)
// function()
// jobInfo.groupId
// }
// }
}

class StreamingproJobManager(sc: SparkContext, initialDelay: Long, checkTimeInterval: Long) {
Expand Down

0 comments on commit 3418b40

Please sign in to comment.