This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new bba3a62  [Improve](connector) fix V2Expression build error (#339)
bba3a62 is described below

commit bba3a62e5f9c6e86f30f2f4ca4e9c1a572eb2423
Author: wanggx <[email protected]>
AuthorDate: Thu Oct 9 17:41:50 2025 +0800

    [Improve](connector) fix V2Expression build error (#339)
    
    Fix V2Expression build error with detailMessage = Unknown column 'null' in 
'table list'
    ---------
    
    Co-authored-by: wangguoxing <[email protected]>
---
 .../apache/doris/spark/sql/DorisReaderITCase.scala | 73 ++++++++++++++++++++++
 .../container/ddl/read_filter_pushdown.sql         | 29 +++++++++
 .../apache/doris/spark/read/DorisScanBuilder.scala |  2 +-
 .../org/apache/doris/spark/read/DorisScanV2.scala  |  2 +-
 .../read/expression/V2ExpressionBuilder.scala      | 46 +++++++++-----
 .../read/expression/V2ExpressionBuilderTest.scala  | 19 +++++-
 .../org/apache/spark/sql/ExpressionUtil.scala}     | 19 +++---
 .../apache/doris/spark/read/DorisScanBuilder.scala |  2 +-
 .../org/apache/doris/spark/read/DorisScanV2.scala  |  2 +-
 .../read/expression/V2ExpressionBuilder.scala      | 46 +++++++++-----
 .../read/expression/V2ExpressionBuilderTest.scala  | 18 +++++-
 .../org/apache/spark/sql/ExpressionUtil.scala}     | 20 +++---
 .../apache/doris/spark/read/DorisScanBuilder.scala |  2 +-
 .../org/apache/doris/spark/read/DorisScanV2.scala  |  2 +-
 .../read/expression/V2ExpressionBuilder.scala      | 46 +++++++++-----
 .../read/expression/V2ExpressionBuilderTest.scala  | 18 +++++-
 .../org/apache/spark/sql/ExpressionUtil.scala}     | 20 +++---
 17 files changed, 274 insertions(+), 92 deletions(-)

diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
index 5fa9112..2ed6188 100644
--- 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
@@ -56,6 +56,7 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) 
extends AbstractCo
   val TABLE_READ_UTF8_TBL = "tbl_read_utf8_tbl"
   val TABLE_READ_TBL_ALL_TYPES = "tbl_read_tbl_all_types"
   val TABLE_READ_TBL_BIT_MAP = "tbl_read_tbl_bitmap"
+  val TABLE_READ_EXPRESSION_NOTPUSHDOWN = "tbl_expression_notpushdown"
 
   @Before
   def setUp(): Unit = {
@@ -560,4 +561,76 @@ class DorisReaderITCase(readMode: String, flightSqlPort: 
Int) extends AbstractCo
       assert("List([1], [2])".equals(prefixTest.toList.toString()))
     }
   }
