This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new a760509  [Improve] Fix column projection issue in Spark 3.3, 3.4, and 3.5 (#353)
a760509 is described below

commit a7605093e39d8e28198f66761fa8cdfdf0a22d71
Author: wudi <[email protected]>
AuthorDate: Wed Mar 25 16:51:38 2026 +0800

    [Improve] Fix column projection issue in Spark 3.3, 3.4, and 3.5 (#353)
    
    * Add log for spark connector
    
    * Fix column projection issue in Spark 3.3, 3.4, and 3.5
---
 .../apache/doris/spark/client/DorisBackendThriftClient.java    |  2 +-
 .../apache/doris/spark/client/read/AbstractThriftReader.java   | 10 ++++++++++
 .../scala/org/apache/doris/spark/read/DorisScanBuilder.scala   |  2 +-
 .../scala/org/apache/doris/spark/read/DorisScanBuilder.scala   |  2 +-
 .../scala/org/apache/doris/spark/read/DorisScanBuilder.scala   |  2 +-
 5 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendThriftClient.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendThriftClient.java
index 9343d84..01a2a81 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendThriftClient.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendThriftClient.java
@@ -234,7 +234,7 @@ public class DorisBackendThriftClient {
                 logger.warn("Close scanner from {} failed.", backend, e);
             }
         }
-        logger.info("CloseScanner to Doris BE '{}' success.", backend);
+        logger.info("CloseScanner to Doris BE '{}' success or contextId {} .", backend, closeParams.getContextId());
         close();
     }
 }
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
index 373910c..76ea2a9 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
@@ -132,6 +132,11 @@ public abstract class AbstractThriftReader extends DorisReader {
                 offset += rowBatch.getReadRowCount();
                 rowBatch.close();
                 rowBatchQueue.put(rowBatch);
+            } else {
+                logger.info(
+                        "Async scan finished , tablets: {}, offset: {}",
+                        Arrays.toString(partition.getTablets()),
+                        offset);
             }
         }
     }
@@ -183,6 +188,11 @@ public abstract class AbstractThriftReader extends DorisReader {
                 endOfStream.set(nextResult.isEos());
                 if (!endOfStream.get()) {
                     rowBatch = new RowBatch(nextResult, dorisSchema, datetimeJava8ApiEnabled);
+                } else {
+                    logger.info(
+                            "Scan finished, tablets: {}, offset: {}",
+                            Arrays.toString(partition.getTablets()),
+                            offset);
                 }
             }
             hasNext = !endOfStream.get();
diff --git a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
index 26b45f4..6bdd256 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
@@ -33,7 +33,7 @@ class DorisScanBuilder(config: DorisConfig, schema: StructType) extends DorisSca
 
   private var limitSize: Int = -1
 
-  override def build(): Scan = new DorisScanV2(config, schema, pushDownPredicates, limitSize)
+  override def build(): Scan = new DorisScanV2(config, readSchema, pushDownPredicates, limitSize)
 
   override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
     val (pushed, unsupported) = predicates.partition(predicate => {
diff --git a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
index 26b45f4..6bdd256 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
@@ -33,7 +33,7 @@ class DorisScanBuilder(config: DorisConfig, schema: StructType) extends DorisSca
 
   private var limitSize: Int = -1
 
-  override def build(): Scan = new DorisScanV2(config, schema, pushDownPredicates, limitSize)
+  override def build(): Scan = new DorisScanV2(config, readSchema, pushDownPredicates, limitSize)
 
   override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
     val (pushed, unsupported) = predicates.partition(predicate => {
diff --git a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
index 26b45f4..6bdd256 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
@@ -33,7 +33,7 @@ class DorisScanBuilder(config: DorisConfig, schema: StructType) extends DorisSca
 
   private var limitSize: Int = -1
 
-  override def build(): Scan = new DorisScanV2(config, schema, pushDownPredicates, limitSize)
+  override def build(): Scan = new DorisScanV2(config, readSchema, pushDownPredicates, limitSize)
 
   override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
     val (pushed, unsupported) = predicates.partition(predicate => {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to