This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new e49a62b  [fix](connector) fix npe caused by prune columns  (#273)
e49a62b is described below

commit e49a62b5d36c1e79bfcf4302712d786ffc61d136
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Wed Feb 26 15:46:02 2025 +0800

    [fix](connector) fix npe caused by prune columns  (#273)
---
 .../doris/spark/read/DorisScanBuilderBase.scala    | 46 +++++++++++++++++++---
 1 file changed, 41 insertions(+), 5 deletions(-)

diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScanBuilderBase.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScanBuilderBase.scala
index cec9890..55dbb2c 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScanBuilderBase.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScanBuilderBase.scala
@@ -18,18 +18,54 @@
 package org.apache.doris.spark.read
 
 import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
-import org.apache.doris.spark.util.DorisDialects
-import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.connector.read.{ScanBuilder, SupportsPushDownRequiredColumns}
 import org.apache.spark.sql.types.StructType
 
 protected[spark] abstract class DorisScanBuilderBase(config: DorisConfig, schema: StructType) extends ScanBuilder
   with SupportsPushDownRequiredColumns {
 
-  protected var readSchema: StructType = schema
+  /**
+   * Columns to read. Starts as `schema`, restricted to the comma-separated names in
+   * `DORIS_READ_FIELDS` when that option is set (surrounding whitespace and backticks are
+   * stripped from each name), and is narrowed further by [[pruneColumns]].
+   */
+  protected var readSchema: StructType = {
+    if (config.contains(DorisOptions.DORIS_READ_FIELDS)) {
+      val dorisReadFields = config.getValue(DorisOptions.DORIS_READ_FIELDS).split(",").map(_.trim.replaceAll("`", ""))
+      doPruneColumns(schema, dorisReadFields)
+    } else {
+      schema
+    }
+  }
 
+  /**
+   * Narrows [[readSchema]] to the columns Spark requires. An empty `requiredSchema` leaves
+   * it unchanged.
+   */
   override def pruneColumns(requiredSchema: StructType): Unit = {
-    readSchema = StructType(requiredSchema.fields.filter(schema.contains(_)))
+    // Store the pruned schema; discarding the result would make pruning a no-op.
+    readSchema = doPruneColumns(readSchema, requiredSchema.fieldNames)
+  }
+
+  /**
+   * Keeps the fields of `originSchema` whose names appear in `requiredCols`, in
+   * `originSchema` order. Names are compared exactly (case-sensitive).
+   *
+   * @param originSchema schema to prune
+   * @param requiredCols names of the fields to keep; empty means keep every field
+   * @return the pruned schema, or `originSchema` itself when `requiredCols` is empty
+   * @throws IllegalArgumentException if no field of `originSchema` is named in `requiredCols`
+   */
+  private def doPruneColumns(originSchema: StructType, requiredCols: Array[String]): StructType = {
+    if (requiredCols.nonEmpty) {
+      val fields = originSchema.fields.filter(
+        field => requiredCols.contains(field.name)
+      )
+      if (fields.isEmpty) {
+        throw new IllegalArgumentException("No required columns found")
+      }
+      StructType(fields)
+    } else originSchema
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to