This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new ab9412db [Fix] improve batch mode when streamload failed (#560)
ab9412db is described below

commit ab9412db00cd36f5e4372cc9c022f12e8d4b4b14
Author: wudi <676366...@qq.com>
AuthorDate: Wed Feb 19 18:01:34 2025 +0800

    [Fix] improve batch mode when streamload failed (#560)
---
 .../flink/sink/batch/DorisBatchStreamLoad.java     | 24 ++++++++++++++-----
 .../doris/flink/sink/batch/DorisBatchWriter.java   | 11 ++++++++-
 .../flink/sink/batch/TestDorisBatchStreamLoad.java | 27 +++++-----------------
 .../flink/sink/batch/TestDorisBatchWriter.java     |  3 ++-
 4 files changed, 36 insertions(+), 29 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 267a121c..c8473120 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -184,6 +184,7 @@ public class DorisBatchStreamLoad implements Serializable {
     public void writeRecord(String database, String table, byte[] record) {
         checkFlushException();
         String bufferKey = getTableIdentifier(database, table);
+
         getLock(bufferKey).readLock().lock();
         BatchRecordBuffer buffer =
                 bufferMap.computeIfAbsent(
@@ -198,6 +199,7 @@ public class DorisBatchStreamLoad implements Serializable {
         int bytes = buffer.insert(record);
         currentCacheBytes.addAndGet(bytes);
         getLock(bufferKey).readLock().unlock();
+
         if (currentCacheBytes.get() > maxBlockedBytes) {
             lock.lock();
             try {
@@ -258,8 +260,9 @@ public class DorisBatchStreamLoad implements Serializable {
     }
 
     private synchronized boolean flush(String bufferKey, boolean waitUtilDone) {
-        if (bufferMap.isEmpty()) {
+        if (!waitUtilDone && bufferMap.isEmpty()) {
             // bufferMap may have been flushed by other threads
+            LOG.info("bufferMap is empty, no need to flush {}", bufferKey);
             return false;
         }
         if (null == bufferKey) {
@@ -295,6 +298,7 @@ public class DorisBatchStreamLoad implements Serializable {
             getLock(bufferKey).writeLock().unlock();
         }
         if (buffer == null) {
+            LOG.info("buffer key is not exist {}, skipped", bufferKey);
             return;
         }
         buffer.setLabelName(labelGenerator.generateBatchLabel(buffer.getTable()));
@@ -312,6 +316,9 @@ public class DorisBatchStreamLoad implements Serializable {
         } catch (InterruptedException e) {
             throw new RuntimeException("Failed to put record buffer to flush queue");
         }
+        // When the load thread reports an error, the flushQueue will be cleared,
+        // and need to force a check for the exception.
+        checkFlushException();
     }
 
     private void checkFlushException() {
@@ -321,7 +328,9 @@ public class DorisBatchStreamLoad implements Serializable {
     }
 
     private void waitAsyncLoadFinish() {
-        for (int i = 0; i < executionOptions.getFlushQueueSize() + 1; i++) {
+        // Because the queue will have a drainTo operation, it needs to be multiplied by 2
+        for (int i = 0; i < executionOptions.getFlushQueueSize() * 2 + 1; i++) {
+            // eof buffer
             BatchRecordBuffer empty = new BatchRecordBuffer();
             putRecordToFlushQueue(empty);
         }
@@ -335,8 +344,6 @@ public class DorisBatchStreamLoad implements Serializable {
         // close async executor
         this.loadExecutorService.shutdown();
         this.started.set(false);
-        // clear buffer
-        this.flushQueue.clear();
     }
 
     @VisibleForTesting
@@ -407,10 +414,14 @@ public class DorisBatchStreamLoad implements Serializable {
                 recordList.clear();
                 try {
                     BatchRecordBuffer buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
-                    if (buffer == null || buffer.getLabelName() == null) {
-                        // label is empty and does not need to load. It is the flag of waitUtilDone
+                    if (buffer == null) {
                         continue;
                     }
+                    if (buffer.getLabelName() == null) {
+                        // When the label is empty, it is the eof buffer for checkpoint flush.
+                        continue;
+                    }
+
                     recordList.add(buffer);
                     boolean merge = false;
                     if (!flushQueue.isEmpty()) {
@@ -424,6 +435,7 @@ public class DorisBatchStreamLoad implements Serializable {
                     if (!merge) {
                         for (BatchRecordBuffer bf : recordList) {
                             if (bf == null || bf.getLabelName() == null) {
+                                // When the label is empty, it's eof buffer for checkpointFlush.
                                 continue;
                             }
                             load(bf.getLabelName(), bf);
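
For context on the hunks above, here is a minimal standalone sketch of the pattern they implement, under the assumption that SimpleBuffer, flushQueueSize, putAndCheck and loadOnce are illustrative stand-ins rather than the connector's actual API: the writer thread enqueues label-less "eof" buffers to wake the async load thread, enqueues 2 * queueSize + 1 of them because the load thread may drain several buffers per poll, and re-checks the recorded failure right after each put, since a failed load thread clears the queue and would otherwise go unnoticed.

    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicReference;

    // Illustrative sketch only; not the connector's DorisBatchStreamLoad.
    public class EofBufferSketch {

        static class SimpleBuffer {
            String label; // a null label marks an "eof" buffer that only wakes the load thread
            byte[] data;
        }

        private final LinkedBlockingDeque<SimpleBuffer> flushQueue = new LinkedBlockingDeque<>();
        private final AtomicReference<Throwable> loadException = new AtomicReference<>();
        private final int flushQueueSize = 2; // stand-in for executionOptions.getFlushQueueSize()

        // Writer thread: enqueue a buffer, then surface any failure the load thread recorded,
        // because a failed load thread clears the queue and stops consuming.
        void putAndCheck(SimpleBuffer buffer) throws InterruptedException {
            flushQueue.put(buffer);
            Throwable t = loadException.get();
            if (t != null) {
                throw new RuntimeException("async stream load failed", t);
            }
        }

        // Checkpoint flush: the load thread can drain up to flushQueueSize extra buffers per
        // poll, so 2 * size + 1 eof markers guarantee it wakes up and works through everything
        // queued before them.
        void waitAsyncLoadFinish() throws InterruptedException {
            for (int i = 0; i < flushQueueSize * 2 + 1; i++) {
                putAndCheck(new SimpleBuffer()); // label stays null => eof marker
            }
        }

        // Async load thread: skip eof markers; on a real load failure it would record the
        // exception, clear the queue and exit, e.g.
        //   loadException.compareAndSet(null, e); flushQueue.clear(); return;
        void loadOnce() throws InterruptedException {
            SimpleBuffer buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
            if (buffer == null || buffer.label == null) {
                return; // nothing queued, or just an eof marker
            }
            // ... stream-load buffer.data here ...
        }
    }
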
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
index db486bcb..b0b256ee 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
@@ -18,6 +18,7 @@
 package org.apache.doris.flink.sink.batch;
 
 import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -67,6 +68,12 @@ public class DorisBatchWriter<IN>
             DorisOptions dorisOptions,
             DorisReadOptions dorisReadOptions,
             DorisExecutionOptions executionOptions) {
+
+        long restoreCheckpointId =
+                initContext
+                        .getRestoredCheckpointId()
+                        .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
+        LOG.info("restore from checkpointId {}", restoreCheckpointId);
         if (!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
             String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
             Preconditions.checkState(
@@ -75,6 +82,7 @@ public class DorisBatchWriter<IN>
             this.database = tableInfo[0];
             this.table = tableInfo[1];
         }
+
         LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
         this.subtaskId = initContext.getSubtaskId();
         this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId();
@@ -130,12 +138,13 @@ public class DorisBatchWriter<IN>
 
     @Override
     public Collection<DorisCommittable> prepareCommit() throws IOException, InterruptedException {
-        // nothing to commit
+        checkFlushException();
         return Collections.emptyList();
     }
 
     @Override
     public List<DorisWriterState> snapshotState(long checkpointId) throws IOException {
+        checkFlushException();
         return new ArrayList<>();
     }
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
index 62d84c99..784334aa 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
@@ -45,7 +45,6 @@ import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.doris.flink.sink.batch.TestBatchBufferStream.mergeByteArrays;
 import static org.mockito.ArgumentMatchers.any;
@@ -124,17 +123,10 @@ public class TestDorisBatchStreamLoad {
         when(httpClientBuilder.build()).thenReturn(httpClient);
         when(httpClient.execute(any())).thenReturn(response);
         loader.writeRecord("db", "tbl", "1,data".getBytes());
-        loader.checkpointFlush();
 
-        TestUtil.waitUntilCondition(
-                () -> !loader.isLoadThreadAlive(),
-                Deadline.fromNow(Duration.ofSeconds(20)),
-                100L,
-                "testLoadFail wait loader exit failed." + loader.isLoadThreadAlive());
-        AtomicReference<Throwable> exception = loader.getException();
-        Assert.assertTrue(exception.get() instanceof Exception);
-        Assert.assertTrue(exception.get().getMessage().contains("stream load error"));
-        LOG.info("testLoadFail end");
+        thrown.expect(Exception.class);
+        thrown.expectMessage("stream load error");
+        loader.checkpointFlush();
     }
 
     @Test
@@ -175,17 +167,10 @@ public class TestDorisBatchStreamLoad {
         when(httpClientBuilder.build()).thenReturn(httpClient);
         when(httpClient.execute(any())).thenReturn(response);
         loader.writeRecord("db", "tbl", "1,data".getBytes());
-        loader.checkpointFlush();
 
-        TestUtil.waitUntilCondition(
-                () -> !loader.isLoadThreadAlive(),
-                Deadline.fromNow(Duration.ofSeconds(20)),
-                100L,
-                "testLoadError wait loader exit failed." + loader.isLoadThreadAlive());
-        AtomicReference<Throwable> exception = loader.getException();
-        Assert.assertTrue(exception.get() instanceof Exception);
-        Assert.assertTrue(exception.get().getMessage().contains("stream load error"));
-        LOG.info("testLoadError end");
+        thrown.expect(Exception.class);
+        thrown.expectMessage("stream load error");
+        loader.checkpointFlush();
     }
 
     @After
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchWriter.java
index 225dabd5..9704e250 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchWriter.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchWriter.java
@@ -57,7 +57,8 @@ public class TestDorisBatchWriter {
                         .build();
         thrown.expect(IllegalStateException.class);
         thrown.expectMessage("tableIdentifier input error");
-        DorisBatchWriter batchWriter = new DorisBatchWriter(null, null, options, null, null);
+        Sink.InitContext initContext = mock(Sink.InitContext.class);
+        DorisBatchWriter batchWriter = new DorisBatchWriter(initContext, null, options, null, null);
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
