This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 3f5adada5e71 [SPARK-50558][SQL] Introduce simpleString for 
ExpressionSet
3f5adada5e71 is described below

commit 3f5adada5e7118e7fbf8135d4cb5f81df66f060b
Author: Ole Sasse <[email protected]>
AuthorDate: Mon Jan 27 10:29:52 2025 +0300

    [SPARK-50558][SQL] Introduce simpleString for ExpressionSet
    
    ### What changes were proposed in this pull request?
    
    * Introduce a simpleString method equal to the one for Expression and add 
it to ExpressionSet
    * Use it for push down filter logging in DataSourceStrategy
    * Use it for after scan filter logging in FileSourceStrategy
    
    ### Why are the changes needed?
    
    Filter expressions can be arbitrarily large and should not be logged 
completely in these cases
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, logging is not user facing
    
    ### How was this patch tested?
    
    Added new tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #49650 from olaky/spark-50558-add-simple-string-for-expression-set.
    
    Authored-by: Ole Sasse <[email protected]>
    Signed-off-by: Max Gekk <[email protected]>
    (cherry picked from commit 771d81ab48a6c990653028a3167a7ad0911ba573)
    Signed-off-by: Max Gekk <[email protected]>
---
 .../org/apache/spark/sql/catalyst/util/StringUtils.scala  | 15 ++++++++++++---
 .../spark/sql/catalyst/expressions/ExpressionSet.scala    |  9 +++++++++
 .../sql/catalyst/expressions/ExpressionSetSuite.scala     | 14 ++++++++++++++
 .../sql/execution/datasources/DataSourceStrategy.scala    |  5 ++++-
 .../sql/execution/datasources/FileSourceStrategy.scala    |  6 +++++-
 5 files changed, 44 insertions(+), 5 deletions(-)

diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index e8c50be9f551..486093225f06 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -83,7 +83,8 @@ object SparkStringUtils extends Logging {
       start: String,
       sep: String,
       end: String,
-      maxFields: Int): String = {
+      maxFields: Int,
+      customToString: Option[T => String] = None): String = {
     if (seq.length > maxFields) {
       if (truncationWarningPrinted.compareAndSet(false, true)) {
         logWarning(
@@ -94,9 +95,17 @@ object SparkStringUtils extends Logging {
       val restNum = seq.length - numFields
       val ending = (if (numFields == 0) "" else sep) +
         (if (restNum == 0) "" else s"... $restNum more fields") + end
-      seq.take(numFields).mkString(start, sep, ending)
+      if (customToString.isDefined) {
+        seq.take(numFields).map(customToString.get).mkString(start, sep, 
ending)
+      } else {
+        seq.take(numFields).mkString(start, sep, ending)
+      }
     } else {
-      seq.mkString(start, sep, end)
+      if (customToString.isDefined) {
+        seq.map(customToString.get).mkString(start, sep, end)
+      } else {
+        seq.mkString(start, sep, end)
+      }
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
index 1aa9f006463c..cc6fea2f1b7f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.expressions
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.sql.catalyst.util.SparkStringUtils
+
 object ExpressionSet {
   /**
    * Constructs a new [[ExpressionSet]] by applying 
[[Expression#canonicalized]] to `expressions`.
@@ -178,5 +180,12 @@ class ExpressionSet protected(
        |baseSet: ${baseSet.mkString(", ")}
        |originals: ${originals.mkString(", ")}
      """.stripMargin
+
+  /** Returns a length limited string that must be used for logging only. */
+  def simpleString(maxFields: Int): String = {
+    val customToString = { e: Expression => e.simpleString(maxFields) }
+    SparkStringUtils.truncatedString(
+      seq = originals.toSeq, start = "Set(", sep = ", ", end = ")", maxFields, 
Some(customToString))
+  }
 }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala
index 4d5938b2e760..810c3f9ccf2e 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala
@@ -240,4 +240,18 @@ class ExpressionSetSuite extends SparkFunSuite {
     assert((initialSet -- setToRemoveWithOutSameExpression).size == 2)
   }
 
+  test("simpleString limits the number of expressions recursively") {
+    val expressionSet =
+      ExpressionSet(InSet(aUpper, Set(0, 1)) :: Rand(1) :: Rand(2) :: Rand(3) 
:: Nil)
+    assert(expressionSet.simpleString(1) ==
+      "Set(A#1 INSET 0, ... 1 more fields, ... 3 more fields)")
+    assert(expressionSet.simpleString(2) == "Set(A#1 INSET 0, 1, rand(1), ... 
2 more fields)")
+    assert(expressionSet.simpleString(3) ==
+      "Set(A#1 INSET 0, 1, rand(1), rand(2), ... 1 more fields)")
+    assert(expressionSet.simpleString(4) == expressionSet.toString)
+
+    // Only one expression, but the simple string for this expression must be 
truncated.
+    val expressionSetTwo = ExpressionSet(InSet(aUpper, Set(0, 1, 2, 3, 4)) :: 
Nil)
+    assert(expressionSetTwo.simpleString(1) == "Set(A#1 INSET 0, ... 4 more 
fields)")
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e2e72a9e3695..a1bcf575ce58 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -52,6 +52,7 @@ import org.apache.spark.sql.execution.{RowDataSourceScanExec, 
SparkPlan}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
PushedDownOperators}
 import org.apache.spark.sql.execution.streaming.StreamingRelation
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -519,9 +520,11 @@ object DataSourceStrategy
       ExpressionSet(Nil)
     } else {
       val partitionSet = AttributeSet(partitionColumns)
+      val maxToStringFields = SQLConf.get.getConf(SQLConf.MAX_TO_STRING_FIELDS)
       val predicates = ExpressionSet(normalizedFilters
         .flatMap(extractPredicatesWithinOutputSet(_, partitionSet)))
-      logInfo(log"Pruning directories with: ${MDC(PREDICATES, 
predicates.mkString(","))}")
+      logInfo(log"Pruning directories with: ${MDC(PREDICATES,
+        predicates.simpleString(maxToStringFields))}")
       predicates
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 02235ffb1976..0f22c23791a1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.trees.TreePattern.{PLAN_EXPRESSION, 
SCALAR_SUBQUERY}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DoubleType, FloatType, StructType}
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.collection.BitSet
@@ -199,6 +200,7 @@ object FileSourceStrategy extends Strategy with 
PredicateHelper with Logging {
           Some(f)
         }
       }
+
       val supportNestedPredicatePushdown =
         DataSourceUtils.supportNestedPredicatePushdown(fsRelation)
       val pushedFilters = dataFilters
@@ -207,7 +209,9 @@ object FileSourceStrategy extends Strategy with 
PredicateHelper with Logging {
 
       // Predicates with both partition keys and attributes need to be 
evaluated after the scan.
       val afterScanFilters = filterSet -- 
partitionKeyFilters.filter(_.references.nonEmpty)
-      logInfo(log"Post-Scan Filters: ${MDC(POST_SCAN_FILTERS, 
afterScanFilters.mkString(","))}")
+      val maxToStringFields = 
fsRelation.sparkSession.conf.get(SQLConf.MAX_TO_STRING_FIELDS)
+      logInfo(log"Post-Scan Filters: ${MDC(POST_SCAN_FILTERS,
+        afterScanFilters.simpleString(maxToStringFields))}")
 
       val filterAttributes = AttributeSet(afterScanFilters ++ stayUpFilters)
       val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq 
++ projects


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to