This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 74bada0e01db [SPARK-57185][SQL] Use thread-local ICU collators to fix 
lock contention in CollationFactory
74bada0e01db is described below

commit 74bada0e01db0fcf1c8e9300508905a0952bd4aa
Author: Dejan Krakovic <[email protected]>
AuthorDate: Mon Jun 1 21:11:50 2026 +0800

    [SPARK-57185][SQL] Use thread-local ICU collators to fix lock contention in 
CollationFactory
    
    ### What changes were proposed in this pull request?
    
    Use thread-local `Collator` instances in 
`CollationSpecICU.buildCollation()` to eliminate lock contention on ICU's 
`RuleBasedCollator`. A frozen `RuleBasedCollator` serializes all threads 
through a `ReentrantLock` on its internal collation buffer (used by 
`getCollationKey`/`compare`), which causes a significant parallelism loss when 
many threads compare/hash collated strings concurrently.
    
    By creating independent per-thread instances via `Collator.getInstance()`, 
each thread operates on its own buffer without locking. Each instance is still 
frozen as a mutation guard. The `Collation.getCollator()` accessor now returns 
the current thread's instance (or `null` for non-ICU collations).
    
    ### Why are the changes needed?
    
    To remove a concurrency bottleneck when comparing or hashing collated 
columns under parallel access.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is purely a concurrency optimization; collation results are 
identical.
    
    ### How was this patch tested?
    
    Added a concurrent test in `CollationFactorySuite` that verifies 
`comparator`, `sortKeyFunction`, and `getCollator()` produce consistent results 
under parallel access across `UNICODE`, `en`, `de`, `en_CI`, and `en_AI` 
collations. Existing `CollationFactorySuite` tests continue to pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes, co-authored using Claude Code.
    
    Closes #56236 from dejankrak-db/thread-local-collator-fix-oss.
    
    Authored-by: Dejan Krakovic <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 89092b956884ce0e2161f9fbcf1996a29e9d8c38)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/util/CollationFactory.java  | 35 ++++++++++++++--------
 .../spark/unsafe/types/CollationFactorySuite.scala | 31 +++++++++++++++++++
 2 files changed, 54 insertions(+), 12 deletions(-)

