This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d9996c67b92 [SPARK-41990][SQL] Use `FieldReference.column` instead of `apply` in V1 to V2 filter conversion
d9996c67b92 is described below

commit d9996c67b92d7a6d591fcbe8a42d87e3d9b8fd79
Author: huaxingao <[email protected]>
AuthorDate: Sat Jan 14 22:52:49 2023 -0800

    [SPARK-41990][SQL] Use `FieldReference.column` instead of `apply` in V1 to V2 filter conversion
    
    ### What changes were proposed in this pull request?
    
    Use `FieldReference.column` instead of `FieldReference.apply` in V1 to V2 filter conversion
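    
    The conversion now first tries `FieldReference.apply` and falls back to
    `FieldReference.column` when the attribute does not parse as a multipart
    identifier. Rough sketch of the resulting behavior (illustrative only;
    `toV2` is `private[sql]` and not user-facing):
    
        import org.apache.spark.sql.sources.EqualTo
    
        // Dotted names still resolve as nested field paths.
        EqualTo("a.b", 1).toV2             // references field path a.b
    
        // Names the parser rejects fall back to a single-part column.
        EqualTo("last name", "smith").toV2 // references column `last name`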
    
    ### Why are the changes needed?
    
    Previously, filtering by a composite field name (for example, a column name containing a space, such as `last name`) did not work.
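    
    `FieldReference.apply` runs its argument through the SQL identifier
    parser, so a name the parser rejects throws a `ParseException`, while
    `FieldReference.column` wraps the raw string as one name part.
    Illustrative sketch of the assumed difference between the two factory
    methods:
    
        import org.apache.spark.sql.connector.expressions.FieldReference
    
        FieldReference("a.b")              // parsed: nested path Seq("a", "b")
        FieldReference.column("a.b")       // unparsed: one column named "a.b"
        FieldReference("last name")        // throws ParseException
        FieldReference.column("last name") // one column named "last name"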
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit test (new test in `JDBCSuite`)
    
    Closes #39564 from huaxingao/field_reference.
    
    Authored-by: huaxingao <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../org/apache/spark/sql/sources/filters.scala     | 36 ++++++++++++++--------
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      | 19 ++++++++++++
 2 files changed, 42 insertions(+), 13 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala
index 66ec4a6c7b9..080d17b47fb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.sources
 
 import org.apache.spark.annotation.{Evolving, Stable}
 import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath
-import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue}
+import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue, NamedReference}
 import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse => V2AlwaysFalse, AlwaysTrue => V2AlwaysTrue, And => V2And, Not => V2Not, Or => V2Or, Predicate}
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.unsafe.types.UTF8String
@@ -74,6 +75,15 @@ sealed abstract class Filter {
    * Converts V1 filter to V2 filter
    */
   private[sql] def toV2: Predicate
+
+  protected def toV2Column(attribute: String): NamedReference = {
+    try {
+      FieldReference(attribute)
+    } catch {
+      case _: ParseException =>
+        FieldReference.column(attribute)
+    }
+  }
 }
 
 /**
@@ -91,7 +101,7 @@ case class EqualTo(attribute: String, value: Any) extends Filter {
   override def toV2: Predicate = {
     val literal = Literal(value)
     new Predicate("=",
-      Array(FieldReference(attribute), LiteralValue(literal.value, literal.dataType)))
+      Array(toV2Column(attribute), LiteralValue(literal.value, literal.dataType)))
   }
 }
 
@@ -111,7 +121,7 @@ case class EqualNullSafe(attribute: String, value: Any) extends Filter {
   override def toV2: Predicate = {
     val literal = Literal(value)
     new Predicate("<=>",
-      Array(FieldReference(attribute), LiteralValue(literal.value, literal.dataType)))
+      Array(toV2Column(attribute), LiteralValue(literal.value, literal.dataType)))
   }
 }
 
@@ -130,7 +140,7 @@ case class GreaterThan(attribute: String, value: Any) extends Filter {
   override def toV2: Predicate = {
     val literal = Literal(value)
     new Predicate(">",
-      Array(FieldReference(attribute), LiteralValue(literal.value, literal.dataType)))
+      Array(toV2Column(attribute), LiteralValue(literal.value, literal.dataType)))
   }
 }
 
@@ -149,7 +159,7 @@ case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter {
   override def toV2: Predicate = {
     val literal = Literal(value)
     new Predicate(">=",
-      Array(FieldReference(attribute), LiteralValue(literal.value, literal.dataType)))
+      Array(toV2Column(attribute), LiteralValue(literal.value, literal.dataType)))
   }
 }
 
@@ -168,7 +178,7 @@ case class LessThan(attribute: String, value: Any) extends Filter {
   override def toV2: Predicate = {
     val literal = Literal(value)
     new Predicate("<",
-      Array(FieldReference(attribute), LiteralValue(literal.value, literal.dataType)))
+      Array(toV2Column(attribute), LiteralValue(literal.value, literal.dataType)))
   }
 }
 
@@ -187,7 +197,7 @@ case class LessThanOrEqual(attribute: String, value: Any) extends Filter {
   override def toV2: Predicate = {
     val literal = Literal(value)
     new Predicate("<=",
-      Array(FieldReference(attribute), LiteralValue(literal.value, literal.dataType)))
+      Array(toV2Column(attribute), LiteralValue(literal.value, literal.dataType)))
   }
 }
 
@@ -230,7 +240,7 @@ case class In(attribute: String, values: Array[Any]) extends Filter {
       val literal = Literal(value)
       LiteralValue(literal.value, literal.dataType)
     }
-    new Predicate("IN", FieldReference(attribute) +: literals)
+    new Predicate("IN", toV2Column(attribute) +: literals)
   }
 }
 
@@ -245,7 +255,7 @@ case class In(attribute: String, values: Array[Any]) extends Filter {
 @Stable
 case class IsNull(attribute: String) extends Filter {
   override def references: Array[String] = Array(attribute)
-  override def toV2: Predicate = new Predicate("IS_NULL", Array(FieldReference(attribute)))
+  override def toV2: Predicate = new Predicate("IS_NULL", Array(toV2Column(attribute)))
 }
 
 /**
@@ -259,7 +269,7 @@ case class IsNull(attribute: String) extends Filter {
 @Stable
 case class IsNotNull(attribute: String) extends Filter {
   override def references: Array[String] = Array(attribute)
-  override def toV2: Predicate = new Predicate("IS_NOT_NULL", Array(FieldReference(attribute)))
+  override def toV2: Predicate = new Predicate("IS_NOT_NULL", Array(toV2Column(attribute)))
 }
 
 /**
@@ -308,7 +318,7 @@ case class Not(child: Filter) extends Filter {
 case class StringStartsWith(attribute: String, value: String) extends Filter {
   override def references: Array[String] = Array(attribute)
   override def toV2: Predicate = new Predicate("STARTS_WITH",
-    Array(FieldReference(attribute), LiteralValue(UTF8String.fromString(value), StringType)))
+    Array(toV2Column(attribute), LiteralValue(UTF8String.fromString(value), StringType)))
 }
 
 /**
@@ -324,7 +334,7 @@ case class StringStartsWith(attribute: String, value: String) extends Filter {
 case class StringEndsWith(attribute: String, value: String) extends Filter {
   override def references: Array[String] = Array(attribute)
   override def toV2: Predicate = new Predicate("ENDS_WITH",
-    Array(FieldReference(attribute), LiteralValue(UTF8String.fromString(value), StringType)))
+    Array(toV2Column(attribute), LiteralValue(UTF8String.fromString(value), StringType)))
 }
 
 /**
@@ -340,7 +350,7 @@ case class StringEndsWith(attribute: String, value: String) extends Filter {
 case class StringContains(attribute: String, value: String) extends Filter {
   override def references: Array[String] = Array(attribute)
   override def toV2: Predicate = new Predicate("CONTAINS",
-    Array(FieldReference(attribute), LiteralValue(UTF8String.fromString(value), StringType)))
+    Array(toV2Column(attribute), LiteralValue(UTF8String.fromString(value), StringType)))
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index b87fee6cec2..3e317dc9547 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -276,6 +276,20 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
       "INSERT INTO test.datetime VALUES ('2018-07-12', '2018-07-12 
09:51:15.0')").executeUpdate()
     conn.commit()
 
+    conn.prepareStatement(
+      "CREATE TABLE test.composite_name (`last name` TEXT(32) NOT NULL, id 
INTEGER NOT NULL)")
+      .executeUpdate()
+    conn.prepareStatement("INSERT INTO test.composite_name VALUES ('smith', 
1)").executeUpdate()
+    conn.prepareStatement("INSERT INTO test.composite_name VALUES ('jones', 
2)").executeUpdate()
+    conn.commit()
+
+    sql(
+      s"""
+        |CREATE OR REPLACE TEMPORARY VIEW composite_name
+        |USING org.apache.spark.sql.jdbc
+        |OPTIONS (url '$url', dbtable 'TEST.COMPOSITE_NAME', user 'testUser', password 'testPass')
+       """.stripMargin.replaceAll("\n", " "))
+
     // Untested: IDENTITY, OTHER, UUID, ARRAY, and GEOMETRY types.
   }
 
@@ -1963,4 +1977,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-41990: Filter with composite name") {
+    val df = sql("SELECT * FROM composite_name WHERE `last name` = 'smith'")
+    assert(df.collect.toSet === Set(Row("smith", 1)))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
