Skip to content

Commit

Permalink
Change TaskRunner to limit context switches.
Browse files Browse the repository at this point in the history
Now we don't have to alternate between the coordinator thread and the task
thread between task runs if the task returns 0. Instead the task thread can
stay resident.

This implementation works by having task runnables that can switch from
the coordinator role (sleeping until the next task starts) and the executor
role.

square#5512
  • Loading branch information
squarejesse committed Oct 6, 2019
1 parent f40fa6d commit ef4b5ec
Show file tree
Hide file tree
Showing 10 changed files with 275 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,9 @@ class MockWebServer : ExternalResource(), Closeable {
val request = readRequest(stream)
atomicRequestCount.incrementAndGet()
requestQueue.add(request)
if (request.failure != null) {
return // Nothing to respond to.
}

val response: MockResponse = dispatcher.dispatch(request)

Expand Down
16 changes: 0 additions & 16 deletions okhttp/src/main/java/okhttp3/internal/concurrent/Task.kt
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ abstract class Task(
/** Undefined unless this is in [TaskQueue.futureTasks]. */
internal var nextExecuteNanoTime = -1L

internal var runRunnable: Runnable? = null

/** Returns the delay in nanoseconds until the next execution, or -1L to not reschedule. */
abstract fun runOnce(): Long

Expand All @@ -66,19 +64,5 @@ abstract class Task(

check(this.queue === null) { "task is in multiple queues" }
this.queue = queue

this.runRunnable = Runnable {
val currentThread = Thread.currentThread()
val oldName = currentThread.name
currentThread.name = name

var delayNanos = -1L
try {
delayNanos = runOnce()
} finally {
queue.runCompleted(this, delayNanos)
currentThread.name = oldName
}
}
}
}
67 changes: 11 additions & 56 deletions okhttp/src/main/java/okhttp3/internal/concurrent/TaskQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,16 @@ import java.util.concurrent.TimeUnit
class TaskQueue internal constructor(
private val taskRunner: TaskRunner
) {
private var shutdown = false
internal var shutdown = false

/** This queue's currently-executing task, or null if none is currently executing. */
private var activeTask: Task? = null

/** True if the [activeTask] should not recur when it completes. */
private var cancelActiveTask = false
internal var activeTask: Task? = null

/** Scheduled tasks ordered by [Task.nextExecuteNanoTime]. */
private val futureTasks = mutableListOf<Task>()

internal fun isActive(): Boolean {
check(Thread.holdsLock(taskRunner))
internal val futureTasks = mutableListOf<Task>()

return activeTask != null || futureTasks.isNotEmpty()
}
/** True if the [activeTask] should be canceled when it completes. */
internal var cancelActiveTask = false

/**
* Returns a snapshot of tasks currently scheduled for execution. Does not include the
Expand Down Expand Up @@ -87,7 +81,7 @@ class TaskQueue internal constructor(
fun awaitIdle(delayNanos: Long): Boolean {
val latch = CountDownLatch(1)

val task = object : Task("awaitIdle") {
val task = object : Task("awaitIdle", cancelable = false) {
override fun runOnce(): Long {
latch.countDown()
return -1L
Expand All @@ -104,8 +98,8 @@ class TaskQueue internal constructor(
return latch.await(delayNanos, TimeUnit.NANOSECONDS)
}

/** Adds [task] to run in [delayNanos]. Returns true if the coordinator should run. */
private fun scheduleAndDecide(task: Task, delayNanos: Long): Boolean {
/** Adds [task] to run in [delayNanos]. Returns true if the coordinator is impacted. */
internal fun scheduleAndDecide(task: Task, delayNanos: Long): Boolean {
task.initQueue(this)

val now = taskRunner.backend.nanoTime()
Expand All @@ -124,7 +118,7 @@ class TaskQueue internal constructor(
if (insertAt == -1) insertAt = futureTasks.size
futureTasks.add(insertAt, task)

// Run the coordinator if we inserted at the front.
// Impact the coordinator if we inserted at the front.
return insertAt == 0
}

Expand Down Expand Up @@ -154,8 +148,8 @@ class TaskQueue internal constructor(
}
}

/** Returns true if the coordinator should run. */
private fun cancelAllAndDecide(): Boolean {
/** Returns true if the coordinator is impacted. */
internal fun cancelAllAndDecide(): Boolean {
if (activeTask != null && activeTask!!.cancelable) {
cancelActiveTask = true
}
Expand All @@ -169,43 +163,4 @@ class TaskQueue internal constructor(
}
return tasksCanceled
}

/**
* Posts the next available task to an executor for immediate execution.
*
* Returns the delay until the next call to this method, -1L for no further calls, or
* [Long.MAX_VALUE] to wait indefinitely.
*/
internal fun executeReadyTask(now: Long): Long {
check(Thread.holdsLock(taskRunner))

if (activeTask != null) return Long.MAX_VALUE // This queue is busy.

// Check if a task is immediately ready.
val runTask = futureTasks.firstOrNull() ?: return -1L
val delayNanos = runTask.nextExecuteNanoTime - now
if (delayNanos <= 0) {
activeTask = runTask
futureTasks.removeAt(0)
taskRunner.backend.executeTask(runTask.runRunnable!!)
return Long.MAX_VALUE // This queue is busy until the run completes.
}

// Wait until the next task is ready.
return delayNanos
}

internal fun runCompleted(task: Task, delayNanos: Long) {
synchronized(taskRunner) {
check(activeTask === task)

if (delayNanos != -1L && !cancelActiveTask && !shutdown) {
scheduleAndDecide(task, delayNanos)
}

activeTask = null
cancelActiveTask = false
taskRunner.kickCoordinator(this)
}
}
}
Loading

0 comments on commit ef4b5ec

Please sign in to comment.