Skip to content

Commit

Permalink
[SPARK-46937][SQL] Revert "[] Improve concurrency performance for FunctionRegistry"
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?

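Reverts apache#44976 because it breaks thread-safety. The revert restores the `mutable.HashMap` guarded by `synchronized` in `SimpleFunctionRegistryBase`, replacing the `ConcurrentHashMap` introduced by that change.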

### Why are the changes needed?

To fix the thread-safety regression introduced by apache#44976.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A

### Was this patch authored or co-authored using generative AI tooling?

no

Closes apache#46940 from cloud-fan/revert.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
cloud-fan committed Jun 11, 2024
1 parent 6107836 commit 82a84ed
Showing 1 changed file with 28 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package org.apache.spark.sql.catalyst.analysis

import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
import javax.annotation.concurrent.GuardedBy

import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.reflect.ClassTag

import org.apache.spark.SparkUnsupportedOperationException
Expand Down Expand Up @@ -195,8 +195,9 @@ object FunctionRegistryBase {

trait SimpleFunctionRegistryBase[T] extends FunctionRegistryBase[T] with Logging {

@GuardedBy("this")
protected val functionBuilders =
new ConcurrentHashMap[FunctionIdentifier, (ExpressionInfo, FunctionBuilder)]
new mutable.HashMap[FunctionIdentifier, (ExpressionInfo, FunctionBuilder)]

// Resolution of the function name is always case insensitive, but the database name
// depends on the caller
Expand All @@ -219,36 +220,45 @@ trait SimpleFunctionRegistryBase[T] extends FunctionRegistryBase[T] with Logging
def internalRegisterFunction(
name: FunctionIdentifier,
info: ExpressionInfo,
builder: FunctionBuilder): Unit = {
builder: FunctionBuilder): Unit = synchronized {
val newFunction = (info, builder)
functionBuilders.put(name, newFunction) match {
case previousFunction if previousFunction != null =>
case Some(previousFunction) if previousFunction != newFunction =>
logWarning(log"The function ${MDC(FUNCTION_NAME, name)} replaced a " +
log"previously registered function.")
case _ =>
}
}

override def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): T = {
val func = Option(functionBuilders.get(normalizeFuncName(name))).map(_._2).getOrElse {
throw QueryCompilationErrors.unresolvedRoutineError(name, Seq("system.builtin"))
val func = synchronized {
functionBuilders.get(normalizeFuncName(name)).map(_._2).getOrElse {
throw QueryCompilationErrors.unresolvedRoutineError(name, Seq("system.builtin"))
}
}
func(children)
}

override def listFunction(): Seq[FunctionIdentifier] =
functionBuilders.keys().asScala.toSeq
override def listFunction(): Seq[FunctionIdentifier] = synchronized {
functionBuilders.iterator.map(_._1).toList
}

override def lookupFunction(name: FunctionIdentifier): Option[ExpressionInfo] =
Option(functionBuilders.get(normalizeFuncName(name))).map(_._1)
override def lookupFunction(name: FunctionIdentifier): Option[ExpressionInfo] = synchronized {
functionBuilders.get(normalizeFuncName(name)).map(_._1)
}

override def lookupFunctionBuilder(name: FunctionIdentifier): Option[FunctionBuilder] =
Option(functionBuilders.get(normalizeFuncName(name))).map(_._2)
override def lookupFunctionBuilder(
name: FunctionIdentifier): Option[FunctionBuilder] = synchronized {
functionBuilders.get(normalizeFuncName(name)).map(_._2)
}

override def dropFunction(name: FunctionIdentifier): Boolean =
Option(functionBuilders.remove(normalizeFuncName(name))).isDefined
override def dropFunction(name: FunctionIdentifier): Boolean = synchronized {
functionBuilders.remove(normalizeFuncName(name)).isDefined
}

override def clear(): Unit = functionBuilders.clear()
override def clear(): Unit = synchronized {
functionBuilders.clear()
}
}

/**
Expand Down Expand Up @@ -298,11 +308,7 @@ class SimpleFunctionRegistry

override def clone(): SimpleFunctionRegistry = synchronized {
val registry = new SimpleFunctionRegistry
val iterator = functionBuilders.entrySet().iterator()
while (iterator.hasNext) {
val entry = iterator.next()
val name = entry.getKey
val (info, builder) = entry.getValue
functionBuilders.iterator.foreach { case (name, (info, builder)) =>
registry.internalRegisterFunction(name, info, builder)
}
registry
Expand Down Expand Up @@ -1030,11 +1036,7 @@ class SimpleTableFunctionRegistry extends SimpleFunctionRegistryBase[LogicalPlan

override def clone(): SimpleTableFunctionRegistry = synchronized {
val registry = new SimpleTableFunctionRegistry
val iterator = functionBuilders.entrySet().iterator()
while (iterator.hasNext) {
val entry = iterator.next()
val name = entry.getKey
val (info, builder) = entry.getValue
functionBuilders.iterator.foreach { case (name, (info, builder)) =>
registry.internalRegisterFunction(name, info, builder)
}
registry
Expand Down

0 comments on commit 82a84ed

Please sign in to comment.