This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 9db6115 [Fix] fix retry read buffer problem (#160)
9db6115 is described below

commit 9db61150189fef57d89e8ba9259f9ab3541bd643
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Mon Nov 20 14:43:56 2023 +0800

    [Fix] fix retry read buffer problem (#160)
---
 .../apache/doris/spark/writer/DorisWriter.scala | 28 ++++++++++++++++------
 1 file changed, 21 insertions(+), 7 deletions(-)

diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
index 59092f6..0485671 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -155,12 +155,10 @@ class DorisWriter(settings: SparkSettings, preCommittedTxnAcc: CollectionAccumul
     override def next(): T = {
       recordCount += 1
       if (batchRetryEnable) {
-        if (isReset && buffer.nonEmpty) {
-          buffer(recordCount)
+        if (isReset) {
+          readBuffer()
         } else {
-          val elem = iterator.next
-          buffer += elem
-          elem
+          writeBufferAndReturn()
         }
       } else {
         iterator.next
@@ -172,8 +170,10 @@ class DorisWriter(settings: SparkSettings, preCommittedTxnAcc: CollectionAccumul
      */
     def reset(): Unit = {
       recordCount = 0
-      isReset = true
-      logger.info("batch iterator is reset")
+      isReset = buffer.nonEmpty
+      if (isReset) {
+        logger.info("buffer is not empty and batch iterator is reset")
+      }
     }

     /**
@@ -186,6 +186,20 @@ class DorisWriter(settings: SparkSettings, preCommittedTxnAcc: CollectionAccumul
       }
     }
+    private def readBuffer(): T = {
+      if (recordCount == buffer.size) {
+        logger.debug("read buffer end, recordCount:{}, bufferSize: {}", recordCount, buffer.size)
+        isReset = false
+      }
+      buffer(recordCount - 1)
+    }
+
+    private def writeBufferAndReturn(): T = {
+      val elem = iterator.next
+      buffer += elem
+      elem
+    }
+
   }
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org