Commit febd368: more explicit exception information (apache#420)
* more explicit exception information
wolfboys authored Nov 11, 2021
1 parent 0ebd39f commit febd368
Showing 26 changed files with 203 additions and 182 deletions.
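The change is mechanical but worth spelling out: a bare `require(cond)` throws `IllegalArgumentException: requirement failed` with no hint of what went wrong, while `require(cond, message)` carries the diagnosis in the exception itself. A minimal sketch of the difference (the value below is illustrative, not from the diff):

```scala
object RequireMessageDemo extends App {
  val flinkHome: String = null // illustrative bad input

  // Before this commit: require(flinkHome != null)
  //   => java.lang.IllegalArgumentException: requirement failed
  // After: the exception names the offending setting.
  try {
    require(flinkHome != null, "[StreamX] FlinkVersion: flinkHome must not be null.")
  } catch {
    case e: IllegalArgumentException =>
      // prints: requirement failed: [StreamX] FlinkVersion: flinkHome must not be null.
      println(e.getMessage)
  }
}
```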
@@ -21,8 +21,8 @@
package com.streamxhub.streamx.common.conf

import com.streamxhub.streamx.common.util.SystemPropertyUtils

import java.time.LocalDateTime
import java.util.regex.Pattern

object ConfigConst {
/**
@@ -312,7 +312,6 @@ object ConfigConst {
lazy val DOCKER_IMAGE_NAMESPACE: String = SystemPropertyUtils.get(KEY_DOCKER_IMAGE_NAMESPACE, DOCKER_IMAGE_NAMESPACE_DEFAULT)



def printLogo(info: String): Unit = {
println("\n\n .+. ")
println(" _____/ /_________ ____ _____ ___ _ __ ")
@@ -45,14 +45,14 @@ case class Workspace(storageType: StorageType) {
storageType match {
case StorageType.LFS =>
val path = SystemPropertyUtils.get(KEY_STREAMX_WORKSPACE_LOCAL)
- require(path != null)
+ require(path != null, "[StreamX] streamx.workspace.local must not be null")
path
case StorageType.HDFS =>
SystemPropertyUtils.get(KEY_STREAMX_WORKSPACE_REMOTE) match {
case null =>
s"${HdfsUtils.getDefaultFS}$STREAMX_WORKSPACE_DEFAULT"
case p =>
- require(p.startsWith("hdfs://"))
+ require(p.startsWith("hdfs://"), "[StreamX] streamx.workspace.remote must start with hdfs://")
val path = p.replaceFirst("^hdfs://((.*):\\d+/+|/+|)", "/")
s"${HdfsUtils.getDefaultFS}$path"
}
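As an aside, the `replaceFirst` above strips any scheme/authority prefix from a user-supplied workspace URI before the default filesystem is prepended. A quick sketch of how that regex behaves (inputs are made-up examples, not from the commit):

```scala
object HdfsPathNormalizeDemo extends App {
  // Same pattern as Workspace: reduce "hdfs://host:port/" or "hdfs:///" to "/".
  val pattern = "^hdfs://((.*):\\d+/+|/+|)"

  // A host:port authority is stripped, leaving the bare path:
  println("hdfs://namenode:8020/streamx/workspace".replaceFirst(pattern, "/"))
  // => /streamx/workspace

  // A scheme with an empty authority collapses the same way:
  println("hdfs:///streamx/workspace".replaceFirst(pattern, "/"))
  // => /streamx/workspace
}
```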
@@ -51,9 +51,9 @@ class FlinkVersion(val flinkHome: String) extends java.io.Serializable {
lazy val fullVersion: String = s"${version}_$scalaVersion"

lazy val flinkLib: File = {
- require(flinkHome != null)
+ require(flinkHome != null, "[StreamX] FlinkVersion: flinkHome must not be null.")
val lib = new File(s"$flinkHome/lib")
- require(lib.exists() && lib.isDirectory)
+ require(lib.exists() && lib.isDirectory, s"[StreamX] FlinkVersion: $flinkHome/lib must exist and be a directory.")
lib
}

@@ -22,6 +22,7 @@


import com.google.common.collect.Lists;
+
import java.io.Serializable;
import java.util.List;

@@ -110,7 +111,7 @@ public static boolean isKubernetesMode(Integer value) {
return isKubernetesMode(of(value));
}

- public static List<Integer> getKubernetesMode(){
+ public static List<Integer> getKubernetesMode() {
return Lists.newArrayList(KUBERNETES_NATIVE_SESSION.getMode(), KUBERNETES_NATIVE_APPLICATION.getMode());
}

@@ -46,7 +46,7 @@ object HdfsOperator extends FsOperator with Logger {
HdfsUtils.copyHdfsDir(toHdfsPath(srcPath), toHdfsPath(dstPath), delSrc = delSrc, overwrite = overwrite)

override def fileMd5(path: String): String = {
- require(path != null && path.nonEmpty)
+ require(path != null && path.nonEmpty, "[StreamX] HdfsOperator.fileMd5: path must not be null or empty.")
HdfsUtils.fileMd5(toHdfsPath(path))
}

@@ -60,7 +60,7 @@ object LFsOperator extends FsOperator with Logger {
if (!isAnyBank(srcPath, dstPath)) {
val srcFile = new File(srcPath)
val dstFile = new File(dstPath)
require(srcFile.exists(), "Source must be exists")
require(srcFile.exists(), "[StreamX] LFsOperator.move: Source must be exists")
if (srcFile.getCanonicalPath != dstFile.getCanonicalPath) {
if (dstFile.isDirectory) {
FileUtils.moveToDirectory(srcFile, dstFile, true)
@@ -86,8 +86,8 @@
if (!isAnyBank(srcPath, dstPath)) {
val srcFile = new File(srcPath)
val dstFile = new File(dstPath)
- require(srcFile.exists(), "Source must be exists")
- require(dstFile.exists(), "Destination must be exists")
+ require(srcFile.exists(), "[StreamX] LFsOperator.copy: Source must exist")
+ require(dstFile.exists(), "[StreamX] LFsOperator.copy: Destination must exist")
if (overwrite && srcFile.getCanonicalPath != dstFile.getCanonicalPath) {
if (dstFile.isDirectory) {
FileUtils.copyFileToDirectory(srcFile, dstFile)
@@ -102,15 +102,15 @@
if (!isAnyBank(srcPath, dstPath)) {
val srcFile = new File(srcPath)
val dstFile = new File(dstPath)
- require(srcFile.exists(), "Source must be exists")
+ require(srcFile.exists(), "[StreamX] LFsOperator.copyDir: Source must exist")
if (overwrite && srcFile.getCanonicalPath != dstFile.getCanonicalPath) {
FileUtils.copyDirectory(srcFile, dstFile)
}
}
}

override def fileMd5(path: String): String = {
- require(path != null && path.nonEmpty)
+ require(path != null && path.nonEmpty, "[StreamX] LFsOperator.fileMd5: path must not be null or empty.")
DigestUtils.md5Hex(IOUtils.toByteArray(new FileInputStream(path)))
}
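One thing the diff leaves untouched: the `FileInputStream` above is never closed. A leak-free variant under the same commons-codec API might look like this (a sketch assuming Scala 2.13's `scala.util.Using`; not part of the commit):

```scala
import java.io.FileInputStream
import org.apache.commons.codec.digest.DigestUtils
import scala.util.Using

object Md5Sketch {
  // Same md5 computation, but the stream is closed deterministically.
  def fileMd5(path: String): String = {
    require(path != null && path.nonEmpty, "[StreamX] fileMd5: path must not be null or empty.")
    Using.resource(new FileInputStream(path))(in => DigestUtils.md5Hex(in))
  }
}
```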

@@ -69,16 +69,16 @@ object ClassLoaderUtils extends Logger {

def loadJar(jarFilePath: String): Unit = {
val jarFile = new File(jarFilePath)
- require(jarFile.exists, s"[StreamX] jarFilePath:$jarFilePath is not exists")
- require(jarFile.isFile, s"[StreamX] jarFilePath:$jarFilePath is not file")
+ require(jarFile.exists, s"[StreamX] ClassLoaderUtils.loadJar: jarFilePath $jarFilePath does not exist")
+ require(jarFile.isFile, s"[StreamX] ClassLoaderUtils.loadJar: jarFilePath $jarFilePath is not a file")
loadPath(jarFile.getAbsolutePath, List(".jar", ".zip"))
}

def loadJars(path: String): Unit = {
val jarDir = new File(path)
- require(jarDir.exists, s"[StreamX] jarPath: $path is not exists")
- require(jarDir.isDirectory, s"[StreamX] jarPath: $path is not directory")
- require(jarDir.listFiles.length > 0, s"[StreamX] have not jar in path:$path")
+ require(jarDir.exists, s"[StreamX] ClassLoaderUtils.loadJars: jarPath $path does not exist")
+ require(jarDir.isDirectory, s"[StreamX] ClassLoaderUtils.loadJars: jarPath $path is not a directory")
+ require(jarDir.listFiles.length > 0, s"[StreamX] ClassLoaderUtils.loadJars: no jar found in path: $path")
jarDir.listFiles.foreach { x =>
loadPath(x.getAbsolutePath, List(".jar", ".zip"))
}
@@ -63,7 +63,7 @@ object CommandUtils extends Logger {

def execute(commands: JavaIter[String], consumer: Consumer[String]): Unit = {
Try {
- require(commands != null && commands.nonEmpty)
+ require(commands != null && commands.nonEmpty, "[StreamX] CommandUtils.execute: commands must not be null or empty.")
logDebug(s"Command execute:\n${commands.mkString("\n")} ")
val process = Utils.isWindows match {
case x if x => Runtime.getRuntime.exec("cmd /k ", null, null)
@@ -91,19 +91,19 @@ object DependencyUtils {
val splits = p.replace("/", ":").split(":")
require(
splits.length == 3,
s"Provided Maven Coordinates must be in the form 'groupId:artifactId:version'. The coordinate provided is: $p"
s"[StreamX] DependencyUtils.extractMavenCoordinates: Provided Maven Coordinates must be in the form 'groupId:artifactId:version'. The coordinate provided is: $p"
)
require(
splits(0) != null && splits(0).trim.nonEmpty,
s"The groupId cannot be null or be whitespace. The groupId provided is: ${splits(0)}"
s"[StreamX] DependencyUtils.extractMavenCoordinates: The groupId cannot be null or be whitespace. The groupId provided is: ${splits(0)}"
)
require(
splits(1) != null && splits(1).trim.nonEmpty,
s"The artifactId cannot be null or be whitespace. The artifactId provided is: ${splits(1)}"
s"[StreamX] DependencyUtils.extractMavenCoordinates: The artifactId cannot be null or be whitespace. The artifactId provided is: ${splits(1)}"
)
require(
splits(2) != null && splits(2).trim.nonEmpty,
s"The version cannot be null or be whitespace. The version provided is: ${splits(2)}"
s"[StreamX] DependencyUtils.extractMavenCoordinates: The version cannot be null or be whitespace. The version provided is: ${splits(2)}"
)
MavenCoordinate(splits(0), splits(1), splits(2))
}
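The three checks map one-to-one onto the coordinate segments; for reference, the parse they guard works like this (the coordinate string is an example of mine, not from the diff):

```scala
object CoordinateParseDemo extends App {
  // "groupId:artifactId:version", with "/" tolerated as a separator:
  val p = "org.apache.flink/flink-table_2.11:1.13.2"
  val splits = p.replace("/", ":").split(":")
  // => Array("org.apache.flink", "flink-table_2.11", "1.13.2")
  assert(splits.length == 3)
  println(splits.mkString(" | "))

  // A malformed coordinate trips the first require above:
  // "org.apache.flink:flink-table_2.11".split(":").length == 2
}
```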
@@ -243,14 +243,14 @@ object DependencyUtils {
outCallback: Consumer[String]
): IvySettings = {
val file = new File(settingsFile)
require(file.exists(), s"Ivy settings file $file does not exist")
require(file.isFile, s"Ivy settings file $file is not a normal file")
require(file.exists(), s"[StreamX] DependencyUtils.loadIvySettings: Ivy settings file $file does not exist")
require(file.isFile, s"[StreamX] DependencyUtils.loadIvySettings: Ivy settings file $file is not a normal file")
val ivySettings: IvySettings = new IvySettings
try {
ivySettings.load(file)
} catch {
case e@(_: IOException | _: ParseException) =>
throw new RuntimeException(s"Failed when loading Ivy settings from $settingsFile", e)
throw new RuntimeException(s"DependencyUtils.loadIvySettings: Failed when loading Ivy settings from $settingsFile", e)
}
processIvyPathArg(ivySettings, ivyPath)
processRemoteRepoArg(ivySettings, remoteRepos, outCallback)
@@ -37,24 +37,24 @@ object FileUtils extends org.apache.commons.io.FileUtils {
return tempDir
}
}
throw new IllegalStateException(s"Failed to create directory within $TEMP_DIR_ATTEMPTS attempts (tried $baseName 0 to $baseName ${TEMP_DIR_ATTEMPTS - 1})")
throw new IllegalStateException(s"[StreamX] Failed to create directory within $TEMP_DIR_ATTEMPTS attempts (tried $baseName 0 to $baseName ${TEMP_DIR_ATTEMPTS - 1})")
}

def exists(path: String): Unit = {
- require(path != null && path.nonEmpty && new File(path).exists(), s"file $path is not exist!")
+ require(path != null && path.nonEmpty && new File(path).exists(), s"[StreamX] FileUtils.exists: file $path does not exist!")
}

def getPathFromEnv(env: String): String = {
val path = System.getenv(env)
- require(Utils.notEmpty(path), s"$env is not set on system env")
+ require(Utils.notEmpty(path), s"[StreamX] FileUtils.getPathFromEnv: $env is not set in the system environment")
val file = new File(path)
require(file.exists(), s"$env is not exist!")
require(file.exists(), s"[StreamX] FileUtils.getPathFromEnv: $env is not exist!")
file.getAbsolutePath
}

def resolvePath(parent: String, child: String): String = {
val file = new File(parent, child)
require(file.exists, s"${file.getAbsolutePath} is not exist!")
require(file.exists, s"[StreamX] FileUtils.resolvePath: ${file.getAbsolutePath} is not exist!")
file.getAbsolutePath
}

@@ -41,7 +41,7 @@ object FlinkClientUtils {
* @return
*/
@throws[ProgramInvocationException] def getExecutionPlan(packagedProgram: PackagedProgram): String = {
- require(packagedProgram != null)
+ require(packagedProgram != null, "[StreamX] FlinkClientUtils.getExecutionPlan: packagedProgram must not be null")
val address: InetAddress = InetAddress.getLocalHost
val jmAddress = new InetSocketAddress(address, new ServerSocket(0).getLocalPort)

@@ -52,7 +52,7 @@
val optimizer = new Optimizer(new DataStatistics, new DefaultCostEstimator, config)
val plan: Plan = PackagedProgramUtils.getPipelineFromProgram(packagedProgram, config, -1, true).asInstanceOf[Plan]
val optimizedPlan: OptimizedPlan = optimizer.compile(plan)
- require(optimizedPlan != null)
+ require(optimizedPlan != null, "[StreamX] FlinkClientUtils.getExecutionPlan: optimizedPlan is null")

val dumper = new PlanJSONDumpGenerator
dumper.setEncodeForHTML(true)
@@ -318,7 +318,7 @@ object HadoopUtils extends Logger {
rmId
}
}
- require(activeRMId != null, "[StreamX] can not found yarn active node")
+ require(activeRMId != null, "[StreamX] HadoopUtils.getRMWebAppURL: cannot find yarn active node")
logInfo(s"current activeRMHAId: $activeRMId")
val appActiveRMKey = HAUtil.addSuffix(addressPrefix, activeRMId)
val hostnameActiveRMKey = HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, activeRMId)
@@ -375,7 +375,7 @@
}

def toApplicationId(appId: String): ApplicationId = {
- require(appId != null)
+ require(appId != null, "[StreamX] HadoopUtils.toApplicationId: applicationId must not be null")
val timestampAndId = appId.split("_")
ApplicationId.newInstance(timestampAndId(1).toLong, timestampAndId.last.toInt)
}
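For reference, YARN application ids have the shape `application_<clusterTimestamp>_<sequence>`, which is why indices 1 and last are the interesting ones. A worked example (the id below is invented):

```scala
object ApplicationIdParseDemo extends App {
  val appId = "application_1636600000000_0042" // invented id
  val timestampAndId = appId.split("_")
  // => Array("application", "1636600000000", "0042")
  println(timestampAndId(1).toLong)  // 1636600000000 -> clusterTimestamp
  println(timestampAndId.last.toInt) // 42            -> sequence number
}
```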
@@ -70,7 +70,7 @@ object HdfsUtils extends Logger {
*/
def create(fileName: String, content: String): Unit = {
val path: Path = getPath(fileName)
require(HadoopUtils.hdfs.exists(path), s"[StreamX] hdfs $fileName is exists!! ")
require(HadoopUtils.hdfs.exists(path), s"[StreamX] HdfsUtils.create $fileName is exists!! ")
val outputStream: FSDataOutputStream = HadoopUtils.hdfs.create(path)
outputStream.writeUTF(content)
outputStream.flush()
@@ -81,7 +81,7 @@

def read(fileName: String): String = {
val path: Path = getPath(fileName)
- require(HadoopUtils.hdfs.exists(path) && !HadoopUtils.hdfs.isDirectory(path), s"[StreamX] path:$fileName not exists or isDirectory ")
+ require(HadoopUtils.hdfs.exists(path) && !HadoopUtils.hdfs.isDirectory(path), s"[StreamX] HdfsUtils.read: path $fileName does not exist or is a directory")
val in = HadoopUtils.hdfs.open(path)
val out = new ByteArrayOutputStream()
IOUtils.copyBytes(in, out, 4096, false)
@@ -36,8 +36,8 @@ object PropertiesUtils extends Logger {

def readFile(filename: String): String = {
val file = new File(filename)
require(file.exists(), s"file $file does not exist")
require(file.isFile, s"file $file is not a normal file")
require(file.exists(), s"[StreamX] readFile: file $file does not exist")
require(file.isFile, s"[StreamX] readFile: file $file is not a normal file")
val scanner = new Scanner(file)
val buffer = new StringBuilder
while (scanner.hasNextLine) {
@@ -94,8 +94,8 @@
/** Load Yaml present in the given file. */
def fromYamlFile(filename: String): Map[String, String] = {
val file = new File(filename)
require(file.exists(), s"Yaml file $file does not exist")
require(file.isFile, s"Yaml file $file is not a normal file")
require(file.exists(), s"[StreamX] fromYamlFile: Yaml file $file does not exist")
require(file.isFile, s"[StreamX] fromYamlFile: Yaml file $file is not a normal file")
val inputStream: InputStream = new FileInputStream(file)
try {
val map = MutableMap[String, String]()
@@ -113,8 +113,8 @@
/** Load properties present in the given file. */
def fromPropertiesFile(filename: String): Map[String, String] = {
val file = new File(filename)
require(file.exists(), s"Properties file $file does not exist")
require(file.isFile, s"Properties file $file is not a normal file")
require(file.exists(), s"[StreamX] fromPropertiesFile: Properties file $file does not exist")
require(file.isFile, s"[StreamX] fromPropertiesFile: Properties file $file is not a normal file")

val inReader = new InputStreamReader(new FileInputStream(file), "UTF-8")
try {
@@ -130,7 +130,7 @@

/** Load Yaml present in the given file. */
def fromYamlFile(inputStream: InputStream): Map[String, String] = {
require(inputStream != null, s"Properties inputStream must not be null")
require(inputStream != null, s"[StreamX] fromYamlFile: Properties inputStream must not be null")
try {
val map = MutableMap[String, String]()
new Yaml()
@@ -146,13 +146,13 @@

/** Load properties present in the given file. */
def fromPropertiesFile(inputStream: InputStream): Map[String, String] = {
require(inputStream != null, s"Properties inputStream must not be null")
require(inputStream != null, s"[StreamX] fromPropertiesFile: Properties inputStream must not be null")
try {
val properties = new Properties()
properties.load(inputStream)
properties.stringPropertyNames().map(k => (k, properties.getProperty(k).trim)).toMap
} catch {
- case e: IOException => throw new IllegalArgumentException(s"Failed when loading properties from inputStream", e)
+ case e: IOException => throw new IllegalArgumentException("[StreamX] Failed when loading properties from inputStream", e)
}
}

@@ -163,12 +163,12 @@
* @return
*/
def loadFlinkConfYaml(file: File): JavaMap[String, String] = {
- require(file != null && file.exists() && file.isFile)
+ require(file != null && file.exists() && file.isFile, "[StreamX] loadFlinkConfYaml: file must exist and be a normal file")
loadFlinkConfYaml(org.apache.commons.io.FileUtils.readFileToString(file))
}

def loadFlinkConfYaml(yaml: String): JavaMap[String, String] = {
- require(yaml != null && yaml.nonEmpty)
+ require(yaml != null && yaml.nonEmpty, "[StreamX] loadFlinkConfYaml: yaml must not be null or empty")
val flinkConf = new JavaMap[String, String]()
val scanner: Scanner = new Scanner(yaml)
val lineNo: AtomicInteger = new AtomicInteger(0)
@@ -21,8 +21,8 @@

package com.streamxhub.streamx.common.util

- import redis.clients.jedis.exceptions.JedisConnectionException
import redis.clients.jedis._
+ import redis.clients.jedis.exceptions.JedisConnectionException

import java.util.concurrent.ConcurrentHashMap
import scala.annotation.meta.getter
@@ -48,7 +48,7 @@ object RedisClient extends Logger {
*/
@tailrec
def connect(endpoints: Array[RedisEndpoint]): Jedis = {
- require(endpoints.length > 0, "The RedisEndpoint array is empty!!!")
+ require(endpoints.length > 0, "[StreamX] The RedisEndpoint array is empty!!!")
val index = Random.nextInt().abs % endpoints.length
try {
connect(endpoints(index))
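One caveat on the endpoint selection above, independent of this commit: `Random.nextInt().abs` is still negative when `nextInt()` returns `Int.MinValue`, so the modulo can produce a negative index. A sketch of the two idioms (my observation, not a change in the diff):

```scala
import scala.util.Random

object PickIndexSketch extends App {
  val n = 3 // e.g. endpoints.length

  // As written in connect(endpoints): Int.MinValue.abs == Int.MinValue,
  // so this can (rarely) yield a negative index.
  def pickIndex(): Int = Random.nextInt().abs % n

  // The bounded overload avoids the edge case and is uniform over [0, n):
  def pickIndexSafe(): Int = Random.nextInt(n)

  println(pickIndexSafe())
}
```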
@@ -115,7 +115,7 @@ object RedisClient extends Logger {
}

def connectCluster(res: RedisEndpoint*): JedisCluster = {
- require(res.nonEmpty, "The RedisEndpoint array is empty!!!")
+ require(res.nonEmpty, "[StreamX] The RedisEndpoint array is empty!!!")
val head = res.head
val cluster = clusters.getOrElseUpdate(head, {
val hostPorts = res.map(r => new HostAndPort(r.host, r.port)).toSet