This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 13f2aa9  [fix](connector) fix performance degradation caused by interval mistake (#294)
13f2aa9 is described below

commit 13f2aa9b57a1dc0d9317a37f0b1aad087adb6b2a
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Fri Mar 21 14:09:41 2025 +0800

    [fix](connector) fix performance degradation caused by interval mistake (#294)
---
 .../client/write/AbstractCopyIntoProcessor.java    |  3 +-
 .../client/write/AbstractStreamLoadProcessor.java  |  6 +-
 .../doris/spark/client/write/DorisWriter.java      | 34 ++++++++++--
 .../doris/spark/sql/DorisRowFlightSqlReader.scala  |  4 +-
 .../doris/spark/sql/DorisRowThriftReader.scala     |  4 +-
 .../scala/org/apache/doris/spark/util/Retry.scala  |  3 +-
 .../apache/doris/spark/write/DorisDataWriter.scala | 64 +++++++++++-----------
 7 files changed, 76 insertions(+), 42 deletions(-)

diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractCopyIntoProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractCopyIntoProcessor.java
index 6cf3549..28f7258 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractCopyIntoProcessor.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractCopyIntoProcessor.java
@@ -59,7 +59,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public abstract class AbstractCopyIntoProcessor<R> implements DorisWriter<R>, DorisCommitter {
+public abstract class AbstractCopyIntoProcessor<R> extends DorisWriter<R> implements DorisCommitter {
 
     protected static final Logger LOG = LoggerFactory.getLogger("CopyIntoProcessor");
 
@@ -97,6 +97,7 @@ public abstract class AbstractCopyIntoProcessor<R> implements DorisWriter<R>, Do
     private boolean isNewBatch = true;
 
     public AbstractCopyIntoProcessor(DorisConfig config) throws Exception {
+        super(config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE));
         this.config = config;
         this.frontend = new DorisFrontendClient(config);
         this.properties = config.getSinkProperties();
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
index 9bc3037..2a10ffa 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
@@ -59,7 +59,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
-public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>, DorisCommitter {
+public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> implements DorisCommitter {
 
     protected final Logger logger = LoggerFactory.getLogger(this.getClass().getName().replaceAll("\\$", ""));
 
@@ -112,6 +112,7 @@ public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>,
     private Future<CloseableHttpResponse> requestFuture = null;
 
     public AbstractStreamLoadProcessor(DorisConfig config) throws Exception {
+        super(config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE));
         this.config = config;
        String tableIdentifier = config.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER);
         String[] dbTableArr = tableIdentifier.split("\\.");
@@ -151,11 +152,13 @@ public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>,
             createNewBatch = false;
         }
         output.write(toFormat(row, format));
+        currentBatchCount++;
     }
 
     @Override
     public String stop() throws Exception {
         if (requestFuture != null) {
+            createNewBatch = true;
             // arrow format need to send all buffer data before stop
             if (!recordBuffer.isEmpty() && "arrow".equalsIgnoreCase(format)) {
                 List<R> rs = new LinkedList<>(recordBuffer);
@@ -172,7 +175,6 @@ public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>,
             logger.info("stream load response: {}", resEntity);
            StreamLoadResponse response = MAPPER.readValue(resEntity, StreamLoadResponse.class);
             if (response != null && response.isSuccess()) {
-                createNewBatch = true;
                return isTwoPhaseCommitEnabled ? String.valueOf(response.getTxnId()) : null;
             } else {
                throw new StreamLoadException("stream load execute failed, response: " + resEntity);
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/DorisWriter.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/DorisWriter.java
index 281a9b8..ada89c7 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/DorisWriter.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/DorisWriter.java
@@ -17,14 +17,40 @@
 
 package org.apache.doris.spark.client.write;
 
+import org.apache.doris.spark.config.DorisOptions;
+
 import java.io.IOException;
 import java.io.Serializable;
 
-public interface DorisWriter<R> extends Serializable {
+public abstract class DorisWriter<R> implements Serializable {
+
+    protected int batchSize;
+
+    protected int currentBatchCount = 0;
+
+    public DorisWriter(int batchSize) {
+        if (batchSize <= 0) {
+            throw new IllegalArgumentException(DorisOptions.DORIS_SINK_BATCH_SIZE.getName() + " must be greater than 0");
+        }
+        this.batchSize = batchSize;
+    }
+
+    public abstract void load(R row) throws Exception;
+
+    public abstract String stop() throws Exception;
+
+    public abstract void close() throws IOException;
+
+    public boolean endOfBatch() {
+        return currentBatchCount >= batchSize;
+    }
 
-    void load(R row) throws Exception;
+    public int getBatchCount() {
+        return currentBatchCount;
+    }
 
-    String stop() throws Exception;
+    public void resetBatchCount() {
+        currentBatchCount = 0;
+    }
 
-    void close() throws IOException;
 }
\ No newline at end of file
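
The new base class centralizes batch counting for both processors. The same
contract, paraphrased as a Scala sketch (names mirror the Java class above;
this is an illustration, not additional connector code):

    // Scala paraphrase of the DorisWriter base class: subclasses bump
    // currentBatchCount in load(), and the Spark-side data writer polls
    // endOfBatch() to decide when to flush.
    abstract class DorisWriterSketch[R](batchSize: Int) extends Serializable {
      require(batchSize > 0, "batch size must be greater than 0")

      protected var currentBatchCount: Int = 0

      def load(row: R): Unit   // buffer one row and bump currentBatchCount
      def stop(): String       // flush the batch, return the txn id (or null)
      def close(): Unit

      def endOfBatch(): Boolean = currentBatchCount >= batchSize
      def getBatchCount: Int = currentBatchCount
      def resetBatchCount(): Unit = { currentBatchCount = 0 }
    }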
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowFlightSqlReader.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowFlightSqlReader.scala
index 3b5baed..03796c6 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowFlightSqlReader.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowFlightSqlReader.scala
@@ -22,7 +22,7 @@ import org.apache.doris.spark.client.read.DorisFlightSqlReader
 import org.apache.doris.spark.config.DorisOptions
 import org.apache.doris.spark.exception.ShouldNeverHappenException
 
-import scala.collection.JavaConverters.asScalaBufferConverter
+import scala.collection.JavaConverters.{asScalaBufferConverter, mapAsScalaMapConverter}
 
class DorisRowFlightSqlReader(partition: DorisReaderPartition) extends DorisFlightSqlReader(partition)  {
 
@@ -34,6 +34,8 @@ class DorisRowFlightSqlReader(partition: DorisReaderPartition) extends DorisFlig
     }
     val row: DorisRow = new DorisRow(rowOrder)
     rowBatch.next.asScala.zipWithIndex.foreach {
+      case (s, index) if index < row.values.size && s.isInstanceOf[java.util.HashMap[String, String]] =>
+        row.values.update(index, s.asInstanceOf[java.util.HashMap[String, String]].asScala)
       case (s, index) if index < row.values.size => row.values.update(index, s)
       case _ => // nothing
     }
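
Both reader patches perform the same conversion: map-typed column values come
back from the row batch as java.util.HashMap, and wrapping them with asScala
stores a Scala map in the row instead. A standalone sketch of the conversion
(toy values, not Doris types):

    import scala.collection.JavaConverters.mapAsScalaMapConverter

    // Map columns arrive as java.util.HashMap; wrap them so downstream row
    // handling sees a Scala collection instead of a raw Java map.
    val raw: AnyRef = {
      val m = new java.util.HashMap[String, String]()
      m.put("k", "v")
      m
    }

    val converted: AnyRef = raw match {
      case m: java.util.HashMap[_, _] =>
        m.asInstanceOf[java.util.HashMap[String, String]].asScala
      case other => other
    }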
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowThriftReader.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowThriftReader.scala
index 07236b9..9c3cefa 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowThriftReader.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowThriftReader.scala
@@ -22,7 +22,7 @@ import org.apache.doris.spark.client.read.DorisThriftReader
 import org.apache.doris.spark.config.DorisOptions
 import org.apache.doris.spark.exception.ShouldNeverHappenException
 
-import scala.collection.JavaConverters.asScalaBufferConverter
+import scala.collection.JavaConverters.{asScalaBufferConverter, mapAsScalaMapConverter}
 
class DorisRowThriftReader(partition: DorisReaderPartition) extends DorisThriftReader(partition) {
 
@@ -34,6 +34,8 @@ class DorisRowThriftReader(partition: DorisReaderPartition) extends DorisThriftR
     }
     val row: DorisRow = new DorisRow(rowOrder)
     rowBatch.next.asScala.zipWithIndex.foreach {
+      case (s, index) if index < row.values.size && s.isInstanceOf[java.util.HashMap[String, String]] =>
+        row.values.update(index, s.asInstanceOf[java.util.HashMap[String, String]].asScala)
       case (s, index) if index < row.values.size => row.values.update(index, s)
       case _ => // nothing
     }
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/Retry.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/Retry.scala
index c41be51..24f8c13 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/Retry.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/Retry.scala
@@ -34,10 +34,9 @@ object Retry {
     val result = Try(f)
     result match {
       case Success(result) =>
-        LockSupport.parkNanos(interval.toNanos)
         Success(result)
       case Failure(exception: T) if retryTimes > 0 =>
-        logger.warn("Execution failed caused by: ", exception)
+        logger.warn("Execution failed caused by: {}", exception.getMessage)
         logger.warn(s"$retryTimes times retry remaining, the next attempt will 
be in ${interval.toMillis} ms")
         LockSupport.parkNanos(interval.toNanos)
         h
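
This hunk is the interval mistake named in the commit title: the old code
parked for the retry interval after every successful call, so each operation
paid the full backoff even when nothing failed. The corrected shape, as a
minimal standalone sketch (a hypothetical helper with the same structure as
Retry.exec, minus logging and the failure handler):

    import java.time.Duration
    import java.util.concurrent.locks.LockSupport
    import scala.util.{Failure, Success, Try}

    // Park only between failed attempts; a success returns immediately.
    @annotation.tailrec
    def retry[T](times: Int, interval: Duration)(f: => T): Try[T] =
      Try(f) match {
        case s @ Success(_) => s                  // no sleep on success
        case Failure(_) if times > 0 =>
          LockSupport.parkNanos(interval.toNanos) // back off, then retry
          retry(times - 1, interval)(f)
        case failure => failure
      }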
diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
index 03c8617..f4ff49f 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
@@ -27,11 +27,14 @@ import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
 import org.apache.spark.sql.types.StructType
 
 import java.time.Duration
+import java.util.concurrent.locks.LockSupport
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Random, Success}
 
class DorisDataWriter(config: DorisConfig, schema: StructType, partitionId: Int, taskId: Long, epochId: Long = -1) extends DataWriter[InternalRow] with Logging {
 
+  private val batchSize = config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE)
+
   private val (writer: DorisWriter[InternalRow], committer: DorisCommitter) =
     config.getValue(DorisOptions.LOAD_MODE) match {
       case "stream_load" => (new StreamLoadProcessor(config, schema), new 
StreamLoadProcessor(config, schema))
@@ -39,43 +42,25 @@ class DorisDataWriter(config: DorisConfig, schema: StructType, partitionId: Int,
      case mode => throw new IllegalArgumentException("Unsupported load mode: " + mode)
     }
 
-  private val batchSize = config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE)
-
  private val batchIntervalMs = config.getValue(DorisOptions.DORIS_SINK_BATCH_INTERVAL_MS)
 
   private val retries = config.getValue(DorisOptions.DORIS_SINK_MAX_RETRIES)
 
  private val twoPhaseCommitEnabled = config.getValue(DorisOptions.DORIS_SINK_ENABLE_2PC)
 
-  private var currentBatchCount = 0
-
   private val committedMessages = mutable.Buffer[String]()
 
   private lazy val recordBuffer = mutable.Buffer[InternalRow]()
 
-  override def write(record: InternalRow): Unit = {
-    if (currentBatchCount >= batchSize) {
-      val txnId = Some(writer.stop())
-      if (txnId.isDefined) {
-        committedMessages += txnId.get
-        currentBatchCount = 0
-        if (retries != 0) {
-          recordBuffer.clear()
-        }
-      } else {
-        throw new Exception()
-      }
-    }
-    loadWithRetries(record)
-  }
+  override def write(record: InternalRow): Unit = loadBatchWithRetries(record)
 
   override def commit(): WriterCommitMessage = {
-    val txnId = writer.stop()
+    val txnId = Option(writer.stop())
     if (twoPhaseCommitEnabled) {
-      if (StringUtils.isNotBlank(txnId)) {
-        committedMessages += txnId
+      if (txnId.isDefined) {
+        committedMessages += txnId.get
       } else {
-        throw new Exception()
+        throw new Exception("Failed to commit batch")
       }
     }
    DorisWriterCommitMessage(partitionId, taskId, epochId, committedMessages.toArray)
@@ -95,26 +80,43 @@ class DorisDataWriter(config: DorisConfig, schema: StructType, partitionId: Int,
   }
 
   @throws[Exception]
-  private def loadWithRetries(record: InternalRow): Unit = {
+  private def loadBatchWithRetries(record: InternalRow): Unit = {
     var isRetrying = false
    Retry.exec[Unit, Exception](retries, Duration.ofMillis(batchIntervalMs.toLong), log) {
       if (isRetrying) {
+        // retrying, reload data from buffer
         do {
-          writer.load(recordBuffer(currentBatchCount))
-          currentBatchCount += 1
-        } while (currentBatchCount < recordBuffer.size)
+          val idx = writer.getBatchCount
+          writer.load(recordBuffer(idx))
+        } while (writer.getBatchCount < recordBuffer.size)
         isRetrying = false
       }
+      if (writer.endOfBatch()) {
+        // end of batch, stop batch write
+        val txnId = Option(writer.stop())
+        if (twoPhaseCommitEnabled) {
+          if (txnId.isDefined) {
+            committedMessages += txnId.get
+          } else {
+            throw new Exception("Failed to end batch write")
+          }
+        }
+        // clear buffer if retry is enabled
+        if (retries > 0) {
+          recordBuffer.clear()
+        }
+        writer.resetBatchCount()
+        LockSupport.parkNanos(batchIntervalMs.toLong)
+      }
       writer.load(record)
-      currentBatchCount += 1
     } {
+      // batch write failed, set retry flag and reset batch count
       isRetrying = true
-      currentBatchCount = 0
+      writer.resetBatchCount()
     } match {
       case Success(_) => if (retries > 0) recordBuffer += record
       case Failure(exception) => throw new Exception(exception)
     }
-
   }
 
 }
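
Taken together, the rewritten write path flushes whenever the writer reports
end-of-batch, records the transaction id under two-phase commit, resets the
counter, and pauses before the next batch. A hypothetical sketch of that flow,
built on the DorisWriterSketch above (note that LockSupport.parkNanos takes
nanoseconds, so this sketch converts the millisecond interval explicitly):

    import java.time.Duration
    import java.util.concurrent.locks.LockSupport

    // Sketch of the batch loop driving the batch-counting writer: flush a
    // full batch, record its txn id, pause, then keep loading records.
    def writeRecord[R](writer: DorisWriterSketch[R],
                       committed: scala.collection.mutable.Buffer[String],
                       twoPhaseCommit: Boolean,
                       batchIntervalMs: Long)(record: R): Unit = {
      if (writer.endOfBatch()) {
        val txnId = Option(writer.stop())       // flush the completed batch
        if (twoPhaseCommit) {
          committed += txnId.getOrElse(throw new Exception("Failed to end batch write"))
        }
        writer.resetBatchCount()
        // parkNanos expects nanoseconds; convert the millisecond interval
        LockSupport.parkNanos(Duration.ofMillis(batchIntervalMs).toNanos)
      }
      writer.load(record)
    }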

