This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new 7e60b5d [feature](connector) support overwrite for datasource v2 (#281) 7e60b5d is described below commit 7e60b5d3e97c5e09751dcfe59acb3ca9008cae0d Author: gnehil <adamlee...@gmail.com> AuthorDate: Mon Mar 17 20:36:49 2025 +0800 [feature](connector) support overwrite for datasource v2 (#281) --- spark-doris-connector/pom.xml | 2 +- .../apache/doris/spark/client/DorisFrontendClient.java | 12 ++++++++++++ .../apache/doris/spark/catalog/DorisTableBase.scala | 3 ++- .../apache/doris/spark/write/DorisWriteBuilder.scala | 18 +++++++++++++++--- 4 files changed, 30 insertions(+), 5 deletions(-) diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml index 6d610da..8c975df 100644 --- a/spark-doris-connector/pom.xml +++ b/spark-doris-connector/pom.xml @@ -79,7 +79,7 @@ </mailingLists> <properties> - <revision>25.0.0-SNAPSHOT</revision> + <revision>25.1.0-SNAPSHOT</revision> <spark.version>2.4.8</spark.version> <spark.major.version>2.4</spark.major.version> <scala.version>2.11.12</scala.version> diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java index 0a6669e..1c4c49c 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java @@ -362,6 +362,18 @@ public class DorisFrontendClient implements Serializable { }); } + public void truncateTable(String database, String table) throws Exception { + queryFrontends(conn -> { + String sql = "TRUNCATE TABLE " + database + "." + table; + try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) { + preparedStatement.execute(); + return null; + } catch (SQLException e) { + throw new RuntimeException("truncate table failed", e); + } + }); + } + public List<Frontend> getFrontends() { return frontends; } diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala index 7f8fffd..50af0fa 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala +++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala @@ -48,7 +48,8 @@ abstract class DorisTableBase(identifier: Identifier, config: DorisConfig, schem Set(BATCH_READ, BATCH_WRITE, STREAMING_WRITE, - ACCEPT_ANY_SCHEMA).asJava + ACCEPT_ANY_SCHEMA, + TRUNCATE).asJava } override def newScanBuilder(caseInsensitiveStringMap: CaseInsensitiveStringMap): ScanBuilder = { diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWriteBuilder.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWriteBuilder.scala index 521d06c..b74cc45 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWriteBuilder.scala +++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWriteBuilder.scala @@ -17,14 +17,22 @@ package org.apache.doris.spark.write -import org.apache.doris.spark.config.DorisConfig +import org.apache.doris.spark.client.DorisFrontendClient +import org.apache.doris.spark.config.{DorisConfig, DorisOptions} import org.apache.spark.sql.connector.write.streaming.StreamingWrite -import org.apache.spark.sql.connector.write.{BatchWrite, WriteBuilder} +import org.apache.spark.sql.connector.write.{BatchWrite, SupportsTruncate, WriteBuilder} import org.apache.spark.sql.types.StructType -class DorisWriteBuilder(config: DorisConfig, schema: StructType) extends WriteBuilder { +class DorisWriteBuilder(config: DorisConfig, schema: StructType) extends WriteBuilder with SupportsTruncate { + + private var isTruncate = false override def buildForBatch(): BatchWrite = { + if (isTruncate) { + val client = new DorisFrontendClient(config) + val tableDb = config.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER).split("\\.") + client.truncateTable(tableDb(0), tableDb(1)) + } new DorisWrite(config, schema) } @@ -32,4 +40,8 @@ class DorisWriteBuilder(config: DorisConfig, schema: StructType) extends WriteBu new DorisWrite(config, schema) } + override def truncate(): WriteBuilder = { + isTruncate = true + this + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org