This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e60b5d  [feature](connector) support overwrite for datasource v2 
(#281)
7e60b5d is described below

commit 7e60b5d3e97c5e09751dcfe59acb3ca9008cae0d
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Mon Mar 17 20:36:49 2025 +0800

    [feature](connector) support overwrite for datasource v2 (#281)
---
 spark-doris-connector/pom.xml                          |  2 +-
 .../apache/doris/spark/client/DorisFrontendClient.java | 12 ++++++++++++
 .../apache/doris/spark/catalog/DorisTableBase.scala    |  3 ++-
 .../apache/doris/spark/write/DorisWriteBuilder.scala   | 18 +++++++++++++++---
 4 files changed, 30 insertions(+), 5 deletions(-)

diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml
index 6d610da..8c975df 100644
--- a/spark-doris-connector/pom.xml
+++ b/spark-doris-connector/pom.xml
@@ -79,7 +79,7 @@
     </mailingLists>
 
     <properties>
-        <revision>25.0.0-SNAPSHOT</revision>
+        <revision>25.1.0-SNAPSHOT</revision>
         <spark.version>2.4.8</spark.version>
         <spark.major.version>2.4</spark.major.version>
         <scala.version>2.11.12</scala.version>
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
index 0a6669e..1c4c49c 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
@@ -362,6 +362,18 @@ public class DorisFrontendClient implements Serializable {
         });
     }
 
+    public void truncateTable(String database, String table) throws Exception {
+        queryFrontends(conn -> {
+            String sql = "TRUNCATE TABLE " + database + "." + table;
+            try (PreparedStatement preparedStatement = 
conn.prepareStatement(sql)) {
+                preparedStatement.execute();
+                return null;
+            } catch (SQLException e) {
+                throw new RuntimeException("truncate table failed", e);
+            }
+        });
+    }
+
     public List<Frontend> getFrontends() {
         return frontends;
     }
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
index 7f8fffd..50af0fa 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
@@ -48,7 +48,8 @@ abstract class DorisTableBase(identifier: Identifier, config: 
DorisConfig, schem
     Set(BATCH_READ,
       BATCH_WRITE,
       STREAMING_WRITE,
-      ACCEPT_ANY_SCHEMA).asJava
+      ACCEPT_ANY_SCHEMA,
+      TRUNCATE).asJava
   }
 
   override def newScanBuilder(caseInsensitiveStringMap: 
CaseInsensitiveStringMap): ScanBuilder = {
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWriteBuilder.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWriteBuilder.scala
index 521d06c..b74cc45 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWriteBuilder.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWriteBuilder.scala
@@ -17,14 +17,22 @@
 
 package org.apache.doris.spark.write
 
-import org.apache.doris.spark.config.DorisConfig
+import org.apache.doris.spark.client.DorisFrontendClient
+import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
 import org.apache.spark.sql.connector.write.streaming.StreamingWrite
-import org.apache.spark.sql.connector.write.{BatchWrite, WriteBuilder}
+import org.apache.spark.sql.connector.write.{BatchWrite, SupportsTruncate, 
WriteBuilder}
 import org.apache.spark.sql.types.StructType
 
-class DorisWriteBuilder(config: DorisConfig, schema: StructType) extends 
WriteBuilder {
+class DorisWriteBuilder(config: DorisConfig, schema: StructType) extends 
WriteBuilder with SupportsTruncate {
+
+  private var isTruncate = false
 
   override def buildForBatch(): BatchWrite = {
+    if (isTruncate) {
+      val client = new DorisFrontendClient(config)
+      val tableDb = 
config.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER).split("\\.")
+      client.truncateTable(tableDb(0), tableDb(1))
+    }
     new DorisWrite(config, schema)
   }
 
@@ -32,4 +40,8 @@ class DorisWriteBuilder(config: DorisConfig, schema: 
StructType) extends WriteBu
     new DorisWrite(config, schema)
   }
 
+  override def truncate(): WriteBuilder = {
+    isTruncate = true
+    this
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to