Skip to content

Commit

Permalink
[LIVY-633][SERVER] session should not be gc-ed for long running queries
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?
Currently, Livy records the last activity time of the session before statement execution. If a statement runs for longer than the session timeout, the session will be garbage collected as soon as the statement execution finishes.

This is not the expected behavior. The statement execution time should not be counted as idle time. We should update the last activity time after the statement execution.

We cannot update the last activity time inside the Session class when the session changes state from busy to idle. So in this patch, we add a replLastActivity field to the rscClient, which is updated when the repl state changes. When the session changes its state from busy to idle, this field captures the time, which is then reflected in the session's last activity.

## How was this patch tested?
Manual test. Also added a new unit test.

Existing unit tests and integration tests.

Author: yihengwang <[email protected]>
Author: Yiheng Wang <[email protected]>

Closes apache#224 from yiheng/fix_633.
  • Loading branch information
yiheng authored and jerryshao committed Sep 17, 2019
1 parent a9b2ddb commit 145cc2b
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 0 deletions.
7 changes: 7 additions & 0 deletions rsc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.livy</groupId>
<artifactId>livy-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<!-- Use the provided scope, otherwise it will conflict with the repl jars -->
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
Expand Down
18 changes: 18 additions & 0 deletions rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.livy.rsc.driver.AddFileJob;
import org.apache.livy.rsc.driver.AddJarJob;
import org.apache.livy.rsc.rpc.Rpc;
import org.apache.livy.sessions.SessionState;

import static org.apache.livy.rsc.RSCConf.Entry.*;

Expand All @@ -64,6 +65,8 @@ public class RSCClient implements LivyClient {
private Process driverProcess;
private volatile boolean isAlive;
private volatile String replState;
// Record the last activity timestamp of the repl
private volatile long replLastActivity = System.nanoTime();

RSCClient(RSCConf conf, Promise<ContextInfo> ctx, Process driverProcess) throws IOException {
this.conf = conf;
Expand Down Expand Up @@ -315,6 +318,16 @@ public String getReplState() {
return replState;
}

/**
 * Returns the timestamp of the most recent repl activity.
 *
 * The value is refreshed whenever the repl state changes from busy to idle.
 *
 * @return the last activity timestamp of the repl
 */
public long getReplLastActivity() {
  return this.replLastActivity;
}

private class ClientProtocol extends BaseProtocol {

<T> JobHandleImpl<T> submit(Job<T> job) {
Expand Down Expand Up @@ -411,6 +424,11 @@ private void handle(ChannelHandlerContext ctx, JobStarted msg) {

/**
 * Handles a repl state notification: refreshes the repl's last activity timestamp on a
 * busy -> idle transition, then records the new state.
 *
 * @param ctx the channel handler context (not used by this handler)
 * @param msg the repl state message; must not be null, since its state is read
 *            unconditionally
 */
private void handle(ChannelHandlerContext ctx, ReplState msg) {
  LOG.trace("Received repl state for {}", msg.state);
  // msg was already dereferenced by the trace call above, so a null check at this point
  // could never be false and is intentionally omitted.
  // Only a busy -> idle transition counts as activity, so that the time spent executing a
  // statement is not treated as idle time.
  if (SessionState.Busy$.MODULE$.state().equals(replState) &&
      SessionState.Idle$.MODULE$.state().equals(msg.state)) {
    replLastActivity = System.nanoTime();
  }
  replState = msg.state;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,4 +626,16 @@ class InteractiveSession(
}

// Replaces the stored application info with the given value.
override def infoChanged(appInfo: AppInfo): Unit = {
  this.appInfo = appInfo
}

/**
 * Last activity timestamp of this session.
 *
 * While the server-side state is Running and a client is present, this is the later of the
 * session's own timestamp and the repl's last activity timestamp. In every other case it is
 * the session's own timestamp.
 */
override def lastActivity: Long = {
  val sessionActivity = super.lastActivity
  if (serverSideState != SessionState.Running) {
    sessionActivity
  } else {
    // Take whichever timestamp is later; fall back to the session's own when no client exists.
    client.fold(sessionActivity) { rsc => math.max(rsc.getReplLastActivity, sessionActivity) }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,21 @@ class InteractiveSessionSpec extends FunSpec
}
}

// Verifies that the session's last activity time moves forward once a statement
// finishes, rather than staying at the value observed when the statement was submitted.
withSession("should refresh last activity time when statement finished") { session =>
// Statement that sleeps for 3 seconds before completing.
val code =
"""
|from time import sleep
|sleep(3)
""".stripMargin
session.executeStatement(ExecuteRequest(code, None))
// Baseline: last activity as observed right after the statement was submitted.
val executionBeginTime = session.lastActivity

// Poll every 100 ms, for at most 10 seconds, until the session is idle again and its
// last activity is later than the baseline.
eventually(timeout(10 seconds), interval(100 millis)) {
session.state should be(SessionState.Idle)
session.lastActivity should be > executionBeginTime
}
}

withSession("should error out the session if the interpreter dies") { session =>
session.executeStatement(ExecuteRequest("import os; os._exit(666)", None))
eventually(timeout(30 seconds), interval(100 millis)) {
Expand Down

0 comments on commit 145cc2b

Please sign in to comment.