Skip to content
This repository has been archived by the owner on Nov 15, 2024. It is now read-only.

Commit

Permalink
ADAPT1-1599 "Schema registry exponential backoff"
Browse files Browse the repository at this point in the history
  • Loading branch information
aadit-chugh committed Jul 5, 2024
1 parent 1a2c134 commit fb43b2f
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ object SchemaRegistry {
schemaRegistryClientRetries: Int,
schemaRegistryClientRetriesDelay: FiniteDuration): SchemaRegistry[F] =
new SchemaRegistry[F] {
val retryPolicy = limitRetries(schemaRegistryClientRetries) |+| constantDelay[F](schemaRegistryClientRetriesDelay)
val retryPolicy = limitRetries(schemaRegistryClientRetries) |+| exponentialBackoff[F](schemaRegistryClientRetriesDelay)

private implicit class SchemaOps(sch: Schema) {
def fields: List[Schema.Field] = fieldsEval("topLevel", box = false).value
Expand Down Expand Up @@ -296,17 +296,24 @@ object SchemaRegistry {
): F[SchemaVersion] =
Sync[F].delay {
schemaRegistryClient.getVersion(subject, schema)
}.retryingOnAllErrors(retryPolicy, onFailure("getVersion"))
}.retryingOnSomeErrors(
isWorthRetrying = {
case r: RestClientException if r.getErrorCode == 40401 => false // Do not retry for RestClientException with error code 40401
case _ => true // Retry for all other exceptions
},
policy = retryPolicy,
onError = onFailure("getVersion", subject)
)

override def getAllVersions(subject: String): F[List[SchemaId]] =
Sync[F].fromTry(Try(schemaRegistryClient.getAllVersions(subject)))
.map(_.asScala.toList.map(_.toInt)).recover {
case r: RestClientException if r.getErrorCode == 40401 => List.empty
}.retryingOnAllErrors(retryPolicy, onFailure("getAllVersions"))
}.retryingOnAllErrors(retryPolicy, onFailure("getAllVersions", subject))

private def onFailure(resourceTried: String): (Throwable, RetryDetails) => F[Unit] =
private def onFailure(resourceTried: String, subject: String): (Throwable, RetryDetails) => F[Unit] =
(error, retryDetails) =>
Logger[F].info(s"Retrying due to failure in SchemaRegistry.$resourceTried: $error. RetryDetails: $retryDetails")
Logger[F].info(s"Retrying due to failure in SchemaRegistry.$resourceTried: $error. Subject: $subject. RetryDetails: $retryDetails")

override def getAllSubjects: F[List[String]] =
Sync[F].delay {
Expand Down

0 comments on commit fb43b2f

Please sign in to comment.