+
+  @Test
+  def testExpressionNotPushDown(): Unit = {
+    val sourceInitSql: Array[String] = 
ContainerUtils.parseFileContentSQL("container/ddl/read_filter_pushdown.sql")
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, 
sourceInitSql: _*)
+
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    try {
+      session.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_source
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + 
TABLE_READ_EXPRESSION_NOTPUSHDOWN}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}"
+           |)
+           |""".stripMargin)
+
+      val resultData = session.sql(
+        """
+          |select COALESCE(CAST(A4 AS STRING),'null')
+          |from test_source where COALESCE(CAST(A4 AS STRING),'null') in ('a4')
+          |""".stripMargin)
+
+      println(resultData.collect().toList.toString())
+      assert("List([a4], [a4], [a4], [a4], 
[a4])".equals(resultData.collect().toList.toString()))
+
+      val resultData1 = session.sql(
+        """
+          |select COALESCE(CAST(NAME AS STRING),'null')
+          |from test_source where COALESCE(CAST(NAME AS STRING),'null') in 
('name2')
+          |""".stripMargin)
+
+      assert("List([name2])".equals(resultData1.collect().toList.toString()))
+    } finally {
+      session.stop()
+    }
+  }
+
+  @Test
+  def testDataFrameExpressionNotPushDown(): Unit = {
+    val sourceInitSql: Array[String] = 
ContainerUtils.parseFileContentSQL("container/ddl/read_filter_pushdown.sql")
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, 
sourceInitSql: _*)
+
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    try {
+      val df = session.read
+        .format("doris")
+        .option("doris.fenodes", getFenodes)
+        .option("doris.table.identifier", DATABASE + "." + 
TABLE_READ_EXPRESSION_NOTPUSHDOWN)
+        .option("user", getDorisUsername)
+        .option("password", getDorisPassword)
+        .load()
+
+      import org.apache.spark.sql.functions._
+
+      val resultData = df.select(coalesce(col("A4").cast("string"), 
lit("null")).as("coalesced_A4"))
+        .filter(col("coalesced_A4").isin("a4"))
+
+      println(resultData.collect().toList.toString())
+      assert("List([a4], [a4], [a4], [a4], 
[a4])".equals(resultData.collect().toList.toString()))
+
+      val resultData1 = df.select(coalesce(col("NAME").cast("string"), 
lit("null")).as("coalesced_NAME"))
+        .filter(col("coalesced_NAME").isin("name2"))
+
+      assert("List([name2])".equals(resultData1.collect().toList.toString()))
+    } finally {
+      session.stop()
+    }
+  }
 }
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/read_filter_pushdown.sql
 
b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/read_filter_pushdown.sql
new file mode 100644
index 0000000..a2176de
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/read_filter_pushdown.sql
@@ -0,0 +1,29 @@
+DROP TABLE IF EXISTS tbl_expression_notpushdown;
+CREATE TABLE tbl_expression_notpushdown (
+                                      ID decimal(38,10) NULL,
+                                      NAME varchar(300) NULL,
+                                      AGE decimal(38,10) NULL,
+                                      CREATE_TIME datetime(3) NULL,
+                                      A1 varchar(300) NULL,
+                                      A2 varchar(300) NULL,
+                                      A3 varchar(300) NULL,
+                                      A4 varchar(300) NULL,
+                                      __source_ts_ms bigint NULL,
+                                      __op varchar(10) NULL,
+                                      __table varchar(100) NULL,
+                                      __db varchar(50) NULL,
+                                      __deleted varchar(10) NULL,
+                                      __dt datetime NULL
+) ENGINE=OLAP
+DUPLICATE KEY(ID)
+DISTRIBUTED BY HASH(`ID`) BUCKETS 2
+PROPERTIES (
+"replication_num" = "1",
+"light_schema_change" = "true"
+);
+
+insert into tbl_expression_notpushdown values(1, 'name1', 18, '2021-01-01 
00:00:00.000', 'a1', 'a2', 'a3', 'a4', 1609459200000, 'c', 
'tbl_read_tbl_all_types', 'db_read_tbl_all_types', false, '2021-01-01 
00:00:00');
+insert into tbl_expression_notpushdown values(2, 'name2', 19, '2021-01-01 
00:00:00.000', 'a1', 'a2', 'a3', 'a4', 1609459200000, 'c', 
'tbl_read_tbl_all_types', 'db_read_tbl_all_types', false, '2021-01-01 
00:00:00');
+insert into tbl_expression_notpushdown values(3, 'name3', 20, '2021-01-01 
00:00:00.000', 'a1', 'a2', 'a3', 'a4', 1609459200000, 'c', 
'tbl_read_tbl_all_types', 'db_read_tbl_all_types', false, '2021-01-01 
00:00:00');
+insert into tbl_expression_notpushdown values(4, 'name4', 21, '2021-01-01 
00:00:00.000', 'a1', 'a2', 'a3', 'a4', 1609459200000, 'c', 
'tbl_read_tbl_all_types', 'db_read_tbl_all_types', false, '2021-01-01 
00:00:00');
+insert into tbl_expression_notpushdown values(5, 'name5', 22, '2021-01-01 
00:00:00.000', 'a1', 'a2', 'a3', 'a4', 1609459200000, 'c', 
'tbl_read_tbl_all_types', 'db_read_tbl_all_types', false, '2021-01-01 
00:00:00');
\ No newline at end of file
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
index 61a9e20..26b45f4 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
@@ -37,7 +37,7 @@ class DorisScanBuilder(config: DorisConfig, schema: 
StructType) extends DorisSca
 
   override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] 
= {
     val (pushed, unsupported) = predicates.partition(predicate => {
-      Option(expressionBuilder.build(predicate)).isDefined
+      expressionBuilder.buildOpt(predicate).isDefined
     })
     this.pushDownPredicates = pushed
     unsupported
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala
index 503ad04..5e0104b 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala
@@ -27,7 +27,7 @@ class DorisScanV2(config: DorisConfig, schema: StructType, 
filters: Array[Predic
   override protected def compiledFilters(): Array[String] = {
     val inValueLengthLimit = 
config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT)
     val v2ExpressionBuilder = new V2ExpressionBuilder(inValueLengthLimit)
-    filters.map(e => 
Option[String](v2ExpressionBuilder.build(e))).filter(_.isDefined).map(_.get)
+    filters.map(e => 
v2ExpressionBuilder.buildOpt(e)).filter(_.isDefined).map(_.get)
   }
 
   override protected def getLimit: Int = limit
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
index cba20f1..880784c 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
@@ -17,12 +17,26 @@
 
 package org.apache.doris.spark.read.expression
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, 
AlwaysTrue, And, Not, Or}
 import org.apache.spark.sql.connector.expressions.{Expression, 
GeneralScalarExpression, Literal, NamedReference}
 import org.apache.spark.sql.types.{DateType, TimestampType}
 
-class V2ExpressionBuilder(inValueLengthLimit: Int) {
+import scala.util.{Failure, Success, Try}
+
+class V2ExpressionBuilder(inValueLengthLimit: Int) extends Logging {
+
+  def buildOpt(predicate: Expression): Option[String] = {
+    Try {
+      Some(build(predicate))
+    } match {
+      case Success(value) => value
+      case Failure(exception) =>
+        logWarning(s"Failed to build expression: ${predicate.toString}, and 
not support predicate push down, errMsg is ${exception.getMessage}")
+        None
+    }
+  }
 
   def build(predicate: Expression): String = {
     predicate match {
@@ -39,24 +53,24 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
       case expr: Expression =>
         expr match {
           case literal: Literal[_] => visitLiteral(literal)
-          case namedRef: NamedReference => namedRef.toString
+          case namedRef: NamedReference => s"`${namedRef.toString}`"
           case e: GeneralScalarExpression => e.name() match {
             case "IN" =>
               val expressions = e.children()
               if (expressions.nonEmpty && expressions.length <= 
inValueLengthLimit) {
-                s"""`${build(expressions(0))}` IN (${expressions.slice(1, 
expressions.length).map(build).mkString(",")})"""
-              } else null
-            case "IS_NULL" => s"`${build(e.children()(0))}` IS NULL"
-            case "IS_NOT_NULL" => s"`${build(e.children()(0))}` IS NOT NULL"
+                s"""${build(expressions(0))} IN (${expressions.slice(1, 
expressions.length).map(build).mkString(",")})"""
+              } else throw new IllegalArgumentException(s"exceeding limit of 
IN values: actual size ${expressions.length}, limit size $inValueLengthLimit")
+            case "IS_NULL" => s"${build(e.children()(0))} IS NULL"
+            case "IS_NOT_NULL" => s"${build(e.children()(0))} IS NOT NULL"
             case "STARTS_WITH" => visitStartWith(build(e.children()(0)), 
build(e.children()(1)));
             case "ENDS_WITH" => visitEndWith(build(e.children()(0)), 
build(e.children()(1)));
             case "CONTAINS" => visitContains(build(e.children()(0)), 
build(e.children()(1)));
-            case "=" => s"`${build(e.children()(0))}` = 
${build(e.children()(1))}"
-            case "!=" | "<>" => s"`${build(e.children()(0))}` != 
${build(e.children()(1))}"
-            case "<" => s"`${build(e.children()(0))}` < 
${build(e.children()(1))}"
-            case "<=" => s"`${build(e.children()(0))}` <= 
${build(e.children()(1))}"
-            case ">" => s"`${build(e.children()(0))}` > 
${build(e.children()(1))}"
-            case ">=" => s"`${build(e.children()(0))}` >= 
${build(e.children()(1))}"
+            case "=" => s"${build(e.children()(0))} = 
${build(e.children()(1))}"
+            case "!=" | "<>" => s"${build(e.children()(0))} != 
${build(e.children()(1))}"
+            case "<" => s"${build(e.children()(0))} < 
${build(e.children()(1))}"
+            case "<=" => s"${build(e.children()(0))} <= 
${build(e.children()(1))}"
+            case ">" => s"${build(e.children()(0))} > 
${build(e.children()(1))}"
+            case ">=" => s"${build(e.children()(0))} >= 
${build(e.children()(1))}"
             case "CASE_WHEN" =>
               val fragment = new StringBuilder("CASE ")
               val expressions = e.children()
@@ -72,7 +86,7 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
               fragment.append(" END")
 
               fragment.mkString
-            case _ => null
+            case _ => throw new IllegalArgumentException(s"Unsupported 
expression: ${e.name()}")
           }
         }
     }
@@ -91,17 +105,17 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
 
   def visitStartWith(l: String, r: String): String = {
     val value = r.substring(1, r.length - 1)
-    s"`$l` LIKE '$value%'"
+    s"$l LIKE '$value%'"
   }
 
   def visitEndWith(l: String, r: String): String = {
     val value = r.substring(1, r.length - 1)
-    s"`$l` LIKE '%$value'"
+    s"$l LIKE '%$value'"
   }
 
   def visitContains(l: String, r: String): String = {
     val value = r.substring(1, r.length - 1)
-    s"`$l` LIKE '%$value%'"
+    s"$l LIKE '%$value%'"
   }
 
 }
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3.3/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3.3/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala
index fc29495..6e72689 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3.3/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3.3/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala
@@ -17,6 +17,8 @@ package org.apache.doris.spark.read.expression
 // specific language governing permissions and limitations
 // under the License.
 
+
+import org.apache.spark.sql.ExpressionUtil
 import org.apache.spark.sql.sources._
 import org.junit.jupiter.api.{Assertions, Test}
 
@@ -42,8 +44,23 @@ class V2ExpressionBuilderTest {
     Assertions.assertEquals(builder.build(Or(EqualTo("c14", 17), 
EqualTo("c15", 18)).toV2), "(`c14` = 17 OR `c15` = 18)")
     Assertions.assertEquals(builder.build(AlwaysTrue.toV2), "1=1")
     Assertions.assertEquals(builder.build(AlwaysFalse.toV2), "1=0")
-    Assertions.assertNull(builder.build(In("c19", 
Array(19,20,21,22,23,24,25,26,27,28,29)).toV2))
+    Assertions.assertEquals(builder.build(In("c19", 
Array(19,20,21,22,23,24,25,26)).toV2), "`c19` IN (19,20,21,22,23,24,25,26)")
+    Assertions.assertEquals(builder.build(In("c19", Array("19","20")).toV2), 
"`c19` IN ('19','20')")
+    val inException = 
Assertions.assertThrows(classOf[IllegalArgumentException], () => 
builder.build(In("c19", Array(19,20,21,22,23,24,25,26,27,28,29)).toV2))
+    Assertions.assertEquals(inException.getMessage, "exceeding limit of IN 
values: actual size 12, limit size 10")
+    val exception = Assertions.assertThrows(classOf[IllegalArgumentException], 
() => builder.build(ExpressionUtil.buildCoalesceFilter()))
+    Assertions.assertEquals(exception.getMessage, "Unsupported expression: 
COALESCE")
+
+  }
 
+  @Test
+  def buildOptTest() : Unit = {
+
+    val builder = new V2ExpressionBuilder(10)
+    Assertions.assertEquals(builder.buildOpt(EqualTo("c0", 1).toV2), 
Some("`c0` = 1"))
+    Assertions.assertEquals(builder.buildOpt(Not(EqualTo("c1", 2)).toV2), 
Some("`c1` != 2"))
+    Assertions.assertEquals(builder.buildOpt(GreaterThan("c2", 3.4).toV2), 
Some("`c2` > 3.4"))
+    
Assertions.assertEquals(builder.buildOpt(ExpressionUtil.buildCoalesceFilter()), 
None)
   }
 
 }
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3.3/src/test/scala/org/apache/spark/sql/ExpressionUtil.scala
similarity index 53%
copy from 
spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala
copy to 
spark-doris-connector/spark-doris-connector-spark-3.3/src/test/scala/org/apache/spark/sql/ExpressionUtil.scala
index 503ad04..84b8beb 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3.3/src/test/scala/org/apache/spark/sql/ExpressionUtil.scala
@@ -15,20 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.spark.read
+package org.apache.spark.sql
 
-import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
-import org.apache.doris.spark.read.expression.V2ExpressionBuilder
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.connector.expressions.filter.Predicate
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, 
GeneralScalarExpression, LiteralValue}
+import org.apache.spark.sql.types.StringType
 
-class DorisScanV2(config: DorisConfig, schema: StructType, filters: 
Array[Predicate], limit: Int) extends AbstractDorisScan(config, schema) with 
Logging {
-  override protected def compiledFilters(): Array[String] = {
-    val inValueLengthLimit = 
config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT)
-    val v2ExpressionBuilder = new V2ExpressionBuilder(inValueLengthLimit)
-    filters.map(e => 
Option[String](v2ExpressionBuilder.build(e))).filter(_.isDefined).map(_.get)
+object ExpressionUtil {
+
+  def buildCoalesceFilter(): Expression = {
+    val gse = new GeneralScalarExpression("COALESCE", 
Array(FieldReference(Seq("A4")), LiteralValue("null", StringType)))
+    new Predicate("=", Array(gse, LiteralValue("1", StringType)))
   }
 
-  override protected def getLimit: Int = limit
 }
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
index 61a9e20..26b45f4 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
@@ -37,7 +37,7 @@ class DorisScanBuilder(config: DorisConfig, schema: 
StructType) extends DorisSca
 
   override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] 
= {
     val (pushed, unsupported) = predicates.partition(predicate => {
-      Option(expressionBuilder.build(predicate)).isDefined
+      expressionBuilder.buildOpt(predicate).isDefined
     })
     this.pushDownPredicates = pushed
     unsupported
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala
index 503ad04..5e0104b 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala
@@ -27,7 +27,7 @@ class DorisScanV2(config: DorisConfig, schema: StructType, 
filters: Array[Predic
   override protected def compiledFilters(): Array[String] = {
     val inValueLengthLimit = 
config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT)
     val v2ExpressionBuilder = new V2ExpressionBuilder(inValueLengthLimit)
-    filters.map(e => 
Option[String](v2ExpressionBuilder.build(e))).filter(_.isDefined).map(_.get)
+    filters.map(e => 
v2ExpressionBuilder.buildOpt(e)).filter(_.isDefined).map(_.get)
   }
 
   override protected def getLimit: Int = limit
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
index cba20f1..880784c 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
@@ -17,12 +17,26 @@
 
 package org.apache.doris.spark.read.expression
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, 
AlwaysTrue, And, Not, Or}
 import org.apache.spark.sql.connector.expressions.{Expression, 
GeneralScalarExpression, Literal, NamedReference}
 import org.apache.spark.sql.types.{DateType, TimestampType}
 
-class V2ExpressionBuilder(inValueLengthLimit: Int) {
+import scala.util.{Failure, Success, Try}
+
+class V2ExpressionBuilder(inValueLengthLimit: Int) extends Logging {
+
+  def buildOpt(predicate: Expression): Option[String] = {
+    Try {
+      Some(build(predicate))
+    } match {
+      case Success(value) => value
+      case Failure(exception) =>
+        logWarning(s"Failed to build expression: ${predicate.toString}, and 
not support predicate push down, errMsg is ${exception.getMessage}")
+        None
+    }
+  }
 
   def build(predicate: Expression): String = {
     predicate match {
@@ -39,24 +53,24 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
       case expr: Expression =>
         expr match {
           case literal: Literal[_] => visitLiteral(literal)
-          case namedRef: NamedReference => namedRef.toString
+          case namedRef: NamedReference => s"`${namedRef.toString}`"
           case e: GeneralScalarExpression => e.name() match {
             case "IN" =>
               val expressions = e.children()
               if (expressions.nonEmpty && expressions.length <= 
inValueLengthLimit) {
-                s"""`${build(expressions(0))}` IN (${expressions.slice(1, 
expressions.length).map(build).mkString(",")})"""
-              } else null
-            case "IS_NULL" => s"`${build(e.children()(0))}` IS NULL"
-            case "IS_NOT_NULL" => s"`${build(e.children()(0))}` IS NOT NULL"
+                s"""${build(expressions(0))} IN (${expressions.slice(1, 
expressions.length).map(build).mkString(",")})"""
+              } else throw new IllegalArgumentException(s"exceeding limit of 
IN values: actual size ${expressions.length}, limit size $inValueLengthLimit")
+            case "IS_NULL" => s"${build(e.children()(0))} IS NULL"
+            case "IS_NOT_NULL" => s"${build(e.children()(0))} IS NOT NULL"
             case "STARTS_WITH" => visitStartWith(build(e.children()(0)), 
build(e.children()(1)));
             case "ENDS_WITH" => visitEndWith(build(e.children()(0)), 
build(e.children()(1)));
             case "CONTAINS" => visitContains(build(e.children()(0)), 
build(e.children()(1)));
-            case "=" => s"`${build(e.children()(0))}` = 
${build(e.children()(1))}"
-            case "!=" | "<>" => s"`${build(e.children()(0))}` != 
${build(e.children()(1))}"
-            case "<" => s"`${build(e.children()(0))}` < 
${build(e.children()(1))}"
-            case "<=" => s"`${build(e.children()(0))}` <= 
${build(e.children()(1))}"
-            case ">" => s"`${build(e.children()(0))}` > 
${build(e.children()(1))}"
-            case ">=" => s"`${build(e.children()(0))}` >= 
${build(e.children()(1))}"
+            case "=" => s"${build(e.children()(0))} = 
${build(e.children()(1))}"
+            case "!=" | "<>" => s"${build(e.children()(0))} != 
${build(e.children()(1))}"
+            case "<" => s"${build(e.children()(0))} < 
${build(e.children()(1))}"
+            case "<=" => s"${build(e.children()(0))} <= 
${build(e.children()(1))}"
+            case ">" => s"${build(e.children()(0))} > 
${build(e.children()(1))}"
+            case ">=" => s"${build(e.children()(0))} >= 
${build(e.children()(1))}"
             case "CASE_WHEN" =>
               val fragment = new StringBuilder("CASE ")
               val expressions = e.children()
@@ -72,7 +86,7 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
               fragment.append(" END")
 
               fragment.mkString
-            case _ => null
+            case _ => throw new IllegalArgumentException(s"Unsupported 
expression: ${e.name()}")
           }
         }
     }
@@ -91,17 +105,17 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
 
   def visitStartWith(l: String, r: String): String = {
     val value = r.substring(1, r.length - 1)
-    s"`$l` LIKE '$value%'"
+    s"$l LIKE '$value%'"
   }
 
   def visitEndWith(l: String, r: String): String = {
     val value = r.substring(1, r.length - 1)
-    s"`$l` LIKE '%$value'"
+    s"$l LIKE '%$value'"
   }
 
   def visitContains(l: String, r: String): String = {
     val value = r.substring(1, r.length - 1)
-    s"`$l` LIKE '%$value%'"
+    s"$l LIKE '%$value%'"
   }
 
 }
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3.4/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3.4/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala
index fc29495..7691b29 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3.4/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3.4/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala
@@ -17,6 +17,7 @@ package org.apache.doris.spark.read.expression
 // specific language governing permissions and limitations
 // under the License.
 
+import org.apache.spark.sql.ExpressionUtil
 import org.apache.spark.sql.sources._
 import org.junit.jupiter.api.{Assertions, Test}
 
@@ -42,8 +43,23 @@ class V2ExpressionBuilderTest {
     Assertions.assertEquals(builder.build(Or(EqualTo("c14", 17), 
EqualTo("c15", 18)).toV2), "(`c14` = 17 OR `c15` = 18)")
     Assertions.assertEquals(builder.build(AlwaysTrue.toV2), "1=1")
     Assertions.assertEquals(builder.build(AlwaysFalse.toV2), "1=0")
-    Assertions.assertNull(builder.build(In("c19", 
Array(19,20,21,22,23,24,25,26,27,28,29)).toV2))
+    Assertions.assertEquals(builder.build(In("c19", 
Array(19,20,21,22,23,24,25,26)).toV2), "`c19` IN (19,20,21,22,23,24,25,26)")
+    Assertions.assertEquals(builder.build(In("c19", Array("19","20")).toV2), 
"`c19` IN ('19','20')")
+    val inException = 
Assertions.assertThrows(classOf[IllegalArgumentException], () => 
builder.build(In("c19", Array(19,20,21,22,23,24,25,26,27,28,29)).toV2))
+    Assertions.assertEquals(inException.getMessage, "exceeding limit of IN 
values: actual size 12, limit size 10")
+    val exception = Assertions.assertThrows(classOf[IllegalArgumentException], 
() => builder.build(ExpressionUtil.buildCoalesceFilter()))
+    Assertions.assertEquals(exception.getMessage, "Unsupported expression: 
COALESCE")
 
   }
 
+  @Test
+  def buildOptTest() : Unit = {
+
+    val builder = new V2ExpressionBuilder(10)
+    Assertions.assertEquals(builder.buildOpt(EqualTo("c0", 1).toV2), 
Some("`c0` = 1"))
+    Assertions.assertEquals(builder.buildOpt(Not(EqualTo("c1", 2)).toV2), 
Some("`c1` != 2"))
+    Assertions.assertEquals(builder.buildOpt(GreaterThan("c2", 3.4).toV2), 
Some("`c2` > 3.4"))
+    
Assertions.assertEquals(builder.buildOpt(ExpressionUtil.buildCoalesceFilter()), 
None)
+  }
+
 }
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3.4/src/test/scala/org/apache/spark/sql/ExpressionUtil.scala
similarity index 52%
copy from 
spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala
copy to 
spark-doris-connector/spark-doris-connector-spark-3.4/src/test/scala/org/apache/spark/sql/ExpressionUtil.scala
index 503ad04..2da931a 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3.4/src/test/scala/org/apache/spark/sql/ExpressionUtil.scala
@@ -15,20 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.spark.read
+package org.apache.spark.sql
 
-import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
-import org.apache.doris.spark.read.expression.V2ExpressionBuilder
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.connector.expressions.filter.Predicate
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, 
GeneralScalarExpression, LiteralValue}
+import org.apache.spark.sql.types.StringType
 
-class DorisScanV2(config: DorisConfig, schema: StructType, filters: 
Array[Predicate], limit: Int) extends AbstractDorisScan(config, schema) with 
Logging {
-  override protected def compiledFilters(): Array[String] = {
-    val inValueLengthLimit = 
config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT)
-    val v2ExpressionBuilder = new V2ExpressionBuilder(inValueLengthLimit)
-    filters.map(e => 
Option[String](v2ExpressionBuilder.build(e))).filter(_.isDefined).map(_.get)
-  }
+object ExpressionUtil {
 
-  override protected def getLimit: Int = limit
+  def buildCoalesceFilter(): Expression = {
+    val gse = new GeneralScalarExpression("COALESCE", 
Array(FieldReference(Seq("A4")), LiteralValue("null", StringType)))
+    new Predicate("=", Array(gse, LiteralValue("1", StringType)))
+  }
 }
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
index 61a9e20..26b45f4 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
@@ -37,7 +37,7 @@ class DorisScanBuilder(config: DorisConfig, schema: 
StructType) extends DorisSca
 
   override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] 
= {
     val (pushed, unsupported) = predicates.partition(predicate => {
-      Option(expressionBuilder.build(predicate)).isDefined
+      expressionBuilder.buildOpt(predicate).isDefined
     })
     this.pushDownPredicates = pushed
     unsupported
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala
index 503ad04..5e0104b 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala
@@ -27,7 +27,7 @@ class DorisScanV2(config: DorisConfig, schema: StructType, 
filters: Array[Predic
   override protected def compiledFilters(): Array[String] = {
     val inValueLengthLimit = 
config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT)
     val v2ExpressionBuilder = new V2ExpressionBuilder(inValueLengthLimit)
-    filters.map(e => 
Option[String](v2ExpressionBuilder.build(e))).filter(_.isDefined).map(_.get)
+    filters.map(e => 
v2ExpressionBuilder.buildOpt(e)).filter(_.isDefined).map(_.get)
   }
 
   override protected def getLimit: Int = limit
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
index 6e1c010..9a86a69 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
@@ -17,12 +17,26 @@
 
 package org.apache.doris.spark.read.expression
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, 
AlwaysTrue, And, Not, Or}
 import org.apache.spark.sql.connector.expressions.{Expression, 
GeneralScalarExpression, Literal, NamedReference}
 import org.apache.spark.sql.types.{DateType, TimestampType}
 
-class V2ExpressionBuilder(inValueLengthLimit: Int) {
+import scala.util.{Failure, Success, Try}
+
+class V2ExpressionBuilder(inValueLengthLimit: Int) extends Logging {
+
+  def buildOpt(predicate: Expression): Option[String] = {
+    Try {
+      Some(build(predicate))
+    } match {
+      case Success(value) => value
+      case Failure(exception) =>
+        logWarning(s"Failed to build expression: ${predicate.toString}, and 
not support predicate push down, errMsg is ${exception.getMessage}")
+        None
+    }
+  }
 
   def build(predicate: Expression): String = {
     predicate match {
@@ -39,24 +53,24 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
       case expr: Expression =>
         expr match {
           case literal: Literal[_] => visitLiteral(literal)
-          case namedRef: NamedReference => namedRef.toString
+          case namedRef: NamedReference => s"`${namedRef.toString}`"
           case e: GeneralScalarExpression => e.name() match {
             case "IN" =>
               val expressions = e.children()
               if (expressions.nonEmpty && expressions.length <= 
inValueLengthLimit) {
-                s"""`${build(expressions(0))}` IN (${expressions.slice(1, 
expressions.length).map(build).mkString(",")})"""
-              } else null
-            case "IS_NULL" => s"`${build(e.children()(0))}` IS NULL"
-            case "IS_NOT_NULL" => s"`${build(e.children()(0))}` IS NOT NULL"
+                s"""${build(expressions(0))} IN (${expressions.slice(1, 
expressions.length).map(build).mkString(",")})"""
+              } else throw new IllegalArgumentException(s"exceeding limit of 
IN values: actual size ${expressions.length}, limit size $inValueLengthLimit")
+            case "IS_NULL" => s"${build(e.children()(0))} IS NULL"
+            case "IS_NOT_NULL" => s"${build(e.children()(0))} IS NOT NULL"
             case "STARTS_WITH" => visitStartWith(build(e.children()(0)), 
build(e.children()(1)));
             case "ENDS_WITH" => visitEndWith(build(e.children()(0)), 
build(e.children()(1)));
             case "CONTAINS" => visitContains(build(e.children()(0)), 
build(e.children()(1)));
-            case "=" => s"`${build(e.children()(0))}` = 
${build(e.children()(1))}"
-            case "!=" | "<>" => s"`${build(e.children()(0))}` != 
${build(e.children()(1))}"
-            case "<" => s"`${build(e.children()(0))}` < 
${build(e.children()(1))}"
-            case "<=" => s"`${build(e.children()(0))}` <= 
${build(e.children()(1))}"
-            case ">" => s"`${build(e.children()(0))}` > 
${build(e.children()(1))}"
-            case ">=" => s"`${build(e.children()(0))}` >= 
${build(e.children()(1))}"
+            case "=" => s"${build(e.children()(0))} = 
${build(e.children()(1))}"
+            case "!=" | "<>" => s"${build(e.children()(0))} != 
${build(e.children()(1))}"
+            case "<" => s"${build(e.children()(0))} < 
${build(e.children()(1))}"
+            case "<=" => s"${build(e.children()(0))} <= 
${build(e.children()(1))}"
+            case ">" => s"${build(e.children()(0))} > 
${build(e.children()(1))}"
+            case ">=" => s"${build(e.children()(0))} >= 
${build(e.children()(1))}"
             case "CASE_WHEN" =>
               val fragment = new StringBuilder("CASE ")
               val expressions = e.children()
@@ -72,7 +86,7 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
               fragment.append(" END")
 
               fragment.mkString
-            case _ => null
+            case _ => throw new IllegalArgumentException(s"Unsupported 
expression: ${e.name()}")
           }
         }
     }
@@ -90,17 +104,17 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
   }
   def visitStartWith(l: String, r: String): String = {
     val value = r.substring(1, r.length - 1)
-    s"`$l` LIKE '$value%'"
+    s"$l LIKE '$value%'"
   }
 
   def visitEndWith(l: String, r: String): String = {
     val value = r.substring(1, r.length - 1)
-    s"`$l` LIKE '%$value'"
+    s"$l LIKE '%$value'"
   }
 
   def visitContains(l: String, r: String): String = {
     val value = r.substring(1, r.length - 1)
-    s"`$l` LIKE '%$value%'"
+    s"$l LIKE '%$value%'"
   }
 
 }
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3.5/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3.5/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala
index fc29495..7691b29 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3.5/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3.5/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala
@@ -17,6 +17,7 @@ package org.apache.doris.spark.read.expression
 // specific language governing permissions and limitations
 // under the License.
 
+import org.apache.spark.sql.ExpressionUtil
 import org.apache.spark.sql.sources._
 import org.junit.jupiter.api.{Assertions, Test}
 
@@ -42,8 +43,23 @@ class V2ExpressionBuilderTest {
     Assertions.assertEquals(builder.build(Or(EqualTo("c14", 17), 
EqualTo("c15", 18)).toV2), "(`c14` = 17 OR `c15` = 18)")
     Assertions.assertEquals(builder.build(AlwaysTrue.toV2), "1=1")
     Assertions.assertEquals(builder.build(AlwaysFalse.toV2), "1=0")
-    Assertions.assertNull(builder.build(In("c19", 
Array(19,20,21,22,23,24,25,26,27,28,29)).toV2))
+    Assertions.assertEquals(builder.build(In("c19", 
Array(19,20,21,22,23,24,25,26)).toV2), "`c19` IN (19,20,21,22,23,24,25,26)")
+    Assertions.assertEquals(builder.build(In("c19", Array("19","20")).toV2), 
"`c19` IN ('19','20')")
+    val inException = 
Assertions.assertThrows(classOf[IllegalArgumentException], () => 
builder.build(In("c19", Array(19,20,21,22,23,24,25,26,27,28,29)).toV2))
+    Assertions.assertEquals(inException.getMessage, "exceeding limit of IN 
values: actual size 12, limit size 10")
+    val exception = Assertions.assertThrows(classOf[IllegalArgumentException], 
() => builder.build(ExpressionUtil.buildCoalesceFilter()))
+    Assertions.assertEquals(exception.getMessage, "Unsupported expression: 
COALESCE")
 
   }
 
+  @Test
+  def buildOptTest() : Unit = {
+
+    val builder = new V2ExpressionBuilder(10)
+    Assertions.assertEquals(builder.buildOpt(EqualTo("c0", 1).toV2), 
Some("`c0` = 1"))
+    Assertions.assertEquals(builder.buildOpt(Not(EqualTo("c1", 2)).toV2), 
Some("`c1` != 2"))
+    Assertions.assertEquals(builder.buildOpt(GreaterThan("c2", 3.4).toV2), 
Some("`c2` > 3.4"))
+    
Assertions.assertEquals(builder.buildOpt(ExpressionUtil.buildCoalesceFilter()), 
None)
+  }
+
 }
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3.5/src/test/scala/org/apache/spark/sql/ExpressionUtil.scala
similarity index 52%
copy from 
spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala
copy to 
spark-doris-connector/spark-doris-connector-spark-3.5/src/test/scala/org/apache/spark/sql/ExpressionUtil.scala
index 503ad04..2da931a 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3.5/src/test/scala/org/apache/spark/sql/ExpressionUtil.scala
@@ -15,20 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.spark.read
+package org.apache.spark.sql
 
-import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
-import org.apache.doris.spark.read.expression.V2ExpressionBuilder
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.connector.expressions.filter.Predicate
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, 
GeneralScalarExpression, LiteralValue}
+import org.apache.spark.sql.types.StringType
 
-class DorisScanV2(config: DorisConfig, schema: StructType, filters: 
Array[Predicate], limit: Int) extends AbstractDorisScan(config, schema) with 
Logging {
-  override protected def compiledFilters(): Array[String] = {
-    val inValueLengthLimit = 
config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT)
-    val v2ExpressionBuilder = new V2ExpressionBuilder(inValueLengthLimit)
-    filters.map(e => 
Option[String](v2ExpressionBuilder.build(e))).filter(_.isDefined).map(_.get)
-  }
+object ExpressionUtil {
 
-  override protected def getLimit: Int = limit
+  def buildCoalesceFilter(): Expression = {
+    val gse = new GeneralScalarExpression("COALESCE", 
Array(FieldReference(Seq("A4")), LiteralValue("null", StringType)))
+    new Predicate("=", Array(gse, LiteralValue("1", StringType)))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to