Skip to content

Commit

Permalink
Merge pull request byzer-org#541 from cfmcgrady/enhancePythonAlg
Browse files Browse the repository at this point in the history
python project support for SQLPythonAlg module
  • Loading branch information
cfmcgrady authored Sep 18, 2018
2 parents 0e5bf34 + 08f1e7a commit fac02f4
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ class SQLExternalPythonAlg extends SQLPythonAlg {
}
}

// distribute python project

val loadPythonProject = params.contains("pythonProjectPath")
if (loadPythonProject) {
distributePythonProject(params).foreach(path => {
resourceParams += ("pythonProjectPath" -> path)
})
}

(Seq(""), metasTemp, params, selectedFitParam + ("resource" -> resourceParams.asJava))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class SQLPythonAlg extends SQLAlg with Functions {
recordSingleLineLog(kafkaParam, msg)
}
}
distributePythonProject(params)


val pythonScript = findPythonScript(userPythonScript, f, "sk")
Expand Down Expand Up @@ -313,6 +314,7 @@ class SQLPythonAlg extends SQLAlg with Functions {
val fitParam = arrayParamsWithIndex("fitParam", trainParams)
val selectedFitParam = fitParam(algIndex)._2
val loadResource = selectedFitParam.keys.map(_.split("\\.")(0)).toSet.contains("resource")
val loadPythonProject = trainParams.contains("pythonProjectPath")
var resourceParams = Map.empty[String, String]

val metas = wowMetas.map(f => f.getMap[String, String](0)).toSeq
Expand All @@ -331,6 +333,11 @@ class SQLPythonAlg extends SQLAlg with Functions {
distributeResource(sparkSession, resourcePath, tempResourceLocalPath)
}
}
if (loadPythonProject) {
distributePythonProject(trainParams).foreach(path => {
resourceParams += ("pythonProjectPath" -> path)
})
}
}

(models, metas, trainParams, selectedFitParam + ("resource" -> resourceParams.asJava))
Expand Down Expand Up @@ -413,4 +420,18 @@ class SQLPythonAlg extends SQLAlg with Functions {

UserDefinedFunction(f2, VectorType, Some(Seq(VectorType)))
}

def distributePythonProject(params: Map[String, String]): Option[String] = {
// load python project
val pythonProjectPath = params.get("pythonProjectPath")
if (pythonProjectPath.isDefined) {
val tempPythonProjectLocalPath = SQLPythonFunc.getLocalTempDataPath(pythonProjectPath.get)
logInfo(s"system load python project into directory: [ ${tempPythonProjectLocalPath} ].")
HDFSOperator.copyToLocalFile(tempPythonProjectLocalPath, pythonProjectPath.get, true)
logInfo(format("python project loaded!"))
Some(tempPythonProjectLocalPath)
} else {
None
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,9 @@ object ExternalCommandRunner extends Logging {
val err = proc.getErrorStream

try {
for (line <- Source.fromInputStream(err)(encoding).getLines) {
// scalastyle:off println
logCallback("__python__:" + line)
errorBuffer += line
// scalastyle:on println
}
val errorLog = logBuilder(Source.fromInputStream(err)(encoding).getLines())
logCallback(errorLog)
System.err.println(errorLog)
} catch {
case t: Throwable =>
childThreadException.set(t)
Expand Down Expand Up @@ -245,6 +242,15 @@ object ExternalCommandRunner extends Logging {
}
}
}
def logBuilder(iterator: Iterator[String]): String = {
val builder = StringBuilder.newBuilder
while (iterator.hasNext) {
val line = iterator.next()
builder.append(s"__python__:$line")
builder.append("\n")
}
builder.toString
}

}

Expand All @@ -254,4 +260,4 @@ class NotEqualsFileNameFilter(filterName: String) extends FilenameFilter {
def accept(dir: File, name: String): Boolean = {
!name.equals(filterName)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ object ExternalCommandRunner extends Logging {
val err = proc.getErrorStream

try {
logCallback(logBuilder(Source.fromInputStream(err)(encoding).getLines()))
val errorLog = logBuilder(Source.fromInputStream(err)(encoding).getLines())
logCallback(errorLog)
System.err.println(errorLog)
} catch {
case t: Throwable =>
childThreadException.set(t)
Expand Down

0 comments on commit fac02f4

Please sign in to comment.