Skip to content

Commit

Permalink
Migrate forced suspension mechanism from yield to Dispatcher.Default
Browse files Browse the repository at this point in the history
Since it's possible for certain dispatchers to completely avoid yielding, and currently the immediate dispatchers exhibit this behavior, we need an alternate mechanism of forcing suspension or UndeclaredThrowableExceptions will still be seen.

Retrofit does not have its own thread pool onto which we can defer resuming. Instead we rely to Dispatchers.Default and forcibly suspend the caller using low-level coroutine intrinsics.

(cherry picked from commit 9d683b7)
  • Loading branch information
JakeWharton committed Dec 9, 2019
1 parent 0a596fa commit 70e245e
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 24 deletions.
4 changes: 2 additions & 2 deletions retrofit/src/main/java/retrofit2/HttpServiceMethod.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ static final class SuspendForResponse<ResponseT> extends HttpServiceMethod<Respo
try {
return KotlinExtensions.awaitResponse(call, continuation);
} catch (Exception e) {
return KotlinExtensions.yieldAndThrow(e, continuation);
return KotlinExtensions.suspendAndThrow(e, continuation);
}
}
}
Expand Down Expand Up @@ -206,7 +206,7 @@ static final class SuspendForBody<ResponseT> extends HttpServiceMethod<ResponseT
? KotlinExtensions.awaitNullable(call, continuation)
: KotlinExtensions.await(call, continuation);
} catch (Exception e) {
return KotlinExtensions.yieldAndThrow(e, continuation);
return KotlinExtensions.suspendAndThrow(e, continuation);
}
}
}
Expand Down
24 changes: 20 additions & 4 deletions retrofit/src/main/java/retrofit2/KotlinExtensions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

package retrofit2

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.yield
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.intercepted
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

Expand Down Expand Up @@ -97,7 +100,20 @@ suspend fun <T : Any> Call<T>.awaitResponse(): Response<T> {
}
}

internal suspend fun Exception.yieldAndThrow(): Nothing {
yield()
throw this
/**
* Force the calling coroutine to suspend before throwing [this].
*
* This is needed when a checked exception is synchronously caught in a [java.lang.reflect.Proxy]
* invocation to avoid being wrapped in [java.lang.reflect.UndeclaredThrowableException].
*
* The implementation is derived from:
* https://github.com/Kotlin/kotlinx.coroutines/pull/1667#issuecomment-556106349
*/
internal suspend fun Exception.suspendAndThrow(): Nothing {
suspendCoroutineUninterceptedOrReturn<Nothing> { continuation ->
Dispatchers.Default.dispatch(continuation.context, Runnable {
continuation.intercepted().resumeWithException(this@suspendAndThrow)
})
COROUTINE_SUSPENDED
}
}
52 changes: 34 additions & 18 deletions retrofit/src/test/java/retrofit2/KotlinSuspendTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package retrofit2

import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import okhttp3.OkHttpClient
import okhttp3.mockwebserver.MockResponse
import okhttp3.mockwebserver.MockWebServer
Expand All @@ -35,6 +37,7 @@ import retrofit2.http.Path
import java.io.IOException
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import kotlin.coroutines.CoroutineContext

class KotlinSuspendTest {
@get:Rule val server = MockWebServer()
Expand Down Expand Up @@ -283,15 +286,19 @@ class KotlinSuspendTest {

server.shutdown()

// The problematic behavior of the UnknownHostException being synchronously thrown is
// probabilistic based on thread preemption. Running a thousand times will almost always
// trigger it, so we run an order of magnitude more to be safe.
repeat(10000) {
try {
example.body()
fail()
} catch (_: IOException) {
// We expect IOException, the bad behavior will wrap this in UndeclaredThrowableException.
// Run with a dispatcher that prevents yield from actually deferring work. An old workaround
// for this problem relied on yield, but it is not guaranteed to prevent direct execution.
withContext(DirectUnconfinedDispatcher) {
// The problematic behavior of the UnknownHostException being synchronously thrown is
// probabilistic based on thread preemption. Running a thousand times will almost always
// trigger it, so we run an order of magnitude more to be safe.
repeat(10000) {
try {
example.body()
fail()
} catch (_: IOException) {
// We expect IOException, the bad behavior will wrap this in UndeclaredThrowableException.
}
}
}
}
Expand All @@ -305,16 +312,25 @@ class KotlinSuspendTest {

server.shutdown()

// The problematic behavior of the UnknownHostException being synchronously thrown is
// probabilistic based on thread preemption. Running a thousand times will almost always
// trigger it, so we run an order of magnitude more to be safe.
repeat(10000) {
try {
example.response()
fail()
} catch (_: IOException) {
// We expect IOException, the bad behavior will wrap this in UndeclaredThrowableException.
// Run with a dispatcher that prevents yield from actually deferring work. An old workaround
// for this problem relied on yield, but it is not guaranteed to prevent direct execution.
withContext(DirectUnconfinedDispatcher) {
// The problematic behavior of the UnknownHostException being synchronously thrown is
// probabilistic based on thread preemption. Running a thousand times will almost always
// trigger it, so we run an order of magnitude more to be safe.
repeat(10000) {
try {
example.response()
fail()
} catch (_: IOException) {
// We expect IOException, the bad behavior will wrap this in UndeclaredThrowableException.
}
}
}
}

private object DirectUnconfinedDispatcher : CoroutineDispatcher() {
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
override fun dispatch(context: CoroutineContext, block: Runnable) = block.run()
}
}

0 comments on commit 70e245e

Please sign in to comment.