This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new fb6e525  [fix](connector) Fix the issue when Spark pushes down in 
'case when' (#300)
fb6e525 is described below

commit fb6e525fdc34d6fb8c1552ecdf8b41e4a0863685
Author: aoyuEra <huany...@yeah.net>
AuthorDate: Mon Mar 31 22:25:38 2025 +0800

    [fix](connector) Fix the issue when Spark pushes down in 'case when' (#300)
---
 .../apache/doris/spark/sql/DorisReaderITCase.scala | 39 ++++++++++++++++++++++
 .../read/expression/V2ExpressionBuilder.scala      | 15 +++++++++
 .../read/expression/V2ExpressionBuilder.scala      | 15 +++++++++
 .../read/expression/V2ExpressionBuilder.scala      | 15 +++++++++
 4 files changed, 84 insertions(+)

diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
index 07c998b..b5d311f 100644
--- 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
@@ -427,4 +427,43 @@ class DorisReaderITCase(readMode: String, flightSqlPort: 
Int) extends AbstractCo
     assert("List([3])".equals(likeFilter.toList.toString()))
     session.stop()
   }
+
+
+  @Test
+  def buildCaseWhenTest(): Unit = {
+    val sourceInitSql: Array[String] = 
ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql")
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, 
sourceInitSql: _*)
+
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+
+    session.sql(
+      s"""
+         |CREATE TEMPORARY VIEW test_source
+         |USING doris
+         |OPTIONS(
+         | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}",
+         | "fenodes"="${getFenodes}",
+         | "user"="${getDorisUsername}",
+         | "password"="${getDorisPassword}",
+         | "doris.read.mode"="${readMode}",
+         | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+         |)
+         |""".stripMargin)
+
+    val resultData = session.sql(
+      """
+        |select * from (
+        |   select
+        |    id,
+        |    (case when c5 > 10 then c2 else null end) as cc1,
+        |    (case when c4 < 5 then c3 else null end) as cc2
+        |   from test_source where c2 is not null
+        |) where !(cc1 is null and cc2 is null) order by id
+        |""".stripMargin)
+
+    assert("List([1,127,null], [2,null,-32768], 
[3,null,0])".equals(resultData.collect().toList.toString()))
+
+    session.stop()
+
+  }
 }
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
index f7e08b9..cba20f1 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
@@ -57,6 +57,21 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
             case "<=" => s"`${build(e.children()(0))}` <= 
${build(e.children()(1))}"
             case ">" => s"`${build(e.children()(0))}` > 
${build(e.children()(1))}"
             case ">=" => s"`${build(e.children()(0))}` >= 
${build(e.children()(1))}"
+            case "CASE_WHEN" =>
+              val fragment = new StringBuilder("CASE ")
+              val expressions = e.children()
+
+              for(i<- 0 until expressions.size - 1 by 2){
+                fragment.append(s" WHEN ${build(expressions(i))} THEN 
${build(expressions(i+1))} ")
+              }
+
+              if (expressions.length % 2 != 0) {
+                val last = expressions(expressions.length - 1)
+                fragment.append(s" ELSE ${build(last)} ")
+              }
+              fragment.append(" END")
+
+              fragment.mkString
             case _ => null
           }
         }
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
index f13830c..1913a51 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
@@ -55,6 +55,21 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
             case "<=" => s"`${build(e.children()(0))}` <= 
${build(e.children()(1))}"
             case ">" => s"`${build(e.children()(0))}` > 
${build(e.children()(1))}"
             case ">=" => s"`${build(e.children()(0))}` >= 
${build(e.children()(1))}"
+            case "CASE_WHEN" =>
+              val fragment = new StringBuilder("CASE ")
+              val expressions = e.children()
+
+              for(i<- 0 until expressions.size - 1 by 2){
+                fragment.append(s" WHEN ${build(expressions(i))} THEN 
${build(expressions(i+1))} ")
+              }
+
+              if (expressions.length % 2 != 0) {
+                val last = expressions(expressions.length - 1)
+                fragment.append(s" ELSE ${build(last)} ")
+              }
+              fragment.append(" END")
+
+              fragment.mkString
             case _ => null
           }
         }
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
index f13830c..1913a51 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
@@ -55,6 +55,21 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
             case "<=" => s"`${build(e.children()(0))}` <= 
${build(e.children()(1))}"
             case ">" => s"`${build(e.children()(0))}` > 
${build(e.children()(1))}"
             case ">=" => s"`${build(e.children()(0))}` >= 
${build(e.children()(1))}"
+            case "CASE_WHEN" =>
+              val fragment = new StringBuilder("CASE ")
+              val expressions = e.children()
+
+              for(i<- 0 until expressions.size - 1 by 2){
+                fragment.append(s" WHEN ${build(expressions(i))} THEN 
${build(expressions(i+1))} ")
+              }
+
+              if (expressions.length % 2 != 0) {
+                val last = expressions(expressions.length - 1)
+                fragment.append(s" ELSE ${build(last)} ")
+              }
+              fragment.append(" END")
+
+              fragment.mkString
             case _ => null
           }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to