This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 411020ab [Improve] Fix the problem caused by very small interval in batch mode (#462)
411020ab is described below

commit 411020ab8e945613ce316dfb895254319d96fbdc
Author: wudi <676366...@qq.com>
AuthorDate: Thu Aug 15 14:28:08 2024 +0800

    [Improve] Fix the problem caused by very small interval in batch mode (#462)
---
 .../doris/flink/cfg/DorisExecutionOptions.java     |  20 +-
 .../flink/sink/batch/BatchBufferHttpEntity.java    |  76 ++++++++
 .../doris/flink/sink/batch/BatchBufferStream.java  |  73 ++++++++
 .../doris/flink/sink/batch/BatchRecordBuffer.java  |  97 ++++------
 .../flink/sink/batch/DorisBatchStreamLoad.java     | 208 ++++++++++++++++++---
 .../doris/flink/sink/batch/DorisBatchWriter.java   |   8 +-
 .../sink/{batch => copy}/BatchRecordBuffer.java    |   2 +-
 .../doris/flink/sink/copy/BatchStageLoad.java      |   1 -
 .../doris/flink/table/DorisConfigOptions.java      |   4 +-
 .../doris/flink/cfg/DorisExecutionOptionsTest.java |  16 +-
 .../apache/doris/flink/sink/DorisSinkITCase.java   |  21 ++-
 .../sink/batch/TestBatchBufferHttpEntity.java      |  46 +++++
 .../flink/sink/batch/TestBatchBufferStream.java    |  95 ++++++++++
 .../flink/sink/batch/TestDorisBatchStreamLoad.java |  80 +++++++-
 .../{batch => copy}/TestBatchRecordBuffer.java     |   2 +-
 .../flink/table/DorisDynamicTableFactoryTest.java  |   4 +-
 16 files changed, 631 insertions(+), 122 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 8f3cc240..7ad8ba97 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -40,8 +40,8 @@ public class DorisExecutionOptions implements Serializable {
     private static final int DEFAULT_BUFFER_COUNT = 3;
     // batch flush
     private static final int DEFAULT_FLUSH_QUEUE_SIZE = 2;
-    private static final int DEFAULT_BUFFER_FLUSH_MAX_ROWS = 50000;
-    private static final int DEFAULT_BUFFER_FLUSH_MAX_BYTES = 10 * 1024 * 1024;
+    private static final int DEFAULT_BUFFER_FLUSH_MAX_ROWS = 500000;
+    private static final int DEFAULT_BUFFER_FLUSH_MAX_BYTES = 100 * 1024 * 1024;
     private static final long DEFAULT_BUFFER_FLUSH_INTERVAL_MS = 10 * 1000;
     private final int checkInterval;
     private final int maxRetries;
@@ -358,9 +358,6 @@ public class DorisExecutionOptions implements Serializable {
         }
 
         public Builder setBufferFlushIntervalMs(long bufferFlushIntervalMs) {
-            Preconditions.checkState(
-                    bufferFlushIntervalMs >= 1000,
-                    "bufferFlushIntervalMs must be greater than or equal to 1 second");
             this.bufferFlushIntervalMs = bufferFlushIntervalMs;
             return this;
         }
@@ -397,6 +394,19 @@ public class DorisExecutionOptions implements Serializable {
                     && JSON.equals(streamLoadProp.getProperty(FORMAT_KEY))) {
                 streamLoadProp.put(READ_JSON_BY_LINE, true);
             }
+
+            Preconditions.checkArgument(
+                    bufferFlushIntervalMs >= 1000,
+                    "bufferFlushIntervalMs must be greater than or equal to 1 second");
+
+            Preconditions.checkArgument(
+                    bufferFlushMaxRows >= 10000,
+                    "bufferFlushMaxRows must be greater than or equal to 10000");
+
+            Preconditions.checkArgument(
+                    bufferFlushMaxBytes >= 10485760,
+                    "bufferFlushMaxBytes must be greater than or equal to 10485760(10MB)");
+
             return new DorisExecutionOptions(
                     checkInterval,
                     maxRetries,
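
Note on the validation change above: the interval check now runs in build() rather than in the setter, together with the new row and byte floors. A minimal sketch of the resulting behavior, using only builder methods visible in this diff (Guava's Preconditions.checkArgument throws IllegalArgumentException):

    DorisExecutionOptions.Builder builder =
            DorisExecutionOptions.builder()
                    .setBufferFlushIntervalMs(500) // no longer throws here
                    .setBufferFlushMaxRows(10000)
                    .setBufferFlushMaxBytes(10 * 1024 * 1024);
    try {
        builder.build(); // fails: "bufferFlushIntervalMs must be greater than or equal to 1 second"
    } catch (IllegalArgumentException e) {
        // surface the configuration error to the caller
    }
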
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferHttpEntity.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferHttpEntity.java
new file mode 100644
index 00000000..3c0068eb
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferHttpEntity.java
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+import org.apache.http.entity.AbstractHttpEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+public class BatchBufferHttpEntity extends AbstractHttpEntity {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BatchBufferHttpEntity.class);
+    protected static final int OUTPUT_BUFFER_SIZE = 4096;
+    private final List<byte[]> buffer;
+    private final long contentLength;
+
+    public BatchBufferHttpEntity(BatchRecordBuffer recordBuffer) {
+        this.buffer = recordBuffer.getBuffer();
+        this.contentLength = recordBuffer.getBufferSizeBytes();
+    }
+
+    @Override
+    public boolean isRepeatable() {
+        return true;
+    }
+
+    @Override
+    public boolean isChunked() {
+        return false;
+    }
+
+    @Override
+    public long getContentLength() {
+        return contentLength;
+    }
+
+    @Override
+    public InputStream getContent() {
+        return new BatchBufferStream(buffer);
+    }
+
+    @Override
+    public void writeTo(OutputStream outStream) throws IOException {
+        try (InputStream inStream = new BatchBufferStream(buffer)) {
+            final byte[] buffer = new byte[OUTPUT_BUFFER_SIZE];
+            int readLen;
+            while ((readLen = inStream.read(buffer)) != -1) {
+                outStream.write(buffer, 0, readLen);
+            }
+        }
+    }
+
+    @Override
+    public boolean isStreaming() {
+        return false;
+    }
+}
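
Note: the new entity hands the buffered rows to HttpClient without first concatenating them into one contiguous byte[] (the old ByteArrayEntity path). A minimal usage sketch, assuming UTF-8 records (needs java.nio.charset.StandardCharsets and java.io.ByteArrayOutputStream):

    BatchRecordBuffer recordBuffer = new BatchRecordBuffer();
    recordBuffer.insert("doris,1".getBytes(StandardCharsets.UTF_8));
    recordBuffer.insert("doris,2".getBytes(StandardCharsets.UTF_8));

    BatchBufferHttpEntity entity = new BatchBufferHttpEntity(recordBuffer);
    // The length is known up front because BatchRecordBuffer tracks it
    // incrementally, so the request need not be sent chunked.
    long contentLength = entity.getContentLength();
    entity.writeTo(new ByteArrayOutputStream()); // streams the rows in 4 KB chunks
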
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferStream.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferStream.java
new file mode 100644
index 00000000..a782bb53
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferStream.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+
+public class BatchBufferStream extends InputStream {
+    private final Iterator<byte[]> iterator;
+    private byte[] currentRow;
+    private int currentPos;
+
+    public BatchBufferStream(List<byte[]> buffer) {
+        this.iterator = buffer.iterator();
+    }
+
+    @Override
+    public int read() throws IOException {
+        return 0;
+    }
+
+    @Override
+    public int read(byte[] buf) throws IOException {
+        return read(buf, 0, buf.length);
+    }
+
+    @Override
+    public int read(byte[] buf, int off, int len) throws IOException {
+        if (!iterator.hasNext() && currentRow == null) {
+            return -1;
+        }
+
+        byte[] item = currentRow;
+        int pos = currentPos;
+        int readBytes = 0;
+        while (readBytes < len && (item != null || iterator.hasNext())) {
+            if (item == null) {
+                item = iterator.next();
+                pos = 0;
+            }
+
+            int size = Math.min(len - readBytes, item.length - pos);
+            System.arraycopy(item, pos, buf, off + readBytes, size);
+            readBytes += size;
+            pos += size;
+
+            if (pos == item.length) {
+                item = null;
+                pos = 0;
+            }
+        }
+        currentRow = item;
+        currentPos = pos;
+        return readBytes;
+    }
+}
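
Note: read(byte[], int, int) copies across row boundaries and keeps its position in currentRow/currentPos between calls; the single-byte read() above is a stub that the connector's bulk-read path never uses. A short sketch of draining the stream (needs java.util.Arrays and java.io.ByteArrayOutputStream):

    List<byte[]> rows = Arrays.asList("a,1\n".getBytes(), "b,2\n".getBytes());
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (InputStream in = new BatchBufferStream(rows)) {
        byte[] chunk = new byte[4096];
        int n;
        while ((n = in.read(chunk)) != -1) {
            out.write(chunk, 0, n);
        }
    }
    // out.toByteArray() now holds the bytes of "a,1\nb,2\n"
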
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
index df40e7a9..8eb98037 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
@@ -17,80 +17,52 @@
 
 package org.apache.doris.flink.sink.batch;
 
-import org.apache.flink.annotation.VisibleForTesting;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
+import java.util.LinkedList;
 
 /** buffer to queue. */
 public class BatchRecordBuffer {
     private static final Logger LOG = LoggerFactory.getLogger(BatchRecordBuffer.class);
     public static final String LINE_SEPARATOR = "\n";
     private String labelName;
-    private ByteBuffer buffer;
+    private LinkedList<byte[]> buffer;
     private byte[] lineDelimiter;
     private int numOfRecords = 0;
-    private int bufferSizeBytes = 0;
+    private long bufferSizeBytes = 0;
     private boolean loadBatchFirstRecord = true;
     private String database;
     private String table;
+    private final long createTime = System.currentTimeMillis();
+    private long retainTime = 0;
 
-    public BatchRecordBuffer() {}
-
-    public BatchRecordBuffer(byte[] lineDelimiter, int bufferSize) {
-        super();
-        this.lineDelimiter = lineDelimiter;
-        this.buffer = ByteBuffer.allocate(bufferSize);
+    public BatchRecordBuffer() {
+        this.buffer = new LinkedList<>();
     }
 
-    public BatchRecordBuffer(String database, String table, byte[] lineDelimiter, int bufferSize) {
+    public BatchRecordBuffer(String database, String table, byte[] lineDelimiter, long retainTime) {
         super();
         this.database = database;
         this.table = table;
         this.lineDelimiter = lineDelimiter;
-        this.buffer = ByteBuffer.allocate(bufferSize);
+        this.buffer = new LinkedList<>();
+        this.retainTime = retainTime;
     }
 
-    public void insert(byte[] record) {
-        ensureCapacity(record.length);
+    public int insert(byte[] record) {
+        int recordSize = record.length;
         if (loadBatchFirstRecord) {
             loadBatchFirstRecord = false;
         } else if (lineDelimiter != null) {
-            this.buffer.put(this.lineDelimiter);
+            this.buffer.add(this.lineDelimiter);
+            setBufferSizeBytes(this.bufferSizeBytes + this.lineDelimiter.length);
+            recordSize += this.lineDelimiter.length;
         }
-        this.buffer.put(record);
-        setNumOfRecords(getNumOfRecords() + 1);
-        setBufferSizeBytes(getBufferSizeBytes() + record.length);
-    }
-
-    @VisibleForTesting
-    public void ensureCapacity(int length) {
-        int lineDelimiterSize = this.lineDelimiter == null ? 0 : this.lineDelimiter.length;
-        if (buffer.remaining() - lineDelimiterSize >= length) {
-            return;
-        }
-        int currentRemain = buffer.remaining();
-        int currentCapacity = buffer.capacity();
-        // add lineDelimiter length
-        int needed = length - buffer.remaining() + lineDelimiterSize;
-        // grow at least 1MB
-        long grow = Math.max(needed, 1024 * 1024);
-        // grow at least 50% of the current size
-        grow = Math.max(buffer.capacity() / 2, grow);
-        int newCapacity = (int) Math.min(Integer.MAX_VALUE, buffer.capacity() + grow);
-        ByteBuffer tmp = ByteBuffer.allocate(newCapacity);
-        buffer.flip();
-        tmp.put(buffer);
-        buffer.clear();
-        buffer = tmp;
-        LOG.info(
-                "record length {},buffer remain {} ,grow capacity {} to {}",
-                length,
-                currentRemain,
-                currentCapacity,
-                newCapacity);
+        this.buffer.add(record);
+        setNumOfRecords(this.numOfRecords + 1);
+        setBufferSizeBytes(this.bufferSizeBytes + record.length);
+        return recordSize;
     }
 
     public String getLabelName() {
@@ -106,13 +78,6 @@ public class BatchRecordBuffer {
         return numOfRecords == 0;
     }
 
-    public ByteBuffer getData() {
-        // change mode
-        buffer.flip();
-        LOG.debug("flush buffer: {} records, {} bytes", getNumOfRecords(), getBufferSizeBytes());
-        return buffer;
-    }
-
     public void clear() {
         this.buffer.clear();
         this.numOfRecords = 0;
@@ -121,7 +86,7 @@ public class BatchRecordBuffer {
         this.loadBatchFirstRecord = true;
     }
 
-    public ByteBuffer getBuffer() {
+    public LinkedList<byte[]> getBuffer() {
         return buffer;
     }
 
@@ -131,7 +96,7 @@ public class BatchRecordBuffer {
     }
 
     /** @return Buffer size in bytes */
-    public int getBufferSizeBytes() {
+    public long getBufferSizeBytes() {
         return bufferSizeBytes;
     }
 
@@ -141,7 +106,7 @@ public class BatchRecordBuffer {
     }
 
     /** @param bufferSizeBytes Updates sum of size of records present in this buffer (Bytes) */
-    public void setBufferSizeBytes(int bufferSizeBytes) {
+    public void setBufferSizeBytes(long bufferSizeBytes) {
         this.bufferSizeBytes = bufferSizeBytes;
     }
 
@@ -160,4 +125,22 @@ public class BatchRecordBuffer {
     public void setTable(String table) {
         this.table = table;
     }
+
+    public String getTableIdentifier() {
+        if (database != null && table != null) {
+            return database + "." + table;
+        }
+        return null;
+    }
+
+    public byte[] getLineDelimiter() {
+        return lineDelimiter;
+    }
+
+    public boolean shouldFlush() {
+        // When the buffer create time is later than the first interval trigger,
+        // the write will not be triggered in the next interval,
+        // so multiply it by 1.5 to trigger it as early as possible.
+        return (System.currentTimeMillis() - createTime) * 1.5 > retainTime;
+    }
 }
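
Note: a worked example of the shouldFlush() heuristic, assuming bufferFlushIntervalMs = 10000 is passed as retainTime. A buffer created 3 s after an interval tick is only 7 s old at the next tick, and 7000 * 1.5 = 10500 > 10000, so it still flushes on that tick instead of surviving into the interval after it:

    BatchRecordBuffer buffer =
            new BatchRecordBuffer("db", "tbl", "\n".getBytes(StandardCharsets.UTF_8), 10000L);
    // ~7 seconds later, on the interval-flush path:
    if (buffer.shouldFlush()) {
        // age * 1.5 exceeds retainTime; hand the buffer to the flush queue
    }
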
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 2dd7a50e..3240dafe 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -36,7 +36,6 @@ import org.apache.doris.flink.sink.HttpUtil;
 import org.apache.doris.flink.sink.writer.LabelGenerator;
 import org.apache.http.client.entity.GzipCompressingEntity;
 import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
@@ -45,7 +44,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -61,7 +59,11 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
 import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
@@ -82,6 +84,8 @@ public class DorisBatchStreamLoad implements Serializable {
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final List<String> DORIS_SUCCESS_STATUS =
             new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
+    private static final long STREAM_LOAD_MAX_BYTES = 10 * 1024 * 1024 * 1024L; // 10 GB
+    private static final long STREAM_LOAD_MAX_ROWS = Integer.MAX_VALUE;
     private final LabelGenerator labelGenerator;
     private final byte[] lineDelimiter;
     private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
@@ -103,6 +107,10 @@ public class DorisBatchStreamLoad implements Serializable {
     private boolean enableGroupCommit;
     private boolean enableGzCompress;
     private int subTaskId;
+    private long maxBlockedBytes;
+    private final AtomicLong currentCacheBytes = new AtomicLong(0L);
+    private final Lock lock = new ReentrantLock();
+    private final Condition block = lock.newCondition();
 
     public DorisBatchStreamLoad(
             DorisOptions dorisOptions,
@@ -137,6 +145,10 @@ public class DorisBatchStreamLoad implements Serializable {
         this.enableGzCompress = loadProps.getProperty(COMPRESS_TYPE, "").equals(COMPRESS_TYPE_GZ);
         this.executionOptions = executionOptions;
         this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
+        // maxBlockedBytes ensures that a buffer can be written even if the queue is full
+        this.maxBlockedBytes =
+                (long) executionOptions.getBufferFlushMaxBytes()
+                        * (executionOptions.getFlushQueueSize() + 1);
         if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
             String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
             Preconditions.checkState(
@@ -144,7 +156,7 @@ public class DorisBatchStreamLoad implements Serializable {
                     "tableIdentifier input error, the format is database.table");
             this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, tableInfo[0], tableInfo[1]);
         }
-        this.loadAsyncExecutor = new LoadAsyncExecutor();
+        this.loadAsyncExecutor = new LoadAsyncExecutor(executionOptions.getFlushQueueSize());
         this.loadExecutorService =
                 new ThreadPoolExecutor(
                         1,
@@ -165,10 +177,10 @@ public class DorisBatchStreamLoad implements Serializable {
      * @param record
      * @throws IOException
      */
-    public synchronized void writeRecord(String database, String table, byte[] record)
-            throws InterruptedException {
+    public synchronized void writeRecord(String database, String table, byte[] record) {
         checkFlushException();
         String bufferKey = getTableIdentifier(database, table);
+
         BatchRecordBuffer buffer =
                 bufferMap.computeIfAbsent(
                         bufferKey,
@@ -177,30 +189,91 @@ public class DorisBatchStreamLoad implements Serializable {
                                         database,
                                         table,
                                         this.lineDelimiter,
-                                        executionOptions.getBufferFlushMaxBytes()));
-        buffer.insert(record);
-        // When it exceeds 80% of the byteSize,to flush, to avoid triggering bytebuffer expansion
-        if (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes() * 0.8
-                || (executionOptions.getBufferFlushMaxRows() != 0
-                        && buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
-            flush(bufferKey, false);
+                                        executionOptions.getBufferFlushIntervalMs()));
+
+        int bytes = buffer.insert(record);
+        currentCacheBytes.addAndGet(bytes);
+        if (currentCacheBytes.get() > maxBlockedBytes) {
+            lock.lock();
+            try {
+                while (currentCacheBytes.get() >= maxBlockedBytes) {
+                    LOG.info(
+                            "Cache full, waiting for flush, currentBytes: {}, maxBlockedBytes: {}",
+                            currentCacheBytes.get(),
+                            maxBlockedBytes);
+                    block.await(1, TimeUnit.SECONDS);
+                }
+            } catch (InterruptedException e) {
+                this.exception.set(e);
+                throw new RuntimeException(e);
+            } finally {
+                lock.unlock();
+            }
         }
+
+        // queue has space, flush according to the bufferMaxRows/bufferMaxBytes
+        if (flushQueue.size() < executionOptions.getFlushQueueSize()
+                && (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes()
+                        || buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
+            boolean flush = bufferFullFlush(bufferKey);
+            LOG.info("trigger flush by buffer full, flush: {}", flush);
+
+        } else if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES
+                || buffer.getNumOfRecords() >= STREAM_LOAD_MAX_ROWS) {
+            // The buffer capacity exceeds the stream load limit, flush
+            boolean flush = bufferFullFlush(bufferKey);
+            LOG.info("trigger flush by buffer exceeding the limit, flush: {}", flush);
+        }
+    }
+
+    public synchronized boolean bufferFullFlush(String bufferKey) {
+        return doFlush(bufferKey, false, true);
+    }
+
+    public synchronized boolean intervalFlush() {
+        return doFlush(null, false, false);
     }
 
-    public synchronized void flush(String bufferKey, boolean waitUtilDone)
-            throws InterruptedException {
+    public synchronized boolean checkpointFlush() {
+        return doFlush(null, true, false);
+    }
+
+    private synchronized boolean doFlush(
+            String bufferKey, boolean waitUtilDone, boolean bufferFull) {
         checkFlushException();
+        if (waitUtilDone || bufferFull) {
+            boolean flush = flush(bufferKey, waitUtilDone);
+            return flush;
+        } else if (flushQueue.size() < executionOptions.getFlushQueueSize()) {
+            boolean flush = flush(bufferKey, false);
+            return flush;
+        }
+        return false;
+    }
+
+    private synchronized boolean flush(String bufferKey, boolean waitUtilDone) {
         if (null == bufferKey) {
+            boolean flush = false;
             for (String key : bufferMap.keySet()) {
-                flushBuffer(key);
+                BatchRecordBuffer buffer = bufferMap.get(key);
+                if (waitUtilDone || buffer.shouldFlush()) {
+                    // Ensure that the interval satisfies intervalMS
+                    flushBuffer(key);
+                    flush = true;
+                }
+            }
+            if (!waitUtilDone && !flush) {
+                return false;
             }
         } else if (bufferMap.containsKey(bufferKey)) {
             flushBuffer(bufferKey);
+        } else {
+            throw new DorisBatchLoadException("buffer not found for key: " + bufferKey);
         }
-
         if (waitUtilDone) {
             waitAsyncLoadFinish();
         }
+        return true;
     }
 
     private synchronized void flushBuffer(String bufferKey) {
@@ -247,20 +320,96 @@ public class DorisBatchStreamLoad implements Serializable {
         this.flushQueue.clear();
     }
 
+    @VisibleForTesting
+    public boolean mergeBuffer(List<BatchRecordBuffer> recordList, BatchRecordBuffer buffer) {
+        boolean merge = false;
+        if (recordList.size() > 1) {
+            boolean sameTable =
+                    recordList.stream()
+                                    .map(BatchRecordBuffer::getTableIdentifier)
+                                    .distinct()
+                                    .count()
+                            == 1;
+            // Buffers can be merged only if they belong to the same table.
+            if (sameTable) {
+                for (BatchRecordBuffer recordBuffer : recordList) {
+                    if (recordBuffer != null
+                            && recordBuffer.getLabelName() != null
+                            && !buffer.getLabelName().equals(recordBuffer.getLabelName())
+                            && !recordBuffer.getBuffer().isEmpty()) {
+                        merge(buffer, recordBuffer);
+                        merge = true;
+                    }
+                }
+                LOG.info(
+                        "merge {} buffer to one stream load, result bufferBytes {}",
+                        recordList.size(),
+                        buffer.getBufferSizeBytes());
+            }
+        }
+        return merge;
+    }
+
+    private boolean merge(BatchRecordBuffer mergeBuffer, BatchRecordBuffer buffer) {
+        if (buffer.getBuffer().isEmpty()) {
+            return false;
+        }
+        if (!mergeBuffer.getBuffer().isEmpty()) {
+            mergeBuffer.getBuffer().add(mergeBuffer.getLineDelimiter());
+            mergeBuffer.setBufferSizeBytes(
+                    mergeBuffer.getBufferSizeBytes() + mergeBuffer.getLineDelimiter().length);
+            currentCacheBytes.addAndGet(buffer.getLineDelimiter().length);
+        }
+        mergeBuffer.getBuffer().addAll(buffer.getBuffer());
+        mergeBuffer.setNumOfRecords(mergeBuffer.getNumOfRecords() + buffer.getNumOfRecords());
+        mergeBuffer.setBufferSizeBytes(
+                mergeBuffer.getBufferSizeBytes() + buffer.getBufferSizeBytes());
+        return true;
+    }
+
     class LoadAsyncExecutor implements Runnable {
+
+        private int flushQueueSize;
+
+        public LoadAsyncExecutor(int flushQueueSize) {
+            this.flushQueueSize = flushQueueSize;
+        }
+
         @Override
         public void run() {
             LOG.info("LoadAsyncExecutor start");
             loadThreadAlive = true;
+            List<BatchRecordBuffer> recordList = new ArrayList<>(flushQueueSize);
             while (started.get()) {
-                BatchRecordBuffer buffer = null;
+                recordList.clear();
                 try {
-                    buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
-                    if (buffer == null) {
+                    BatchRecordBuffer buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
+                    if (buffer == null || buffer.getLabelName() == null) {
+                        // label is empty and does not need to load. It is the flag of waitUtilDone
                         continue;
                     }
-                    if (buffer.getLabelName() != null) {
-                        load(buffer.getLabelName(), buffer);
+                    recordList.add(buffer);
+                    boolean merge = false;
+                    if (!flushQueue.isEmpty()) {
+                        flushQueue.drainTo(recordList, flushQueueSize - 1);
+                        if (mergeBuffer(recordList, buffer)) {
+                            load(buffer.getLabelName(), buffer);
+                            merge = true;
+                        }
+                    }
+
+                    if (!merge) {
+                        for (BatchRecordBuffer bf : recordList) {
+                            if (bf == null || bf.getLabelName() == null) {
+                                continue;
+                            }
+                            load(bf.getLabelName(), bf);
+                        }
+                    }
+
+                    if (flushQueue.size() < flushQueueSize) {
+                        // Avoid waiting for 2 rounds of intervalMs
+                        doFlush(null, false, false);
                     }
                 } catch (Exception e) {
                     LOG.error("worker running error", e);
@@ -280,9 +429,8 @@ public class DorisBatchStreamLoad implements Serializable {
                 label = null;
             }
             refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
-            ByteBuffer data = buffer.getData();
-            ByteArrayEntity entity =
-                    new ByteArrayEntity(data.array(), data.arrayOffset(), data.limit());
+
+            BatchBufferHttpEntity entity = new BatchBufferHttpEntity(buffer);
             HttpPutBuilder putBuilder = new HttpPutBuilder();
             putBuilder
                     .setUrl(loadUrl)
@@ -321,6 +469,18 @@ public class DorisBatchStreamLoad implements Serializable {
                                                 respContent.getErrorURL());
                                 throw new DorisBatchLoadException(errMsg);
                             } else {
+                                long cacheByteBeforeFlush =
+                                        currentCacheBytes.getAndAdd(-respContent.getLoadBytes());
+                                LOG.info(
+                                        "load success, cacheBeforeFlushBytes: {}, currentCacheBytes : {}",
+                                        cacheByteBeforeFlush,
+                                        currentCacheBytes.get());
+                                lock.lock();
+                                try {
+                                    block.signal();
+                                } finally {
+                                    lock.unlock();
+                                }
                                 return;
                             }
                         }
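
Note: the writeRecord()/load() changes above form a byte-budget back-pressure handshake: the writer parks while currentCacheBytes exceeds maxBlockedBytes, and the loader signals after subtracting the flushed bytes. A self-contained sketch of the same pattern (an illustration mirroring the new fields, not the connector's code):

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    class CacheBackPressure {
        private final Lock lock = new ReentrantLock();
        private final Condition block = lock.newCondition();
        private final AtomicLong currentCacheBytes = new AtomicLong();
        private final long maxBlockedBytes;

        CacheBackPressure(long maxBlockedBytes) {
            this.maxBlockedBytes = maxBlockedBytes;
        }

        // writer side: account for the new bytes, then wait while over budget
        void onWrite(long bytes) throws InterruptedException {
            if (currentCacheBytes.addAndGet(bytes) <= maxBlockedBytes) {
                return;
            }
            lock.lock();
            try {
                while (currentCacheBytes.get() >= maxBlockedBytes) {
                    block.await(1, TimeUnit.SECONDS); // re-check once per second
                }
            } finally {
                lock.unlock();
            }
        }

        // loader side: release the flushed bytes and wake a blocked writer
        void onLoadSuccess(long loadBytes) {
            currentCacheBytes.addAndGet(-loadBytes);
            lock.lock();
            try {
                block.signal();
            } finally {
                lock.unlock();
            }
        }
    }
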
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
index 6fbde55d..db486bcb 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
@@ -107,9 +107,9 @@ public class DorisBatchWriter<IN>
 
     private void intervalFlush() {
         try {
-            LOG.info("interval flush triggered.");
-            batchStreamLoad.flush(null, false);
-        } catch (InterruptedException e) {
+            boolean flush = batchStreamLoad.intervalFlush();
+            LOG.debug("interval flush trigger, flush: {}", flush);
+        } catch (Exception e) {
             flushException = e;
         }
     }
@@ -125,7 +125,7 @@ public class DorisBatchWriter<IN>
         checkFlushException();
         writeOneDorisRecord(serializer.flush());
         LOG.info("checkpoint flush triggered.");
-        batchStreamLoad.flush(null, true);
+        batchStreamLoad.checkpointFlush();
     }
 
     @Override
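
Note: intervalFlush() now reports whether a flush was actually queued and declines when the flush queue is already full; the periodic trigger itself lives elsewhere in DorisBatchWriter and is not part of this hunk. A hypothetical scheduling sketch (the scheduler and interval value here are assumptions, not code from this commit):

    // Hypothetical: drive the periodic flush once per configured interval.
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    long intervalMs = 10000L; // e.g. executionOptions.getBufferFlushIntervalMs()
    scheduler.scheduleWithFixedDelay(
            this::intervalFlush, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
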
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchRecordBuffer.java
similarity index 99%
copy from flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
copy to flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchRecordBuffer.java
index df40e7a9..e5f4c4eb 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchRecordBuffer.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.flink.sink.batch;
+package org.apache.doris.flink.sink.copy;
 
 import org.apache.flink.annotation.VisibleForTesting;
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
index be8adcb0..2c5ed5c2 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
@@ -28,7 +28,6 @@ import org.apache.doris.flink.exception.DorisBatchLoadException;
 import org.apache.doris.flink.sink.EscapeHandler;
 import org.apache.doris.flink.sink.HttpPutBuilder;
 import org.apache.doris.flink.sink.HttpUtil;
-import org.apache.doris.flink.sink.batch.BatchRecordBuffer;
 import org.apache.doris.flink.sink.writer.LabelGenerator;
 import org.apache.http.Header;
 import org.apache.http.HttpEntity;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 02e59084..2a1c9b1a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -275,14 +275,14 @@ public class DorisConfigOptions {
     public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
             ConfigOptions.key("sink.buffer-flush.max-rows")
                     .intType()
-                    .defaultValue(50000)
+                    .defaultValue(500000)
                     .withDescription(
                             "The maximum number of flush items in each batch, the default is 50w");
 
     public static final ConfigOption<MemorySize> SINK_BUFFER_FLUSH_MAX_BYTES =
             ConfigOptions.key("sink.buffer-flush.max-bytes")
                     .memoryType()
-                    .defaultValue(MemorySize.parse("10mb"))
+                    .defaultValue(MemorySize.parse("100mb"))
                     .withDescription(
                             "The maximum number of bytes flushed in each batch, the default is 100MB");
 
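Note: sink.buffer-flush.max-bytes is a memoryType option, so SQL values such as '100mb' are parsed by Flink's MemorySize before reaching the builder. A small sketch of the equivalence (org.apache.flink.configuration.MemorySize):

    MemorySize maxBytes = MemorySize.parse("100mb");
    long bytes = maxBytes.getBytes(); // 104857600 == 100 * 1024 * 1024
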
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java
index 9cc19716..bc19c572 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java
@@ -52,9 +52,9 @@ public class DorisExecutionOptionsTest {
                         .setWriteMode(WriteMode.STREAM_LOAD)
                         .setLabelPrefix("doris")
                         .enable2PC()
-                        .setBufferFlushMaxBytes(10)
+                        .setBufferFlushMaxBytes(10485760)
                         .setBufferFlushIntervalMs(10000)
-                        .setBufferFlushMaxRows(12)
+                        .setBufferFlushMaxRows(10000)
                         .setCheckInterval(10)
                         .setIgnoreCommitError(true)
                         .setDeletable(true)
@@ -72,9 +72,9 @@ public class DorisExecutionOptionsTest {
                         .setWriteMode(WriteMode.STREAM_LOAD)
                         .setLabelPrefix("doris")
                         .enable2PC()
-                        .setBufferFlushMaxBytes(10)
+                        .setBufferFlushMaxBytes(10485760)
                         .setBufferFlushIntervalMs(10000)
-                        .setBufferFlushMaxRows(12)
+                        .setBufferFlushMaxRows(10000)
                         .setCheckInterval(10)
                         .setIgnoreCommitError(true)
                         .setDeletable(true)
@@ -111,17 +111,17 @@ public class DorisExecutionOptionsTest {
         Assert.assertNotEquals(exceptOptions, builder.build());
         builder.enable2PC();
 
-        builder.setBufferFlushMaxBytes(11);
+        builder.setBufferFlushMaxBytes(104857601);
         Assert.assertNotEquals(exceptOptions, builder.build());
-        builder.setBufferFlushMaxBytes(10);
+        builder.setBufferFlushMaxBytes(10485760);
 
         builder.setBufferFlushIntervalMs(100001);
         Assert.assertNotEquals(exceptOptions, builder.build());
         builder.setBufferFlushIntervalMs(10000);
 
-        builder.setBufferFlushMaxRows(2);
+        builder.setBufferFlushMaxRows(10000);
         Assert.assertNotEquals(exceptOptions, builder.build());
-        builder.setBufferFlushMaxRows(12);
+        builder.setBufferFlushMaxRows(10000);
 
         builder.setCheckInterval(11);
         Assert.assertNotEquals(exceptOptions, builder.build());
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index aa3d00da..de0ef041 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -208,9 +208,9 @@ public class DorisSinkITCase extends DorisTestBase {
                                 + " 'sink.enable.batch-mode' = 'true',"
                                 + " 'sink.enable-delete' = 'true',"
                                 + " 'sink.flush.queue-size' = '2',"
-                                + " 'sink.buffer-flush.max-rows' = '1',"
-                                + " 'sink.buffer-flush.max-bytes' = '5',"
-                                + " 'sink.buffer-flush.interval' = '10s'"
+                                + " 'sink.buffer-flush.max-rows' = '10000',"
+                                + " 'sink.buffer-flush.max-bytes' = '10MB',"
+                                + " 'sink.buffer-flush.interval' = '1s'"
                                 + ")",
                         getFenodes(),
                         DATABASE + "." + TABLE_CSV_BATCH_TBL,
@@ -219,7 +219,7 @@ public class DorisSinkITCase extends DorisTestBase {
         tEnv.executeSql(sinkDDL);
         tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all SELECT 'flink',2");
 
-        Thread.sleep(10000);
+        Thread.sleep(20000);
         List<String> expected = Arrays.asList("doris,1", "flink,2");
         String query =
                 String.format(
@@ -248,8 +248,9 @@ public class DorisSinkITCase extends DorisTestBase {
         executionBuilder
                 .setLabelPrefix(UUID.randomUUID().toString())
                 .setStreamLoadProp(properties)
-                .setBufferFlushMaxBytes(1)
-                .setBufferFlushMaxRows(10);
+                .setBufferFlushMaxBytes(10485760)
+                .setBufferFlushMaxRows(10000)
+                .setBufferFlushIntervalMs(1000);
 
         builder.setDorisExecutionOptions(executionBuilder.build())
                 .setSerializer(new SimpleStringSerializer())
@@ -258,7 +259,7 @@ public class DorisSinkITCase extends DorisTestBase {
         env.fromElements("doris,1", "flink,2").sinkTo(builder.build());
         env.execute();
 
-        Thread.sleep(10000);
+        Thread.sleep(20000);
         List<String> expected = Arrays.asList("doris,1", "flink,2");
         String query =
                 String.format(
@@ -295,9 +296,9 @@ public class DorisSinkITCase extends DorisTestBase {
                                 + " 'sink.enable.batch-mode' = 'true',"
                                 + " 'sink.enable-delete' = 'true',"
                                 + " 'sink.flush.queue-size' = '2',"
-                                + " 'sink.buffer-flush.max-rows' = '3',"
-                                + " 'sink.buffer-flush.max-bytes' = '5000',"
-                                + " 'sink.buffer-flush.interval' = '10s'"
+                                + " 'sink.buffer-flush.max-rows' = '10000',"
+                                + " 'sink.buffer-flush.max-bytes' = '10MB',"
+                                + " 'sink.buffer-flush.interval' = '1s'"
                                 + ")",
                         getFenodes(),
                         DATABASE + "." + TABLE_GROUP_COMMIT,
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferHttpEntity.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferHttpEntity.java
new file mode 100644
index 00000000..fe20c544
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferHttpEntity.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestBatchBufferHttpEntity {
+
+    @Test
+    public void testWrite() throws Exception {
+        BatchRecordBuffer recordBuffer = TestBatchBufferStream.mockBuffer();
+        byte[] expectedData = TestBatchBufferStream.mergeByteArrays(recordBuffer.getBuffer());
+        Assert.assertEquals(recordBuffer.getNumOfRecords(), 1000);
+
+        BatchBufferHttpEntity entity = new BatchBufferHttpEntity(recordBuffer);
+        assertTrue(entity.isRepeatable());
+        assertFalse(entity.isStreaming());
+        assertEquals(entity.getContentLength(), expectedData.length);
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        entity.writeTo(outputStream);
+        assertArrayEquals(expectedData, outputStream.toByteArray());
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferStream.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferStream.java
new file mode 100644
index 00000000..3dad5b60
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferStream.java
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestBatchBufferStream {
+
+    @Test
+    public void testRead() throws Exception {
+        BatchRecordBuffer recordBuffer = mockBuffer();
+        byte[] expectedData = mergeByteArrays(recordBuffer.getBuffer());
+        Assert.assertEquals(recordBuffer.getNumOfRecords(), 1000);
+
+        byte[] actualData = new byte[(int) recordBuffer.getBufferSizeBytes()];
+        try (BatchBufferStream inputStream = new BatchBufferStream(recordBuffer.getBuffer())) {
+            int len = inputStream.read(actualData, 0, actualData.length);
+            assertEquals(actualData.length, len);
+            assertArrayEquals(expectedData, actualData);
+        }
+    }
+
+    @Test
+    public void testReadBufLen() throws Exception {
+        BatchRecordBuffer recordBuffer = mockBuffer();
+        byte[] expectedData = mergeByteArrays(recordBuffer.getBuffer());
+        Assert.assertEquals(recordBuffer.getNumOfRecords(), 1000);
+
+        byte[] actualData = new byte[(int) recordBuffer.getBufferSizeBytes()];
+        try (BatchBufferStream inputStream = new BatchBufferStream(recordBuffer.getBuffer())) {
+            int pos = 0;
+            while (pos < actualData.length) {
+                // mock random length
+                int maxLen = new Random().nextInt(actualData.length - pos) + 1;
+                int len = inputStream.read(actualData, pos, maxLen);
+                if (len == -1) {
+                    break;
+                }
+                assertTrue(len > 0 && len <= maxLen);
+                pos += len;
+            }
+            assertEquals(actualData.length, pos);
+            assertArrayEquals(expectedData, actualData);
+        }
+    }
+
+    public static BatchRecordBuffer mockBuffer() {
+        BatchRecordBuffer recordBuffer = new BatchRecordBuffer();
+        for (int i = 0; i < 1000; i++) {
+            recordBuffer.insert((UUID.randomUUID() + "," + i).getBytes());
+        }
+        return recordBuffer;
+    }
+
+    public static byte[] mergeByteArrays(List<byte[]> listOfByteArrays) {
+        int totalLength = 0;
+        for (byte[] byteArray : listOfByteArrays) {
+            totalLength += byteArray.length;
+        }
+
+        byte[] mergedArray = new byte[totalLength];
+
+        int currentPosition = 0;
+        for (byte[] byteArray : listOfByteArrays) {
+            System.arraycopy(byteArray, 0, mergedArray, currentPosition, byteArray.length);
+            currentPosition += byteArray.length;
+        }
+
+        return mergedArray;
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
index d52149d6..62d84c99 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
@@ -41,9 +41,13 @@ import org.mockito.MockedStatic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.doris.flink.sink.batch.TestBatchBufferStream.mergeByteArrays;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
@@ -84,8 +88,10 @@ public class TestDorisBatchStreamLoad {
 
     @Test
     public void testLoadFail() throws Exception {
+        LOG.info("testLoadFail start");
         DorisReadOptions readOptions = DorisReadOptions.builder().build();
-        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder().build();
+        DorisExecutionOptions executionOptions =
+                DorisExecutionOptions.builder().setBufferFlushIntervalMs(1000).build();
         DorisOptions options =
                 DorisOptions.builder()
                         .setFenodes("127.0.0.1:1")
@@ -104,7 +110,7 @@ public class TestDorisBatchStreamLoad {
                 () -> loader.isLoadThreadAlive(),
                 Deadline.fromNow(Duration.ofSeconds(10)),
                 100L,
-                "Condition was not met in given timeout.");
+                "testLoadFail wait loader start failed.");
         Assert.assertTrue(loader.isLoadThreadAlive());
         BackendUtil backendUtil = mock(BackendUtil.class);
         HttpClientBuilder httpClientBuilder = mock(HttpClientBuilder.class);
@@ -118,17 +124,25 @@ public class TestDorisBatchStreamLoad {
         when(httpClientBuilder.build()).thenReturn(httpClient);
         when(httpClient.execute(any())).thenReturn(response);
         loader.writeRecord("db", "tbl", "1,data".getBytes());
-        loader.flush("db.tbl", true);
+        loader.checkpointFlush();
 
+        TestUtil.waitUntilCondition(
+                () -> !loader.isLoadThreadAlive(),
+                Deadline.fromNow(Duration.ofSeconds(20)),
+                100L,
+                "testLoadFail wait loader exit failed." + loader.isLoadThreadAlive());
         AtomicReference<Throwable> exception = loader.getException();
         Assert.assertTrue(exception.get() instanceof Exception);
         Assert.assertTrue(exception.get().getMessage().contains("stream load error"));
+        LOG.info("testLoadFail end");
     }
 
     @Test
     public void testLoadError() throws Exception {
+        LOG.info("testLoadError start");
         DorisReadOptions readOptions = DorisReadOptions.builder().build();
-        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder().build();
+        DorisExecutionOptions executionOptions =
+                DorisExecutionOptions.builder().setBufferFlushIntervalMs(1000).build();
         DorisOptions options =
                 DorisOptions.builder()
                         .setFenodes("127.0.0.1:1")
@@ -148,7 +162,7 @@ public class TestDorisBatchStreamLoad {
                 () -> loader.isLoadThreadAlive(),
                 Deadline.fromNow(Duration.ofSeconds(10)),
                 100L,
-                "Condition was not met in given timeout.");
+                "testLoadError wait loader start failed.");
         Assert.assertTrue(loader.isLoadThreadAlive());
         BackendUtil backendUtil = mock(BackendUtil.class);
         HttpClientBuilder httpClientBuilder = mock(HttpClientBuilder.class);
@@ -161,12 +175,17 @@ public class TestDorisBatchStreamLoad {
         when(httpClientBuilder.build()).thenReturn(httpClient);
         when(httpClient.execute(any())).thenReturn(response);
         loader.writeRecord("db", "tbl", "1,data".getBytes());
-        loader.flush("db.tbl", true);
+        loader.checkpointFlush();
 
+        TestUtil.waitUntilCondition(
+                () -> !loader.isLoadThreadAlive(),
+                Deadline.fromNow(Duration.ofSeconds(20)),
+                100L,
+                "testLoadError wait loader exit failed." + loader.isLoadThreadAlive());
         AtomicReference<Throwable> exception = loader.getException();
-
         Assert.assertTrue(exception.get() instanceof Exception);
         Assert.assertTrue(exception.get().getMessage().contains("stream load error"));
+        LOG.info("testLoadError end");
     }
 
     @After
@@ -175,4 +194,51 @@ public class TestDorisBatchStreamLoad {
             backendUtilMockedStatic.close();
         }
     }
+
+    @Test
+    public void mergeBufferTest() {
+        DorisReadOptions readOptions = DorisReadOptions.builder().build();
+        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder().build();
+        DorisOptions options =
+                DorisOptions.builder()
+                        .setFenodes("127.0.0.1:8030")
+                        .setBenodes("127.0.0.1:9030")
+                        .setTableIdentifier("db.tbl")
+                        .build();
+
+        DorisBatchStreamLoad loader =
+                new DorisBatchStreamLoad(
+                        options, readOptions, executionOptions, new LabelGenerator("xx", false), 0);
+
+        List<BatchRecordBuffer> bufferList = new ArrayList<>();
+        BatchRecordBuffer recordBuffer =
+                new BatchRecordBuffer("db", "tbl", "\n".getBytes(StandardCharsets.UTF_8), 0);
+        recordBuffer.insert("doris,2".getBytes(StandardCharsets.UTF_8));
+        recordBuffer.setLabelName("label2");
+        BatchRecordBuffer buffer =
+                new BatchRecordBuffer("db", "tbl", "\n".getBytes(StandardCharsets.UTF_8), 0);
+        buffer.insert("doris,1".getBytes(StandardCharsets.UTF_8));
+        buffer.setLabelName("label1");
+
+        boolean flag = loader.mergeBuffer(bufferList, buffer);
+        Assert.assertEquals(false, flag);
+
+        bufferList.add(buffer);
+        bufferList.add(recordBuffer);
+        flag = loader.mergeBuffer(bufferList, buffer);
+        Assert.assertEquals(true, flag);
+        byte[] bytes = mergeByteArrays(buffer.getBuffer());
+        Assert.assertArrayEquals(bytes, "doris,1\ndoris,2".getBytes(StandardCharsets.UTF_8));
+
+        // multi table
+        bufferList.clear();
+        bufferList.add(buffer);
+        BatchRecordBuffer recordBuffer2 =
+                new BatchRecordBuffer("db", "tbl2", "\n".getBytes(StandardCharsets.UTF_8), 0);
+        recordBuffer2.insert("doris,3".getBytes(StandardCharsets.UTF_8));
+        recordBuffer2.setLabelName("label3");
+        bufferList.add(recordBuffer2);
+        flag = loader.mergeBuffer(bufferList, buffer);
+        Assert.assertEquals(false, flag);
+    }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestBatchRecordBuffer.java
similarity index 99%
rename from flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java
rename to flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestBatchRecordBuffer.java
index 1a6897c9..3107225a 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestBatchRecordBuffer.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.flink.sink.batch;
+package org.apache.doris.flink.sink.copy;
 
 import org.junit.Assert;
 import org.junit.Test;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
index 05a93dc5..56887d93 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
@@ -135,7 +135,7 @@ public class DorisDynamicTableFactoryTest {
         properties.put("sink.use-cache", "true");
         properties.put("sink.enable.batch-mode", "true");
         properties.put("sink.flush.queue-size", "2");
-        properties.put("sink.buffer-flush.max-rows", "1000");
+        properties.put("sink.buffer-flush.max-rows", "10000");
         properties.put("sink.buffer-flush.max-bytes", "10MB");
         properties.put("sink.buffer-flush.interval", "10s");
         properties.put("sink.ignore.update-before", "true");
@@ -169,7 +169,7 @@ public class DorisDynamicTableFactoryTest {
                         .enable2PC()
                         .setBufferFlushIntervalMs(10000)
                         .setBufferFlushMaxBytes(10 * 1024 * 1024)
-                        .setBufferFlushMaxRows(1000)
+                        .setBufferFlushMaxRows(10000)
                         .setFlushQueueSize(2)
                         .setUseCache(true)
                         .setIgnoreCommitError(false)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
