Skip to content

Commit

Permalink
[CELEBORN-1135] Added tests for the RpcEnv and related classes
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Added test suites for `RpcEnv`, `NettyRpcEnv`, and other related classes.
These are copied over from Apache Spark. Some of the UTs in Apache Spark required changes in the source code like [SPARK-39468](https://issues.apache.org/jira/browse/SPARK-39468) which I didn't copy over.

### Why are the changes needed?
The change adds unit tests.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Just adds UTs. The source code changes are minimal.

Closes apache#2107 from otterc/CELEBORN-1135.

Authored-by: Chandni Singh <[email protected]>
Signed-off-by: Shuang <[email protected]>
  • Loading branch information
otterc authored and RexXiong committed Nov 24, 2023
1 parent 6f32838 commit 788b0c3
Show file tree
Hide file tree
Showing 8 changed files with 1,458 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private[celeborn] class Inbox(
}
}
while (true) {
safelyCall(endpoint) {
safelyCall(endpoint, endpointRef.name) {
message match {
case RpcMessage(_sender, content, context) =>
try {
Expand Down Expand Up @@ -218,7 +218,21 @@ private[celeborn] class Inbox(
/**
* Calls action closure, and calls the endpoint's onError function in the case of exceptions.
*/
private def safelyCall(endpoint: RpcEndpoint)(action: => Unit): Unit = {
private def safelyCall(
endpoint: RpcEndpoint,
endpointRefName: String)(action: => Unit): Unit = {
def dealWithFatalError(fatal: Throwable): Unit = {
inbox.synchronized {
assert(numActiveThreads > 0, "The number of active threads should be positive.")
// Should reduce the number of active threads before throw the error.
numActiveThreads -= 1
}
logError(
s"An error happened while processing message in the inbox for $endpointRefName",
fatal)
throw fatal
}

try action
catch {
case NonFatal(e) =>
Expand All @@ -230,8 +244,18 @@ private[celeborn] class Inbox(
} else {
logError("Ignoring error", ee)
}
case fatal: Throwable =>
dealWithFatalError(fatal)
}
case fatal: Throwable =>
dealWithFatalError(fatal)
}
}

// exposed only for testing
def getNumActiveThreads: Int = {
inbox.synchronized {
inbox.numActiveThreads
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.common.rpc

import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.exception.CelebornException

class RpcAddressSuite extends CelebornFunSuite {

test("hostPort") {
val address = RpcAddress("1.2.3.4", 1234)
assert(address.host === "1.2.3.4")
assert(address.port === 1234)
assert(address.hostPort === "1.2.3.4:1234")
}

test("fromCelebornURL") {
val address = RpcAddress.fromCelebornURL("celeborn://1.2.3.4:1234")
assert(address.host === "1.2.3.4")
assert(address.port === 1234)
}

test("fromCelebornURL: a typo url") {
val e = intercept[CelebornException] {
RpcAddress.fromCelebornURL("celeborn://1.2. 3.4:1234")
}
assert("Invalid master URL: celeborn://1.2. 3.4:1234" === e.getMessage)
}

test("fromCelebornURL: invalid scheme") {
val e = intercept[CelebornException] {
RpcAddress.fromCelebornURL("invalid://1.2.3.4:1234")
}
assert("Invalid master URL: invalid://1.2.3.4:1234" === e.getMessage)
}

test("toCelebornURL") {
val address = RpcAddress("1.2.3.4", 1234)
assert(address.toCelebornURL === "celeborn://1.2.3.4:1234")
}

}
Loading

0 comments on commit 788b0c3

Please sign in to comment.