
Commit

update
allwefantasy committed Apr 8, 2019
1 parent 71c3e1c commit f89ae82
Showing 34 changed files with 349 additions and 299 deletions.
(8 binary files and 2 empty files not shown.)
@@ -21,8 +21,8 @@ package org.apache.spark.sql.mlsql.session
import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.sql.SparkSession
import streaming.core.StreamingproJobManager
import streaming.log.Logging
import tech.mlsql.job.JobManager

/**
* Created by allwefantasy on 1/6/2018.
@@ -79,20 +79,25 @@ class SessionManager(rootSparkSession: SparkSession) extends Logging {
}
}

def getSessionOption(sessionIdentifier: SessionIdentifier): Option[MLSQLSession] = {
val session = getSession(sessionIdentifier)
if (session == null) None else Some(session)
}

def closeSession(sessionIdentifier: SessionIdentifier) {
val runningJobCnt = StreamingproJobManager.getJobInfo
val runningJobCnt = JobManager.getJobInfo
.filter(_._2.owner == sessionIdentifier.owner)
.size

if(runningJobCnt == 0){
if (runningJobCnt == 0) {
val session = identifierToSession.remove(sessionIdentifier)
if (session == null) {
throw new MLSQLException(s"Session $sessionIdentifier does not exist!")
}
val sessionUser = session.getUserName
SparkSessionCacheManager.get.decrease(sessionUser)
session.close()
}else{
} else {
SparkSessionCacheManager.get.visit(sessionIdentifier.owner)
log.info(s"Session can't close ,$runningJobCnt jobs are running")
}
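
For reference, the new getSessionOption helper simply wraps the nullable getSession result in an Option, so callers can pattern match instead of null-checking. A minimal caller-side sketch, assuming a SessionManager instance named sessionManager and a Logging mixin are in scope:

// Sketch only: `sessionManager` is an assumed SessionManager instance.
val id = SessionIdentifier("allwefantasy")

sessionManager.getSessionOption(id) match {
  case Some(session) =>
    // A cached MLSQLSession exists for this owner; no null check needed.
    log.info(s"Found session for ${session.getUserName}")
  case None =>
    log.info(s"No session registered for $id")
}

// closeSession only tears the session down once the owner has no running jobs;
// otherwise it just refreshes the owner's last-visit timestamp.
sessionManager.closeSession(id)
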
@@ -22,14 +22,15 @@ package org.apache.spark.sql.mlsql.session
* Created by allwefantasy on 3/6/2018.
*/

import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

import scala.collection.JavaConverters._
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.spark.sql.SparkSession
import streaming.log.Logging

import scala.collection.JavaConverters._


class SparkSessionCacheManager() extends Logging {
private val cacheManager =
@@ -46,7 +47,7 @@ class SparkSessionCacheManager() extends Logging {

def set(user: String, sparkSession: SparkSession): Unit = {
userToSparkSession.put(user, (sparkSession, new AtomicInteger(1)))
userLatestVisit.put(user ,System.currentTimeMillis())
userLatestVisit.put(user, System.currentTimeMillis())
}

def getAndIncrease(user: String): Option[SparkSession] = {
@@ -71,7 +72,7 @@
}

def visit(user: String): Unit = {
userLatestVisit.put(user ,System.currentTimeMillis())
userLatestVisit.put(user, System.currentTimeMillis())
}

private[this] def removeSparkSession(user: String): Unit = {
@@ -82,11 +83,11 @@
override def run(): Unit = {
userToSparkSession.asScala.foreach {
case (user, (_, times)) if times.get() > 0 => {
if (userLatestVisit.getOrDefault(user ,Long.MaxValue) + SparkSessionCacheManager.getExpireTimeout
> System.currentTimeMillis()){
if (userLatestVisit.getOrDefault(user, Long.MaxValue) + SparkSessionCacheManager.getExpireTimeout
> System.currentTimeMillis()) {
log.debug(s"There are $times active connection(s) bound to the SparkSession instance" +
s" of $user ")
}else{
} else {
SparkSessionCacheManager.getSessionManager.closeSession(SessionIdentifier(user))
}
}
@@ -133,18 +134,20 @@ object SparkSessionCacheManager {
sparkSessionCacheManager.start()
}

def setSessionManager(manager: SessionManager): Unit ={
def setSessionManager(manager: SessionManager): Unit = {
sessionManager = manager
}

def getSessionManager: SessionManager = sessionManager

def getSessionManagerOption: Option[SessionManager] = if (sessionManager == null) None else Some(sessionManager)


def setExpireTimeout(expire :Long): String ={
if(expire > EXPIRE_SMALL_TIMEOUT){
def setExpireTimeout(expire: Long): String = {
if (expire > EXPIRE_SMALL_TIMEOUT) {
expireTimeout = expire
s"set session expire success $expire"
}else{
} else {
s"session expire must bigger than $EXPIRE_SMALL_TIMEOUT ,current is $expire"
}
}
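
The cleanup thread above keeps a session alive only while its owner's last visit plus the configured expire timeout is still in the future; setExpireTimeout additionally rejects values at or below EXPIRE_SMALL_TIMEOUT. A standalone restatement of that expiry predicate, with illustrative names and values:

// Illustrative restatement of the idle-expiry rule used by the checker thread above.
def isStillActive(lastVisitMs: Long, expireTimeoutMs: Long, nowMs: Long): Boolean =
  lastVisitMs + expireTimeoutMs > nowMs

// Example: a session visited 30 minutes ago with a 1-hour timeout is still active.
val now = System.currentTimeMillis()
val thirtyMinutesAgo = now - 30 * 60 * 1000L
assert(isStillActive(thirtyMinutesAgo, 60 * 60 * 1000L, now))
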
@@ -18,11 +18,13 @@

package streaming.core.compositor.spark.transformation

import _root_.streaming.core.{CompositorHelper, StreamingproJobManager, StreamingproJobType}
import _root_.streaming.dsl.{ScriptSQLExec, ScriptSQLExecListener}
import java.util

import org.apache.log4j.Logger
import serviceframework.dispatcher.{Compositor, Processor, Strategy}
import java.util
import streaming.core.CompositorHelper
import streaming.dsl.{ScriptSQLExec, ScriptSQLExecListener}
import tech.mlsql.job.{JobManager, MLSQLJobType}

import scala.collection.JavaConversions._

@@ -47,20 +49,20 @@ class MLSQLCompositor[T] extends Compositor[T] with CompositorHelper {


override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {
StreamingproJobManager.init(sparkSession(params))
JobManager.init(sparkSession(params))
require(sql.isDefined, "please set sql by variable `sql` in config file")

val _sql = translateSQL(sql.get, params)

val jobInfo = StreamingproJobManager.getStreamingproJobInfo(
"admin", StreamingproJobType.SCRIPT, "", _sql,
val jobInfo = JobManager.getJobInfo(
"admin", MLSQLJobType.SCRIPT, "", _sql,
-1L
)
StreamingproJobManager.run(sparkSession(params), jobInfo, () => {
JobManager.run(sparkSession(params), jobInfo, () => {
val context = new ScriptSQLExecListener(sparkSession(params), "", Map())
ScriptSQLExec.parse(_sql, context)
})
StreamingproJobManager.shutdown
JobManager.shutdown
if (middleResult == null) List() else middleResult
}
}
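
The hunk above exercises the full lifecycle of the renamed job API: initialize the manager, build a job descriptor, run the work under that job, then shut the manager down. A hedged sketch of the same sequence in isolation; the SparkSession, script text, and parameter roles are inferred from the call above:

import tech.mlsql.job.{JobManager, MLSQLJobType}

// `spark` stands in for the SparkSession obtained from the compositor params.
JobManager.init(spark)

val jobInfo = JobManager.getJobInfo(
  "admin",                    // owner
  MLSQLJobType.SCRIPT,        // job type
  "",                         // job name (empty here, as in the call above)
  "select 1 as a as output;", // the MLSQL script to execute (placeholder)
  -1L                         // timeout; -1 appears to mean "no timeout"
)

JobManager.run(spark, jobInfo, () => {
  // The actual work executes inside the managed job,
  // e.g. ScriptSQLExec.parse(script, new ScriptSQLExecListener(spark, "", Map())).
})

JobManager.shutdown
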
@@ -3,12 +3,12 @@ package streaming.core.datasource.impl
import org.apache.spark.sql.mlsql.session.MLSQLException
import org.apache.spark.sql.{DataFrame, DataFrameReader}
import streaming.common.ScalaEnumTool
import streaming.core.StreamingproJobInfo
import streaming.core.datasource._
import streaming.core.datasource.util.MLSQLJobCollect
import streaming.dsl.ScriptSQLExec
import streaming.dsl.auth.{OperateType, TableType}
import streaming.dsl.load.batch.{LogTail, MLSQLAPIExplain, MLSQLConfExplain}
import tech.mlsql.job.MLSQLJobInfo

/**
* 2019-01-11 WilliamZhu([email protected])
@@ -39,7 +39,7 @@ class MLSQLSystemTables extends MLSQLSource with MLSQLSourceInfo with MLSQLRegis

}
case Array("jobs") =>
spark.createDataset[StreamingproJobInfo](jobCollect.jobs).toDF()
spark.createDataset[MLSQLJobInfo](jobCollect.jobs).toDF()
case Array("jobs", jobGroupId) =>
spark.createDataset(Seq(jobCollect.jobDetail(jobGroupId))).toDF()
case Array("progress", jobGroupId) =>
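
The jobs branch above now builds its DataFrame from the renamed MLSQLJobInfo class. A rough equivalent of what that branch produces, assuming MLSQLJobInfo is a case class so the product encoder from spark.implicits applies:

import spark.implicits._
import tech.mlsql.job.{JobManager, MLSQLJobInfo}

// Mirrors what MLSQLJobCollect.jobs feeds into createDataset above:
// every registered job owned by the current user, exposed as a DataFrame.
val owner = "allwefantasy" // placeholder owner
val jobs: Seq[MLSQLJobInfo] = JobManager.getJobInfo.toSeq.map(_._2).filter(_.owner == owner)
val jobsDF = spark.createDataset(jobs).toDF()
jobsDF.show(truncate = false)
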
@@ -2,7 +2,8 @@ package streaming.core.datasource.util

import org.apache.spark.MLSQLResource
import org.apache.spark.sql.SparkSession
import streaming.core.StreamingproJobManager
import tech.mlsql.job.JobManager



/**
@@ -12,13 +13,13 @@ class MLSQLJobCollect(spark: SparkSession, owner: String) {
val resource = new MLSQLResource(spark, owner, getGroupId)

def jobs = {
val infoMap = StreamingproJobManager.getJobInfo
val infoMap = JobManager.getJobInfo
val data = infoMap.toSeq.map(_._2).filter(_.owner == owner)
data
}

def getGroupId(jobNameOrGroupId: String) = {
StreamingproJobManager.getJobInfo.filter(f => f._2.jobName == jobNameOrGroupId).headOption match {
JobManager.getJobInfo.filter(f => f._2.jobName == jobNameOrGroupId).headOption match {
case Some(item) => item._2.groupId
case None => jobNameOrGroupId
}
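
getGroupId above resolves a human-readable job name to its Spark group id and falls back to treating the argument as a group id when no job matches. A compact restatement of that lookup, with an illustrative job name:

// Compact restatement of the name-or-groupId resolution above.
def resolveGroupId(jobNameOrGroupId: String): String =
  JobManager.getJobInfo
    .find(_._2.jobName == jobNameOrGroupId)
    .map(_._2.groupId)
    .getOrElse(jobNameOrGroupId)

// e.g. resolveGroupId("nightly-etl") returns that job's group id if such a job
// is registered, otherwise it returns "nightly-etl" unchanged.
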
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicReference
import java.util.{Map => JMap}

import _root_.streaming.common.{NetUtils, ScalaObjectReflect}
import _root_.streaming.core.StreamingproJobManager
import _root_.streaming.core.message.MLSQLMessage
import _root_.streaming.core.stream.MLSQLStreamManager
import _root_.streaming.dsl.mmlib.algs.bigdl.WowLoggerFilter
@@ -35,6 +34,7 @@ import org.apache.spark.ps.local.LocalPSSchedulerBackend
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.mlsql.session.{SessionIdentifier, SessionManager}
import org.apache.spark.sql.{MLSQLUtils, SQLContext, SparkSession}
import tech.mlsql.job.JobManager

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -164,7 +164,7 @@ class SparkRuntime(_params: JMap[Any, Any]) extends StreamingRuntime with Platfo


if (MLSQLConf.MLSQL_SPARK_SERVICE.readFrom(configReader)) {
StreamingproJobManager.init(ss)
JobManager.init(ss)
}

// parameter server should be enabled by default
@@ -2,18 +2,18 @@ package streaming.core.stream

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import streaming.core.{StreamingproJobInfo, StreamingproJobManager, StreamingproJobType}
import streaming.log.{Logging, WowLog}
import tech.mlsql.job.{JobManager, MLSQLJobInfo, MLSQLJobType}

import scala.collection.JavaConverters._

/**
* 2019-01-21 WilliamZhu([email protected])
*/
object MLSQLStreamManager extends Logging with WowLog {
private val store = new java.util.concurrent.ConcurrentHashMap[String, StreamingproJobInfo]()
private val store = new java.util.concurrent.ConcurrentHashMap[String, MLSQLJobInfo]()

def addStore(job: StreamingproJobInfo) = {
def addStore(job: MLSQLJobInfo) = {
store.put(job.groupId, job)
}

@@ -40,19 +40,19 @@ class MLSQLStreamingQueryListener extends StreamingQueryListener with Logging wi
def sync(name: String, id: String) = {
// first we should check by name, since before the stream is really started, we have recorded the name in
// StreamingproJobManager
StreamingproJobManager.getJobInfo.filter(f => f._2.jobType == StreamingproJobType.STREAM
JobManager.getJobInfo.filter(f => f._2.jobType == MLSQLJobType.STREAM
&& (f._2.jobName == name)).headOption match {
case Some(job) =>
if (job._2.groupId != id) {
logInfo(format(
s"""
|StreamingproJobManager:${job._2.jobName}
|JobManager:${job._2.jobName}
|Spark streams: ${name}
|Action: sync
|Reason:: Job is not synced before.
""".stripMargin))
//onQueryStarted is stared before we acquire info from StreamingQuery
StreamingproJobManager.addJobManually(job._2.copy(groupId = id))
JobManager.addJobManually(job._2.copy(groupId = id))
}
case None =>
// we only care when stream is restore from ck without MLSQL instance restart
@@ -61,12 +61,12 @@
case Some(job) =>
logInfo(format(
s"""
|StreamingproJobManager:${job.jobName}
|JobManager:${job.jobName}
|Spark streams: ${name}
|Action: sync
|Reason:: Job is not in StreamingproJobManager but in MLSQLStreamManager.
|Reason:: Job is not in JobManager but in MLSQLStreamManager.
""".stripMargin))
StreamingproJobManager.addJobManually(job)
JobManager.addJobManually(job)
case None =>
// this should not happen,throw exception
throw new RuntimeException(s"MLSQL have unsync stream: ${name}")
@@ -85,10 +85,10 @@ class MLSQLStreamingQueryListener extends StreamingQueryListener with Logging wi

override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
MLSQLStreamManager.removeStore(event.id.toString)
StreamingproJobManager.getJobInfo.filter(f => f._2.jobType == StreamingproJobType.STREAM
JobManager.getJobInfo.filter(f => f._2.jobType == MLSQLJobType.STREAM
&& f._2.groupId == event.id.toString).headOption match {
case Some(job) =>
StreamingproJobManager.removeJobManually(job._1)
JobManager.removeJobManually(job._1)
case None =>
}
}
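
The sync logic above first reconciles a running Spark stream with the job registry by job name, re-registering it when the stored group id is stale; the checkpoint-recovery branch then covers streams restored without a registry entry. A simplified sketch of the name-based step only (the recovery branch is omitted):

// Simplified sketch of the name-based sync step in MLSQLStreamingQueryListener.
def syncByName(name: String, id: String): Unit = {
  JobManager.getJobInfo.find { case (_, info) =>
    info.jobType == MLSQLJobType.STREAM && info.jobName == name
  } match {
    case Some((_, job)) if job.groupId != id =>
      // The registry knows this stream under a stale group id: re-key it.
      JobManager.addJobManually(job.copy(groupId = id))
    case _ =>
      // Already in sync, or left to the checkpoint-recovery path above.
  }
}
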
12 changes: 6 additions & 6 deletions streamingpro-mlsql/src/main/java/streaming/dsl/SaveAdaptor.scala
@@ -24,9 +24,9 @@ import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SaveMode}
import streaming.core.datasource.{DataSinkConfig, DataSourceRegistry}
import streaming.core.stream.MLSQLStreamManager
import streaming.core.{StreamingproJobManager, StreamingproJobType}
import streaming.dsl.parser.DSLSQLParser._
import streaming.dsl.template.TemplateMerge
import tech.mlsql.job.{JobManager, MLSQLJobType}

import scala.collection.mutable.ArrayBuffer

@@ -95,12 +95,12 @@ class SaveAdaptor(scriptSQLExecListener: ScriptSQLExecListener) extends DslAdapt
val spark = oldDF.sparkSession
import spark.implicits._
val context = ScriptSQLExec.context()
var job = StreamingproJobManager.getJobInfo(context.groupId)
var job = JobManager.getJobInfo(context.groupId)


if (isStream) {
job = job.copy(jobType = StreamingproJobType.STREAM, jobName = scriptSQLExecListener.env()("streamName"))
StreamingproJobManager.addJobManually(job)
job = job.copy(jobType = MLSQLJobType.STREAM, jobName = scriptSQLExecListener.env()("streamName"))
JobManager.addJobManually(job)
}

var streamQuery: StreamingQuery = null
@@ -139,12 +139,12 @@
streamQuery = saveRes.asInstanceOf[StreamingQuery]
}

job = StreamingproJobManager.getJobInfo(context.groupId)
job = JobManager.getJobInfo(context.groupId)
if (streamQuery != null) {
//here we do not need to clean the original groupId, since StreamingproJobManager.handleJobDone(job.groupId)
// will handle this. Also, if this is a stream job, it should be removed by the StreamManager if it fails
job = job.copy(groupId = streamQuery.id.toString)
StreamingproJobManager.addJobManually(job)
JobManager.addJobManually(job)
MLSQLStreamManager.addStore(job)
}

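The save path above registers a stream job under the script's group id up front and then, once the StreamingQuery has actually started, re-keys the job onto the query's own id and records it in MLSQLStreamManager. A condensed sketch of that re-registration; job, context, streamName and streamQuery stand in for the values computed in SaveAdaptor:

// Condensed sketch of the stream-job re-registration performed above.
var job = JobManager.getJobInfo(context.groupId)
job = job.copy(jobType = MLSQLJobType.STREAM, jobName = streamName)
JobManager.addJobManually(job)

// ... the streaming query is started here ...

job = JobManager.getJobInfo(context.groupId)
job = job.copy(groupId = streamQuery.id.toString)
JobManager.addJobManually(job)
MLSQLStreamManager.addStore(job)
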
@@ -2,10 +2,10 @@ package streaming.dsl.mmlib.algs

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.{DataFrame, SparkSession}
import streaming.core.StreamingproJobManager
import streaming.core.datasource.util.MLSQLJobCollect
import streaming.dsl.mmlib.SQLAlg
import streaming.dsl.mmlib.algs.param.{BaseParams, WowParams}
import tech.mlsql.job.JobManager

/**
* 2019-01-11 WilliamZhu([email protected])
@@ -18,7 +18,7 @@ class SQLMLSQLJobExt(override val uid: String) extends SQLAlg with WowParams {
override def train(df: DataFrame, path: String, params: Map[String, String]): DataFrame = {
val spark = df.sparkSession
val groupId = new MLSQLJobCollect(spark, null).getGroupId(path)
StreamingproJobManager.killJob(groupId)
JobManager.killJob(spark, groupId)
import df.sparkSession.implicits._
Seq.empty[(String, String)].toDF("param", "description")

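Note that killJob now takes the SparkSession in addition to the group id. A minimal call-site sketch mirroring the train method above; the job name is a placeholder:

// Minimal sketch: resolve a job name (or group id) and kill the job.
val groupId = new MLSQLJobCollect(spark, null).getGroupId("my-stream-job") // placeholder name
JobManager.killJob(spark, groupId)
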
(The remaining changed files are not shown.)
