This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new e49a62b  [fix](connector) fix npe caused by prune columns (#273)
e49a62b is described below

commit e49a62b5d36c1e79bfcf4302712d786ffc61d136
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Wed Feb 26 15:46:02 2025 +0800

    [fix](connector) fix npe caused by prune columns (#273)
---
 .../doris/spark/read/DorisScanBuilderBase.scala    | 27 ++++++++++++++++++----
 1 file changed, 22 insertions(+), 5 deletions(-)

diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScanBuilderBase.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScanBuilderBase.scala
index cec9890..55dbb2c 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScanBuilderBase.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScanBuilderBase.scala
@@ -18,18 +18,35 @@
 package org.apache.doris.spark.read
 
 import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
-import org.apache.doris.spark.util.DorisDialects
-import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.connector.read.{ScanBuilder, SupportsPushDownRequiredColumns}
 import org.apache.spark.sql.types.StructType
 
 protected[spark] abstract class DorisScanBuilderBase(config: DorisConfig, schema: StructType) extends ScanBuilder
   with SupportsPushDownRequiredColumns {
 
-  protected var readSchema: StructType = schema
+  protected var readSchema: StructType = {
+    if (config.contains(DorisOptions.DORIS_READ_FIELDS)) {
+      val dorisReadFields = config.getValue(DorisOptions.DORIS_READ_FIELDS).split(",").map(_.trim.replaceAll("`", ""))
+      doPruneColumns(schema, dorisReadFields)
+    } else {
+      schema
+    }
+  }
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
-    readSchema = StructType(requiredSchema.fields.filter(schema.contains(_)))
+    doPruneColumns(readSchema, requiredSchema.fieldNames)
+  }
+
+  private def doPruneColumns(originSchema: StructType, requiredCols: Array[String]): StructType = {
+    if (requiredCols.nonEmpty) {
+      val fields = originSchema.fields.filter(
+        field => requiredCols.contains(field.name)
+      )
+      if (fields.isEmpty) {
+        throw new IllegalArgumentException("No required columns found")
+      }
+      StructType(fields)
+    } else originSchema
   }
 
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org