yihua commented on code in PR #18126:
URL: https://github.com/apache/hudi/pull/18126#discussion_r3066989673
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala:
##########
@@ -269,10 +297,46 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
// the whole table
if (haveProperPartitionValues(partitionPaths.toSeq) &&
partitionSchema.nonEmpty) {
val predicate = partitionPruningPredicates.reduce(expressions.And)
+ val partitionFieldNames = partitionSchema.fieldNames
+ def getPartitionColumnPath(expr: Expression): Option[String] = expr match {
+ case a: AttributeReference =>
+ Some(a.name.replaceAll("#\\d+$", ""))
+ case GetStructField(child, _, Some(fieldName)) =>
+ getPartitionColumnPath(child).map(_ + "." + fieldName)
+ case _ => None
+ }
val transformedPredicate = predicate.transform {
Review Comment:
🤖 This uses `indexOf` for an exact (case-sensitive) string match, but the
`AttributeReference` case below uses `resolve(logicalName, sf.name)` which
respects Spark's case-sensitivity setting. When `spark.sql.caseSensitive=false`
(the default), a query like `Nested_Record.Level = 'INFO'` against partition
column `nested_record.level` would fail to bind here. Could you use a
resolver-based lookup, e.g. `partitionFieldNames.indexWhere(name =>
resolve(path, name))`?
<sub><i>- Generated by an AI agent and may contain mistakes. Please verify
any suggestions before applying.</i></sub>
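A minimal sketch of the resolver-based lookup suggested above, in plain Scala. The `resolver` here is only a stand-in for Spark's analyzer resolver (which is case-insensitive when `spark.sql.caseSensitive=false`, the default); the field names are illustrative:

```scala
// Stand-in for Spark's analyzer resolver under the default (case-insensitive)
// setting; the real one is obtained from the session state, not hard-coded.
val resolver: (String, String) => Boolean = _.equalsIgnoreCase(_)

val partitionFieldNames = Seq("nested_record.level", "region")

// Resolver-aware lookup: binds "Nested_Record.Level" to index 0, where the
// exact-match indexOf would return -1 and fail to bind the predicate.
def bindPartitionColumn(path: String): Int =
  partitionFieldNames.indexWhere(name => resolver(path, name))
```

With this, `bindPartitionColumn("Nested_Record.Level")` resolves to index 0 even though `partitionFieldNames.indexOf("Nested_Record.Level")` misses.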
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala:
##########
@@ -269,10 +297,46 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
// the whole table
if (haveProperPartitionValues(partitionPaths.toSeq) &&
partitionSchema.nonEmpty) {
val predicate = partitionPruningPredicates.reduce(expressions.And)
Review Comment:
🤖 I believe `AttributeReference.name` never contains the `#exprId` suffix —
that's only in `toString()`. If so, the `replaceAll("#\\d+$", "")` calls
throughout this PR (here, in `HoodieFileIndex.listFiles`, and in the Prune
rules) are no-ops. Is there a scenario where the name actually contains this
suffix, or could these be removed to avoid confusion?
<sub><i>- Generated by an AI agent and may contain mistakes. Please verify
any suggestions before applying.</i></sub>
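A quick check of the regex in question: `replaceAll("#\\d+$", "")` only strips a literal `#<digits>` suffix. If `AttributeReference.name` never carries that suffix (it shows up in `toString`, e.g. `level#42`, not in the name), the call is indeed a no-op on real attribute names:

```scala
// The regex only matches a trailing "#<digits>" run, so a plain column name
// passes through unchanged while a toString-style suffix is stripped.
def stripExprId(s: String): String = s.replaceAll("#\\d+$", "")
```

For example, `stripExprId("nested_record.level")` returns the input unchanged, while `stripExprId("level#42")` returns `"level"`.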
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -217,20 +254,18 @@ case class HoodieFileIndex(spark: SparkSession,
*
Review Comment:
🤖 Removing the `isPartitionPruned` parameter means the Prune rule's call to
`filterFileSlices(dataFilters, partitionPruningFilters)` now falls through to
the index-lookup branch (col-stats, record-level index) when `dataFilters` is
non-empty, only to discard the result. Previously `isPartitionPruned=true`
short-circuited this. Could you add a comment explaining why the extra work is
acceptable, or consider an alternative to avoid redundant index lookups?
<sub><i>- Generated by an AI agent and may contain mistakes. Please verify
any suggestions before applying.</i></sub>
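A hypothetical sketch of the short-circuit the comment above asks about. The names (`filterFileSlices`, `lookupIndexes`, `isPartitionPruned`) mirror the PR discussion, but the bodies are stand-ins, not the actual Hudi implementation:

```scala
var indexLookups = 0 // counts how often the expensive branch runs

// Stand-in for the col-stats / record-level index probing.
def lookupIndexes(slices: Seq[String], dataFilters: Seq[String]): Seq[String] = {
  indexLookups += 1
  slices
}

def filterFileSlices(dataFilters: Seq[String],
                     partitionFilters: Seq[String],
                     isPartitionPruned: Boolean = false): Seq[String] = {
  val pruned = partitionFilters.map(p => s"slice-of-$p") // stand-in pruning
  // With the flag, a caller that has already pruned partitions skips the
  // index lookup entirely; without it, any non-empty dataFilters pays for it.
  if (isPartitionPruned || dataFilters.isEmpty) pruned
  else lookupIndexes(pruned, dataFilters)
}
```

The flag lets the Prune rule's call path avoid the redundant lookup that the comment describes.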
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala:
##########
@@ -269,10 +297,46 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
// the whole table
if (haveProperPartitionValues(partitionPaths.toSeq) &&
partitionSchema.nonEmpty) {
val predicate = partitionPruningPredicates.reduce(expressions.And)
+ val partitionFieldNames = partitionSchema.fieldNames
+ def getPartitionColumnPath(expr: Expression): Option[String] = expr match {
+ case a: AttributeReference =>
+ Some(a.name.replaceAll("#\\d+$", ""))
+ case GetStructField(child, _, Some(fieldName)) =>
+ getPartitionColumnPath(child).map(_ + "." + fieldName)
+ case _ => None
+ }
val transformedPredicate = predicate.transform {
+ case g @ GetStructField(_, _, Some(_)) =>
+ getPartitionColumnPath(g).flatMap { path =>
+ val idx = partitionFieldNames.indexOf(path)
+ if (idx >= 0) Some(BoundReference(idx, partitionSchema(idx).dataType, nullable = true))
+ else None
+ }.getOrElse(g)
+ // NOTE: Spark's optimizer auto-adds IsNotNull(struct_attr) when a nested field is
+ // filtered (e.g. IsNotNull(nested_record) for filter nested_record.level = 'INFO').
+ // The struct attribute cannot be directly bound to a flat partition value.
+ // Since partition rows are parsed from partition paths they are never null, so
+ // IsNotNull for any expression whose references are all struct-parents of nested
+ // partition columns is always true (and IsNull always false).
+ // We match on the expression's references rather than requiring the child to be a
+ // plain AttributeReference, because the optimizer may wrap it in casts or other nodes.
Review Comment:
🤖 When `IsNotNull(nested_record)` is replaced with `Literal(true)`, this
assumes the `IsNotNull` only exists because Spark auto-added it for the nested
partition column filter. But if the struct `nested_record` also has
non-partition fields and the user explicitly wrote a `WHERE nested_record IS
NOT NULL` predicate for data correctness, this replacement silently drops it
from partition evaluation. Since this is in the partition pruning path (not the
final data filter), it won't cause incorrect results — but I want to confirm:
is the same `IsNotNull` filter also retained in the data filters so it's still
applied at scan time?
<sub><i>- Generated by an AI agent and may contain mistakes. Please verify
any suggestions before applying.</i></sub>
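For reference, the rewrite under discussion can be modeled on a tiny stand-in AST (hypothetical types, not Catalyst): `IsNotNull` over a struct parent of nested partition columns becomes a true literal, since partition rows parsed from paths are never null:

```scala
// Minimal expression model; real code operates on Catalyst Expression nodes.
sealed trait Expr
case class Attr(name: String) extends Expr
case class IsNotNull(child: Expr) extends Expr
case class Lit(value: Boolean) extends Expr

// Struct attributes that are parents of nested partition columns.
val nestedPartitionParents = Set("nested_record")

def rewrite(e: Expr): Expr = e match {
  // Drop the auto-added null check from partition evaluation only.
  case IsNotNull(Attr(n)) if nestedPartitionParents.contains(n) => Lit(true)
  case other => other
}
```

As the comment notes, this is safe for pruning as long as the same `IsNotNull` survives in the data filters applied at scan time.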
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]