Skip to content

Commit

Permalink
[CELEBORN-847] Support use RESTful API to trigger worker exit and exi…
Browse files Browse the repository at this point in the history
…tImmediately

### What changes were proposed in this pull request?
As title

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes apache#1768 from AngersZhuuuu/CELEBORN-847.

Lead-authored-by: Angerszhuuuu <[email protected]>
Co-authored-by: Keyong Zhou <[email protected]>
Co-authored-by: Keyong Zhou <[email protected]>
Co-authored-by: Keyong Zhou <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
  • Loading branch information
4 people committed Aug 15, 2023
1 parent 4a4a37e commit 17de300
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 9 deletions.
2 changes: 1 addition & 1 deletion docs/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -314,4 +314,4 @@ API path listed as below:
| /unavailablePeers | List the unavailable peers of the worker, this always means the worker connect to the peer failed. |
| /isShutdown | Show if the worker is during the process of shutdown. |
| /isRegistered | Show if the worker is registered to the master success. |
| /decommission | Trigger this worker to decommission from the cluster |
| /exit?type=${TYPE} | Trigger this worker to exit. Legal `type`s are 'DECOMMISSION‘, 'GRACEFUL' and 'IMMEDIATELY' |
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ abstract class HttpService extends Service with Logging {

def isRegistered: String

def decommission: String = throw new UnsupportedOperationException()
def exit(exitType: String): String = throw new UnsupportedOperationException()

def startHttpServer(): Unit = {
val handlers =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ class HttpRequestHandler(
}

def handleRequest(uri: String): String = {
uri match {
val (path, parameters) = HttpUtils.parseUrl(uri)
path match {
case "/conf" =>
service.getConf
case "/workerInfo" =>
Expand Down Expand Up @@ -90,8 +91,8 @@ class HttpRequestHandler(
service.isShutdown
case "/isRegistered" if service.serviceName == Service.WORKER =>
service.isRegistered
case "/decommission" if service.serviceName == Service.WORKER =>
service.decommission
case "/exit" if service.serviceName == Service.WORKER =>
service.exit(parameters.getOrElse("TYPE", ""))
case _ => INVALID
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.server.common.http

import java.net.URL
import java.util.Locale

object HttpUtils {
def parseUrl(uri: String): (String, Map[String, String]) = {
val url = new URL(s"https://127.0.0.1:9000$uri")
val parameter =
if (url.getQuery == null) {
Map.empty[String, String]
} else {
url.getQuery
.split("&")
.map(_.split("="))
.map(arr => arr(0).toUpperCase(Locale.ROOT) -> arr(1).toUpperCase(Locale.ROOT)).toMap
}
(url.getPath, parameter)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.server.common.http

import org.scalatest.funsuite.AnyFunSuite

import org.apache.celeborn.common.internal.Logging

class HttpUtilsSuite extends AnyFunSuite with Logging {

def checkParseUri(
uri: String,
expectPath: String,
expectParameters: Map[String, String]): Unit = {
val (path, parameters) = HttpUtils.parseUrl(uri)
assert(path == expectPath)
assert(parameters == expectParameters)
}

test("CELEBORN-847: Support parse HTTP Restful API parameters") {
checkParseUri("/exit", "/exit", Map.empty)
checkParseUri("/exit?type=decommission", "/exit", Map("TYPE" -> "DECOMMISSION"))
checkParseUri(
"/exit?type=decommission&foo=a",
"/exit",
Map("TYPE" -> "DECOMMISSION", "FOO" -> "A"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -561,17 +561,26 @@ private[celeborn] class Worker(
sb.toString()
}

override def decommission: String = {
exitKind = CelebornExitKind.WORKER_DECOMMISSION
override def exit(exitType: String): String = {
exitType match {
case "DECOMMISSION" =>
exitKind = CelebornExitKind.WORKER_DECOMMISSION
case "GRACEFUL" =>
exitKind = CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN
case "IMMEDIATELY" =>
exitKind = CelebornExitKind.EXIT_IMMEDIATELY
case _ => // Use origin code
}
// Use the original EXIT_CODE
new Thread() {
override def run(): Unit = {
Thread.sleep(10000)
System.exit(0)
}
}.start()
val sb = new StringBuilder
sb.append("======================== Decommission Worker =========================\n")
sb.append("Decommission worker triggered: \n")
sb.append("============================ Exit Worker =============================\n")
sb.append(s"Exit worker by $exitType triggered: \n")
sb.append(workerInfo.toString()).append("\n")
sb.toString()
}
Expand Down

0 comments on commit 17de300

Please sign in to comment.