This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 13f2aa9  [fix](connector) fix performance degradation caused by interval mistake (#294)
13f2aa9 is described below

commit 13f2aa9b57a1dc0d9317a37f0b1aad087adb6b2a
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Fri Mar 21 14:09:41 2025 +0800

    [fix](connector) fix performance degradation caused by interval mistake (#294)
---
 .../client/write/AbstractCopyIntoProcessor.java    |  3 +-
 .../client/write/AbstractStreamLoadProcessor.java  |  6 +-
 .../doris/spark/client/write/DorisWriter.java      | 34 ++++++++++--
 .../doris/spark/sql/DorisRowFlightSqlReader.scala  |  4 +-
 .../doris/spark/sql/DorisRowThriftReader.scala     |  4 +-
 .../scala/org/apache/doris/spark/util/Retry.scala  |  3 +-
 .../apache/doris/spark/write/DorisDataWriter.scala | 64 +++++++++++-----------
 7 files changed, 76 insertions(+), 42 deletions(-)

diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractCopyIntoProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractCopyIntoProcessor.java
index 6cf3549..28f7258 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractCopyIntoProcessor.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractCopyIntoProcessor.java
@@ -59,7 +59,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public abstract class AbstractCopyIntoProcessor<R> implements DorisWriter<R>, DorisCommitter {
+public abstract class AbstractCopyIntoProcessor<R> extends DorisWriter<R> implements DorisCommitter {
 
     protected static final Logger LOG = LoggerFactory.getLogger("CopyIntoProcessor");
 
@@ -97,6 +97,7 @@ public abstract class AbstractCopyIntoProcessor<R> implements DorisWriter<R>, Do
     private boolean isNewBatch = true;
 
     public AbstractCopyIntoProcessor(DorisConfig config) throws Exception {
+        super(config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE));
         this.config = config;
         this.frontend = new DorisFrontendClient(config);
         this.properties = config.getSinkProperties();
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
index 9bc3037..2a10ffa 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
@@ -59,7 +59,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
-public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>, DorisCommitter {
+public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> implements DorisCommitter {
 
     protected final Logger logger = LoggerFactory.getLogger(this.getClass().getName().replaceAll("\\$", ""));
 
@@ -112,6 +112,7 @@ public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>,
     private Future<CloseableHttpResponse> requestFuture = null;
 
     public AbstractStreamLoadProcessor(DorisConfig config) throws Exception {
+        super(config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE));
         this.config = config;
         String tableIdentifier = config.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER);
         String[] dbTableArr = tableIdentifier.split("\\.");
@@ -151,11 +152,13 @@ public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>,
             createNewBatch = false;
         }
         output.write(toFormat(row, format));
+        currentBatchCount++;
     }
 
     @Override
     public String stop() throws Exception {
         if (requestFuture != null) {
+            createNewBatch = true;
             // arrow format need to send all buffer data before stop
             if (!recordBuffer.isEmpty() && "arrow".equalsIgnoreCase(format)) {
                 List<R> rs = new LinkedList<>(recordBuffer);
@@ -172,7 +175,6 @@ public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>,
             logger.info("stream load response: {}", resEntity);
             StreamLoadResponse response = MAPPER.readValue(resEntity, StreamLoadResponse.class);
             if (response != null && response.isSuccess()) {
-                createNewBatch = true;
                 return isTwoPhaseCommitEnabled ? String.valueOf(response.getTxnId()) : null;
             } else {
                 throw new StreamLoadException("stream load execute failed, response: " + resEntity);
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/DorisWriter.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/DorisWriter.java
index 281a9b8..ada89c7 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/DorisWriter.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/DorisWriter.java
@@ -17,14 +17,40 @@
 
 package org.apache.doris.spark.client.write;
 
+import org.apache.doris.spark.config.DorisOptions;
+
 import java.io.IOException;
 import java.io.Serializable;
 
-public interface DorisWriter<R> extends Serializable {
+public abstract class DorisWriter<R> implements Serializable {
+
+    protected int batchSize;
+
+    protected int currentBatchCount = 0;
+
+    public DorisWriter(int batchSize) {
+        if (batchSize <= 0) {
+            throw new IllegalArgumentException(DorisOptions.DORIS_SINK_BATCH_SIZE.getName() + " must be greater than 0");
+        }
+        this.batchSize = batchSize;
+    }
+
+    public abstract void load(R row) throws Exception;
+
+    public abstract String stop() throws Exception;
+
+    public abstract void close() throws IOException;
+
+    public boolean endOfBatch() {
+        return currentBatchCount >= batchSize;
+    }
 
-    void load(R row) throws Exception;
+    public int getBatchCount() {
+        return currentBatchCount;
+    }
 
-    String stop() throws Exception;
+    public void resetBatchCount() {
+        currentBatchCount = 0;
+    }
 
-    void close() throws IOException;
 }
\ No newline at end of file
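The DorisWriter change above is the structural half of the fix: the writer interface becomes an abstract base class that owns the batch counter, so both processors (stream load and copy into) share one endOfBatch()/getBatchCount()/resetBatchCount() bookkeeping instead of the caller tracking counts on the side. A minimal Scala sketch of that contract, using a hypothetical InMemoryWriter that is not part of the connector:

    import org.apache.doris.spark.client.write.DorisWriter
    import scala.collection.mutable

    class InMemoryWriter(batchSize: Int) extends DorisWriter[String](batchSize) {
      private val rows = mutable.Buffer[String]()

      override def load(row: String): Unit = {
        rows += row
        currentBatchCount += 1 // subclasses advance the inherited counter
      }

      // pretend flush: return a fake transaction id and drop the rows
      override def stop(): String = { val id = s"txn-${rows.size}"; rows.clear(); id }

      override def close(): Unit = rows.clear()
    }

    val w = new InMemoryWriter(batchSize = 2)
    w.load("a"); w.load("b")
    assert(w.endOfBatch()) // the caller, not the writer, decides when to flush
    val txn = w.stop()
    w.resetBatchCount()    // start counting the next batch from zero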
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowFlightSqlReader.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowFlightSqlReader.scala
index 3b5baed..03796c6 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowFlightSqlReader.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowFlightSqlReader.scala
@@ -22,7 +22,7 @@ import org.apache.doris.spark.client.read.DorisFlightSqlReader
 import org.apache.doris.spark.config.DorisOptions
 import org.apache.doris.spark.exception.ShouldNeverHappenException
 
-import scala.collection.JavaConverters.asScalaBufferConverter
+import scala.collection.JavaConverters.{asScalaBufferConverter, mapAsScalaMapConverter}
 
 class DorisRowFlightSqlReader(partition: DorisReaderPartition) extends DorisFlightSqlReader(partition) {
 
@@ -34,6 +34,8 @@ class DorisRowFlightSqlReader(partition: DorisReaderPartition) extends DorisFlig
     }
     val row: DorisRow = new DorisRow(rowOrder)
     rowBatch.next.asScala.zipWithIndex.foreach {
+      case (s, index) if index < row.values.size && s.isInstanceOf[java.util.HashMap[String, String]] =>
+        row.values.update(index, s.asInstanceOf[java.util.HashMap[String, String]].asScala)
       case (s, index) if index < row.values.size => row.values.update(index, s)
       case _ => // nothing
     }
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowThriftReader.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowThriftReader.scala
index 07236b9..9c3cefa 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowThriftReader.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowThriftReader.scala
@@ -22,7 +22,7 @@ import org.apache.doris.spark.client.read.DorisThriftReader
 import org.apache.doris.spark.config.DorisOptions
 import org.apache.doris.spark.exception.ShouldNeverHappenException
 
-import scala.collection.JavaConverters.asScalaBufferConverter
+import scala.collection.JavaConverters.{asScalaBufferConverter, mapAsScalaMapConverter}
 
 class DorisRowThriftReader(partition: DorisReaderPartition) extends DorisThriftReader(partition) {
 
@@ -34,6 +34,8 @@ class DorisRowThriftReader(partition: DorisReaderPartition) extends DorisThriftR
     }
     val row: DorisRow = new DorisRow(rowOrder)
     rowBatch.next.asScala.zipWithIndex.foreach {
+      case (s, index) if index < row.values.size && s.isInstanceOf[java.util.HashMap[String, String]] =>
+        row.values.update(index, s.asInstanceOf[java.util.HashMap[String, String]].asScala)
       case (s, index) if index < row.values.size => row.values.update(index, s)
       case _ => // nothing
     }
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/Retry.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/Retry.scala
index c41be51..24f8c13 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/Retry.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/Retry.scala
@@ -34,10 +34,9 @@ object Retry {
     val result = Try(f)
     result match {
       case Success(result) =>
-        LockSupport.parkNanos(interval.toNanos)
         Success(result)
       case Failure(exception: T) if retryTimes > 0 =>
-        logger.warn("Execution failed caused by: ", exception)
+        logger.warn("Execution failed caused by: {}", exception.getMessage)
         logger.warn(s"$retryTimes times retry remaining, the next attempt will be in ${interval.toMillis} ms")
         LockSupport.parkNanos(interval.toNanos)
         h
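The Retry.scala hunk is the "interval mistake" of the commit title: the old Success branch parked the calling thread for the full retry interval even when the wrapped block succeeded, and, as the DorisDataWriter diff below shows, Retry.exec wraps every single record load, so every row write paid an unconditional sleep of DORIS_SINK_BATCH_INTERVAL_MS. The fix parks only before a retry (the warn call also stops logging the full stack trace and logs just the exception message). A simplified, self-contained re-implementation of the corrected semantics, for illustration only (the connector's real Retry.exec is recursive, takes a Logger and a recovery block, and returns a Try):

    import java.time.Duration
    import java.util.concurrent.locks.LockSupport
    import scala.util.{Failure, Success, Try}

    def retry[R](times: Int, interval: Duration)(f: => R): Try[R] =
      Try(f) match {
        case Success(r) =>
          Success(r) // fixed path: return immediately, no parkNanos
        case Failure(_) if times > 0 =>
          LockSupport.parkNanos(interval.toNanos) // sleep only before a retry
          retry(times - 1, interval)(f)
        case failure => failure
      }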
diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
index 03c8617..f4ff49f 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
@@ -27,11 +27,14 @@ import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
 import org.apache.spark.sql.types.StructType
 
 import java.time.Duration
+import java.util.concurrent.locks.LockSupport
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Random, Success}
 
 class DorisDataWriter(config: DorisConfig, schema: StructType, partitionId: Int, taskId: Long, epochId: Long = -1) extends DataWriter[InternalRow] with Logging {
 
+  private val batchSize = config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE)
+
   private val (writer: DorisWriter[InternalRow], committer: DorisCommitter) = config.getValue(DorisOptions.LOAD_MODE) match {
     case "stream_load" => (new StreamLoadProcessor(config, schema), new StreamLoadProcessor(config, schema))
@@ -39,43 +42,25 @@ class DorisDataWriter(config: DorisConfig, schema: StructType, partitionId: Int,
     case mode => throw new IllegalArgumentException("Unsupported load mode: " + mode)
   }
 
-  private val batchSize = config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE)
-
   private val batchIntervalMs = config.getValue(DorisOptions.DORIS_SINK_BATCH_INTERVAL_MS)
   private val retries = config.getValue(DorisOptions.DORIS_SINK_MAX_RETRIES)
   private val twoPhaseCommitEnabled = config.getValue(DorisOptions.DORIS_SINK_ENABLE_2PC)
 
-  private var currentBatchCount = 0
-
   private val committedMessages = mutable.Buffer[String]()
 
   private lazy val recordBuffer = mutable.Buffer[InternalRow]()
 
-  override def write(record: InternalRow): Unit = {
-    if (currentBatchCount >= batchSize) {
-      val txnId = Some(writer.stop())
-      if (txnId.isDefined) {
-        committedMessages += txnId.get
-        currentBatchCount = 0
-        if (retries != 0) {
-          recordBuffer.clear()
-        }
-      } else {
-        throw new Exception()
-      }
-    }
-    loadWithRetries(record)
-  }
+  override def write(record: InternalRow): Unit = loadBatchWithRetries(record)
 
   override def commit(): WriterCommitMessage = {
-    val txnId = writer.stop()
+    val txnId = Option(writer.stop())
     if (twoPhaseCommitEnabled) {
-      if (StringUtils.isNotBlank(txnId)) {
-        committedMessages += txnId
+      if (txnId.isDefined) {
+        committedMessages += txnId.get
       } else {
-        throw new Exception()
+        throw new Exception("Failed to commit batch")
       }
     }
     DorisWriterCommitMessage(partitionId, taskId, epochId, committedMessages.toArray)
@@ -95,26 +80,43 @@ class DorisDataWriter(config: DorisConfig, schema: StructType, partitionId: Int,
   }
 
   @throws[Exception]
-  private def loadWithRetries(record: InternalRow): Unit = {
+  private def loadBatchWithRetries(record: InternalRow): Unit = {
     var isRetrying = false
     Retry.exec[Unit, Exception](retries, Duration.ofMillis(batchIntervalMs.toLong), log) {
       if (isRetrying) {
+        // retrying, reload data from buffer
         do {
-          writer.load(recordBuffer(currentBatchCount))
-          currentBatchCount += 1
-        } while (currentBatchCount < recordBuffer.size)
+          val idx = writer.getBatchCount
+          writer.load(recordBuffer(idx))
+        } while (writer.getBatchCount < recordBuffer.size)
         isRetrying = false
       }
+      if (writer.endOfBatch()) {
+        // end of batch, stop batch write
+        val txnId = Option(writer.stop())
+        if (twoPhaseCommitEnabled) {
+          if (txnId.isDefined) {
+            committedMessages += txnId.get
+          } else {
+            throw new Exception("Failed to end batch write")
+          }
+        }
+        // clear buffer if retry is enabled
+        if (retries > 0) {
+          recordBuffer.clear()
+        }
+        writer.resetBatchCount()
+        LockSupport.parkNanos(batchIntervalMs.toLong)
+      }
       writer.load(record)
-      currentBatchCount += 1
     } {
+      // batch write failed, set retry flag and reset batch count
       isRetrying = true
-      currentBatchCount = 0
+      writer.resetBatchCount()
    } match {
      case Success(_) => if (retries > 0) recordBuffer += record
      case Failure(exception) => throw new Exception(exception)
    }
-
  }
 }
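With the rewritten DorisDataWriter above, write() only delegates: the batch boundary check lives in loadBatchWithRetries(), which flushes via writer.stop() once writer.endOfBatch() reports the batch is full, and the failure handler resets the writer's counter instead of a local variable. That reset is what makes the retry replay work: after it, getBatchCount doubles as the index of the next buffered row to reload. A self-contained sketch of that replay idea, with a hypothetical CountingWriter standing in for the connector classes:

    import scala.collection.mutable

    // hypothetical stand-in: only the counter logic of DorisWriter
    class CountingWriter(batchSize: Int) {
      var count = 0 // plays the role of currentBatchCount
      def load(row: String): Unit = count += 1 // a real writer would also buffer the row
      def endOfBatch: Boolean = count >= batchSize
      def resetBatchCount(): Unit = count = 0
    }

    val buffered = mutable.Buffer("r0", "r1", "r2") // rows of the failed batch
    val writer = new CountingWriter(batchSize = 5)
    writer.resetBatchCount() // the failure handler rewound the counter to 0

    // replay loop from the diff: the counter doubles as the buffer index
    while (writer.count < buffered.size)
      writer.load(buffered(writer.count))

    assert(writer.count == 3) // batch state restored; the new record is loaded next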
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org