diff --git a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java
index 3e2bfbcd87ca..8df59b1f6e34 100644
--- a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java
+++ b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java
@@ -110,7 +110,7 @@ public final class CollationFactory {
   public static class Collation {
     public final String collationName;
     public final String provider;
-    private final Collator collator;
+    private final ThreadLocal<Collator> threadLocalCollator;
     public final Comparator<UTF8String> comparator;
 
     /**
@@ -187,7 +187,7 @@ public final class CollationFactory {
     public Collation(
         String collationName,
         String provider,
-        Collator collator,
+        ThreadLocal<Collator> threadLocalCollator,
         Comparator<UTF8String> comparator,
         String version,
         Function<UTF8String, byte[]> sortKeyFunction,
@@ -197,7 +197,7 @@ public final class CollationFactory {
         boolean supportsSpaceTrimming) {
       this.collationName = collationName;
       this.provider = provider;
-      this.collator = collator;
+      this.threadLocalCollator = threadLocalCollator;
       this.comparator = comparator;
       this.version = version;
       this.sortKeyFunction = sortKeyFunction;
@@ -216,7 +216,7 @@ public final class CollationFactory {
     }
 
     public Collator getCollator() {
-      return collator;
+      return threadLocalCollator != null ? threadLocalCollator.get() : null;
     }
 
     /**
@@ -1016,29 +1016,40 @@ public final class CollationFactory {
           builder.setUnicodeLocaleKeyword("ks", "level1");
         }
         ULocale resultLocale = builder.build();
-        Collator collator = Collator.getInstance(resultLocale);
-        // Freeze ICU collator to ensure thread safety.
-        collator.freeze();
+
+        // Use thread-local Collator instances to avoid lock contention.
+        // A frozen RuleBasedCollator serializes all threads through a ReentrantLock on its
+        // internal collation buffer (used by getCollationKey/compare). By creating independent
+        // per-thread instances via Collator.getInstance(), each thread operates on its own
+        // buffer without locking. Each instance is frozen as a mutation guard so that any
+        // accidental call to setStrength() or similar throws immediately.
+        ThreadLocal<Collator> threadLocalCollator = ThreadLocal.withInitial(
+          () -> {
+            Collator collator = Collator.getInstance(resultLocale);
+            collator.freeze();
+            return collator;
+          });
 
         Comparator<UTF8String> comparator;
         Function<UTF8String, byte[]> sortKeyFunction;
 
         if (spaceTrimming == SpaceTrimming.NONE) {
           comparator = (s1, s2) ->
-            collator.compare(s1.toValidString(), s2.toValidString());
-          sortKeyFunction = s -> collator.getCollationKey(s.toValidString()).toByteArray();
+            threadLocalCollator.get().compare(s1.toValidString(), s2.toValidString());
+          sortKeyFunction = s ->
+            threadLocalCollator.get().getCollationKey(s.toValidString()).toByteArray();
         } else {
-          comparator = (s1, s2) -> collator.compare(
+          comparator = (s1, s2) -> threadLocalCollator.get().compare(
             applyTrimmingPolicy(s1, spaceTrimming).toValidString(),
             applyTrimmingPolicy(s2, spaceTrimming).toValidString());
-          sortKeyFunction = s -> collator.getCollationKey(
+          sortKeyFunction = s -> threadLocalCollator.get().getCollationKey(
             applyTrimmingPolicy(s, spaceTrimming).toValidString()).toByteArray();
         }
 
         return new Collation(
           normalizedCollationName(),
           PROVIDER_ICU,
-          collator,
+          threadLocalCollator,
           comparator,
           ICU_VERSION,
           sortKeyFunction,
diff --git a/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/CollationFactorySuite.scala b/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/CollationFactorySuite.scala
index ddf588b6c64c..87f1d0a1c75f 100644
--- a/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/CollationFactorySuite.scala
+++ b/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/CollationFactorySuite.scala
@@ -300,6 +300,37 @@ class CollationFactorySuite extends AnyFunSuite with Matchers { // scalastyle:ig
     })
   }
 
+  test("test concurrent comparator, sortKeyFunction, and getCollator on ICU collations") {
+    // Thread-local collator instances avoid lock contention on ICU's internal collation buffer.
+    // This test verifies correctness under concurrent access for all three paths:
+    // comparator, sortKeyFunction, and getCollator().
+    val collationNames = Seq("UNICODE", "en", "de", "en_CI", "en_AI")
+    collationNames.foreach { name =>
+      val collation = fetchCollation(name)
+      val s1 = toUTF8("apple")
+      val s2 = toUTF8("banana")
+      val expectedCmp = collation.comparator.compare(s1, s2)
+      val expectedKey = collation.sortKeyFunction.apply(s1).asInstanceOf[Array[Byte]]
+      val expectedCollatorKey =
+        collation.getCollator.getCollationKey(s1.toValidString()).toByteArray
+
+      (0 to 5).foreach(_ => {
+        IntStream.rangeClosed(0, 200).parallel().forEach { _ =>
+          val cmp = collation.comparator.compare(s1, s2)
+          assert(cmp == expectedCmp,
+            s"Comparator returned inconsistent result for $name")
+          val key = collation.sortKeyFunction.apply(s1).asInstanceOf[Array[Byte]]
+          assert(java.util.Arrays.equals(key, expectedKey),
+            s"sortKeyFunction returned inconsistent result for $name")
+          val collatorKey =
+            collation.getCollator.getCollationKey(s1.toValidString()).toByteArray
+          assert(java.util.Arrays.equals(collatorKey, expectedCollatorKey),
+            s"getCollator().getCollationKey() returned inconsistent result for $name")
+        }
+      })
+    }
+  }
+
   test("test collation caching") {
     Seq(
       "UTF8_BINARY",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to