This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new fb6e525 [fix](connector) Fix the issue when Spark pushes down in 'case when' (#300) fb6e525 is described below commit fb6e525fdc34d6fb8c1552ecdf8b41e4a0863685 Author: aoyuEra <huany...@yeah.net> AuthorDate: Mon Mar 31 22:25:38 2025 +0800 [fix](connector) Fix the issue when Spark pushes down in 'case when' (#300) --- .../apache/doris/spark/sql/DorisReaderITCase.scala | 39 ++++++++++++++++++++++ .../read/expression/V2ExpressionBuilder.scala | 15 +++++++++ .../read/expression/V2ExpressionBuilder.scala | 15 +++++++++ .../read/expression/V2ExpressionBuilder.scala | 15 +++++++++ 4 files changed, 84 insertions(+) diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala index 07c998b..b5d311f 100644 --- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala @@ -427,4 +427,43 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo assert("List([3])".equals(likeFilter.toList.toString())) session.stop() } + + + @Test + def buildCaseWhenTest(): Unit = { + val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql") + ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*) + + val session = SparkSession.builder().master("local[*]").getOrCreate() + + session.sql( + s""" + |CREATE TEMPORARY VIEW test_source + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.read.mode"="${readMode}", + | "doris.read.arrow-flight-sql.port"="${flightSqlPort}" + |) + |""".stripMargin) + + val resultData = session.sql( + """ + |select * from ( + | select + | id, + | (case when c5 > 10 then c2 else null end) as cc1, + | (case when c4 < 5 then c3 else null end) as cc2 + | from test_source where c2 is not null + |) where !(cc1 is null and cc2 is null) order by id + |""".stripMargin) + + assert("List([1,127,null], [2,null,-32768], [3,null,0])".equals(resultData.collect().toList.toString())) + + session.stop() + + } } diff --git a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala index f7e08b9..cba20f1 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala +++ b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala @@ -57,6 +57,21 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) { case "<=" => s"`${build(e.children()(0))}` <= ${build(e.children()(1))}" case ">" => s"`${build(e.children()(0))}` > ${build(e.children()(1))}" case ">=" => s"`${build(e.children()(0))}` >= ${build(e.children()(1))}" + case "CASE_WHEN" => + val fragment = new StringBuilder("CASE ") + val expressions = e.children() + + for(i<- 0 until expressions.size - 1 by 2){ + fragment.append(s" WHEN ${build(expressions(i))} THEN ${build(expressions(i+1))} ") + } + + if (expressions.length % 2 != 0) { + val last = expressions(expressions.length - 1) + fragment.append(s" ELSE ${build(last)} ") + } + fragment.append(" END") + + fragment.mkString case _ => null } } diff --git a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala index f13830c..1913a51 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala +++ b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala @@ -55,6 +55,21 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) { case "<=" => s"`${build(e.children()(0))}` <= ${build(e.children()(1))}" case ">" => s"`${build(e.children()(0))}` > ${build(e.children()(1))}" case ">=" => s"`${build(e.children()(0))}` >= ${build(e.children()(1))}" + case "CASE_WHEN" => + val fragment = new StringBuilder("CASE ") + val expressions = e.children() + + for(i<- 0 until expressions.size - 1 by 2){ + fragment.append(s" WHEN ${build(expressions(i))} THEN ${build(expressions(i+1))} ") + } + + if (expressions.length % 2 != 0) { + val last = expressions(expressions.length - 1) + fragment.append(s" ELSE ${build(last)} ") + } + fragment.append(" END") + + fragment.mkString case _ => null } } diff --git a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala index f13830c..1913a51 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala +++ b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala @@ -55,6 +55,21 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) { case "<=" => s"`${build(e.children()(0))}` <= ${build(e.children()(1))}" case ">" => s"`${build(e.children()(0))}` > ${build(e.children()(1))}" case ">=" => s"`${build(e.children()(0))}` >= ${build(e.children()(1))}" + case "CASE_WHEN" => + val fragment = new StringBuilder("CASE ") + val expressions = e.children() + + for(i<- 0 until expressions.size - 1 by 2){ + fragment.append(s" WHEN ${build(expressions(i))} THEN ${build(expressions(i+1))} ") + } + + if (expressions.length % 2 != 0) { + val last = expressions(expressions.length - 1) + fragment.append(s" ELSE ${build(last)} ") + } + fragment.append(" END") + + fragment.mkString case _ => null } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org