Skip to content

Commit

Permalink
[FLINK-6655] Add validateAndNormalizeUri method to MemoryArchivist
Browse files Browse the repository at this point in the history
This closes apache#4156.
  • Loading branch information
zhangminglei authored and zentol committed Jul 1, 2017
1 parent 3f0ac26 commit 3956269
Showing 1 changed file with 55 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@

package org.apache.flink.runtime.jobmanager

import java.io.IOException
import java.net.URI
import java.util

import akka.actor.ActorRef
import grizzled.slf4j.Logger
import org.apache.flink.api.common.JobID
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.Path
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.runtime.jobgraph.JobStatus
import org.apache.flink.runtime.messages.accumulators._
import org.apache.flink.runtime.webmonitor.WebMonitorUtils
Expand All @@ -34,7 +35,6 @@ import org.apache.flink.runtime.executiongraph.{ArchivedExecutionGraph, Executio
import org.apache.flink.runtime.history.FsJobArchivist
import org.apache.flink.runtime.messages.ArchiveMessages._
import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.state.filesystem.FsStateBackend

import scala.collection.mutable
import scala.concurrent.future
Expand Down Expand Up @@ -86,7 +86,7 @@ class MemoryArchivist(
}

override def handleMessage: Receive = {

/* Receive Execution Graph to archive */
case ArchiveExecutionGraph(jobID, graph) =>
// Keep lru order in case we override a graph (from multiple job submission in one session).
Expand All @@ -109,7 +109,7 @@ class MemoryArchivist(
trimHistory()

case msg : InfoMessage => handleWebServerInfoMessage(msg, sender())

case RequestArchivedJob(jobID: JobID) =>
val graph = graphs.get(jobID)
sender ! decorateMessage(ArchivedJob(graph))
Expand Down Expand Up @@ -165,7 +165,7 @@ class MemoryArchivist(
throw new RuntimeException("Received unknown message " + message)
}


private def handleWebServerInfoMessage(message: InfoMessage, theSender: ActorRef): Unit = {
message match {
case _ : RequestJobsOverview =>
Expand All @@ -175,7 +175,7 @@ class MemoryArchivist(
catch {
case t: Throwable => log.error("Exception while creating the jobs overview", t)
}

case _ : RequestJobsWithIDsOverview =>
try {
sender ! decorateMessage(createJobsWithIDsOverview())
Expand All @@ -188,7 +188,7 @@ class MemoryArchivist(
val details = graphs.values.map {
v => WebMonitorUtils.createDetailsForJob(v)
}.toArray[JobDetails]

theSender ! decorateMessage(new MultipleJobsDetails(null, details))
}
}
Expand All @@ -198,7 +198,7 @@ class MemoryArchivist(
// so we aren't archiving it yet.
if (archivePath.isDefined && graph.getState.isGloballyTerminalState) {
try {
val p = FsStateBackend.validateAndNormalizeUri(archivePath.get.toUri)
val p = validateAndNormalizeUri(archivePath.get.toUri)
future {
try {
FsJobArchivist.archiveJob(p, graph)
Expand All @@ -217,7 +217,7 @@ class MemoryArchivist(
// --------------------------------------------------------------------------
// Request Responses
// --------------------------------------------------------------------------

private def createJobsOverview() : JobsOverview = {
new JobsOverview(0, finishedCnt, canceledCnt, failedCnt)
}
Expand All @@ -239,7 +239,7 @@ class MemoryArchivist(

new JobsWithIDsOverview(runningOrPending, finished, canceled, failed)
}

// --------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------
Expand All @@ -255,4 +255,48 @@ class MemoryArchivist(
graphs.remove(jobID)
}
}

/**
* Checks and normalizes the archive path URI. This method first checks the validity of the
* URI (scheme, path, availability of a matching file system) and then normalizes the URL
* to a path.
*
* If the URI does not include an authority, but the file system configured for the URI has an
* authority, then the normalized path will include this authority.
*
* @param archivePathUri The URI to check and normalize.
* @return a normalized URI as a Path.
*
* @throws IllegalArgumentException Thrown, if the URI misses schema or path.
* @throws IOException Thrown, if no file system can be found for the URI's scheme.
*/
@throws[IOException]
private def validateAndNormalizeUri(archivePathUri: URI): Path = {
val scheme = archivePathUri.getScheme
val path = archivePathUri.getPath

// some validity checks
if (scheme == null) {
throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
"Please specify the file system scheme explicitly in the URI: " + archivePathUri)
}

if (path == null) {
throw new IllegalArgumentException("The path to store the job archives is null. " +
"Please specify a directory path for storing job archives. and the URI is: " +
archivePathUri)
}

if (path.length == 0 || path == "/") {
throw new IllegalArgumentException("Cannot use the root directory for storing job archives.")
}

if (!FileSystem.isFlinkSupportedScheme(archivePathUri.getScheme)) {
// skip verification checks for non-flink supported filesystem
// this is because the required filesystem classes may not be available to the flink client
throw new IllegalArgumentException("No file system found with scheme " + scheme
+ ", referenced in file URI '" + archivePathUri.toString + "'.")
}
new Path(archivePathUri)
}
}

0 comments on commit 3956269

Please sign in to comment.