This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 182c274  [Improvement] add option ignore commit error (#327)
182c274 is described below

commit 182c274710cbfed6686afc22de273adba2e846df
Author: wudi <676366...@qq.com>
AuthorDate: Mon Mar 4 17:15:26 2024 +0800

    [Improvement] add option ignore commit error (#327)
---
 .../doris/flink/cfg/DorisExecutionOptions.java     | 18 ++++++++--
 .../org/apache/doris/flink/sink/DorisSink.java     |  3 +-
 .../doris/flink/sink/committer/DorisCommitter.java | 41 ++++++++++++++++------
 .../doris/flink/table/DorisConfigOptions.java      |  7 ++++
 .../flink/table/DorisDynamicTableFactory.java      |  3 ++
 .../apache/doris/flink/tools/cdc/DatabaseSync.java |  3 ++
 .../flink/sink/committer/TestDorisCommitter.java   |  5 ++-
 7 files changed, 65 insertions(+), 15 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index a890d34..b1b49d1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -63,6 +63,7 @@ public class DorisExecutionOptions implements Serializable {
     private final boolean enableBatchMode;
     private final boolean ignoreUpdateBefore;
     private final WriteMode writeMode;
+    private final boolean ignoreCommitError;
 
     public DorisExecutionOptions(
             int checkInterval,
@@ -81,7 +82,8 @@ public class DorisExecutionOptions implements Serializable {
             long bufferFlushIntervalMs,
             boolean ignoreUpdateBefore,
             boolean force2PC,
-            WriteMode writeMode) {
+            WriteMode writeMode,
+            boolean ignoreCommitError) {
         Preconditions.checkArgument(maxRetries >= 0);
         this.checkInterval = checkInterval;
         this.maxRetries = maxRetries;
@@ -102,6 +104,7 @@ public class DorisExecutionOptions implements Serializable {
 
         this.ignoreUpdateBefore = ignoreUpdateBefore;
         this.writeMode = writeMode;
+        this.ignoreCommitError = ignoreCommitError;
     }
 
     public static Builder builder() {
@@ -205,6 +208,10 @@ public class DorisExecutionOptions implements Serializable {
         return writeMode;
     }
 
+    public boolean ignoreCommitError() {
+        return ignoreCommitError;
+    }
+
     /** Builder of {@link DorisExecutionOptions}. */
     public static class Builder {
         private int checkInterval = DEFAULT_CHECK_INTERVAL;
@@ -229,6 +236,7 @@ public class DorisExecutionOptions implements Serializable {
 
         private boolean ignoreUpdateBefore = true;
         private WriteMode writeMode = WriteMode.STREAM_LOAD;
+        private boolean ignoreCommitError = false;
 
         public Builder setCheckInterval(Integer checkInterval) {
             this.checkInterval = checkInterval;
@@ -320,6 +328,11 @@ public class DorisExecutionOptions implements Serializable {
             return this;
         }
 
+        public Builder setIgnoreCommitError(boolean ignoreCommitError) {
+            this.ignoreCommitError = ignoreCommitError;
+            return this;
+        }
+
         public DorisExecutionOptions build() {
             // If format=json is set but read_json_by_line is not set, record may not be written.
             if (streamLoadProp != null
@@ -344,7 +357,8 @@ public class DorisExecutionOptions implements Serializable {
                     bufferFlushIntervalMs,
                     ignoreUpdateBefore,
                     force2PC,
-                    writeMode);
+                    writeMode,
+                    ignoreCommitError);
         }
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
index e9ba2eb..2f00c9c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
@@ -89,8 +89,7 @@ public class DorisSink<IN>
     public Committer createCommitter() throws IOException {
         if (WriteMode.STREAM_LOAD.equals(dorisExecutionOptions.getWriteMode())
                 || WriteMode.STREAM_LOAD_BATCH.equals(dorisExecutionOptions.getWriteMode())) {
-            return new DorisCommitter(
-                    dorisOptions, dorisReadOptions, dorisExecutionOptions.getMaxRetries());
+            return new DorisCommitter(dorisOptions, dorisReadOptions, dorisExecutionOptions);
         } else if (WriteMode.COPY.equals(dorisExecutionOptions.getWriteMode())) {
             return new DorisCopyCommitter(dorisOptions, dorisExecutionOptions.getMaxRetries());
         }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index 3959c26..1fbab85 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -18,10 +18,12 @@
 package org.apache.doris.flink.sink.committer;
 
 import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.util.Preconditions;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.exception.DorisRuntimeException;
@@ -58,20 +60,25 @@ public class DorisCommitter implements Committer<DorisCommittable>, Closeable {
     private final BackendUtil backendUtil;
 
     int maxRetry;
+    final boolean ignoreCommitError;
 
     public DorisCommitter(
-            DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, int maxRetry) {
-        this(dorisOptions, dorisReadOptions, maxRetry, new HttpUtil().getHttpClient());
+            DorisOptions dorisOptions,
+            DorisReadOptions dorisReadOptions,
+            DorisExecutionOptions executionOptions) {
+        this(dorisOptions, dorisReadOptions, executionOptions, new HttpUtil().getHttpClient());
     }
 
     public DorisCommitter(
             DorisOptions dorisOptions,
             DorisReadOptions dorisReadOptions,
-            int maxRetry,
+            DorisExecutionOptions executionOptions,
             CloseableHttpClient client) {
         this.dorisOptions = dorisOptions;
         this.dorisReadOptions = dorisReadOptions;
-        this.maxRetry = maxRetry;
+        Preconditions.checkArgument(maxRetry >= 0);
+        this.maxRetry = executionOptions.getMaxRetries();
+        this.ignoreCommitError = executionOptions.ignoreCommitError();
         this.httpClient = client;
         this.backendUtil =
                 StringUtils.isNotEmpty(dorisOptions.getBenodes())
@@ -99,8 +106,8 @@ public class DorisCommitter implements Committer<DorisCommittable>, Closeable {
 
         // hostPort
         String hostPort = committable.getHostPort();
-
         LOG.info("commit txn {} to host {}", committable.getTxnID(), hostPort);
+        Throwable ex = new Throwable();
         int retry = 0;
         while (retry <= maxRetry) {
             // get latest-url
@@ -130,17 +137,31 @@ public class DorisCommitter implements Committer<DorisCommittable>, Closeable {
                 String reasonPhrase = statusLine.getReasonPhrase();
                 LOG.error("commit failed with {}, reason {}", hostPort, reasonPhrase);
                 if (retry == maxRetry) {
-                    throw new DorisRuntimeException("commit transaction error: " + reasonPhrase);
+                    ex = new DorisRuntimeException("commit transaction error: " + reasonPhrase);
                 }
                 hostPort = backendUtil.getAvailableBackend();
             } catch (Exception e) {
                 LOG.error("commit transaction failed, to retry, {}", e.getMessage());
-                if (retry == maxRetry) {
-                    throw new DorisRuntimeException("commit transaction error, ", e);
-                }
+                ex = e;
                 hostPort = backendUtil.getAvailableBackend();
             }
-            retry++;
+
+            if (retry++ >= maxRetry) {
+                if (ignoreCommitError) {
+                    // Generally used when txn(stored in checkpoint) expires and unexpected
+                    // errors occur in commit.
+
+                    // It should be noted that you must manually ensure that the txn has been
+                    // successfully submitted to doris, otherwise there may be a risk of data
+                    // loss.
+                    LOG.error(
+                            "Unable to commit transaction {} and data has been potentially lost ",
+                            committable,
+                            ex);
+                } else {
+                    throw new DorisRuntimeException("commit transaction error, ", ex);
+                }
+            }
         }
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 2c7c753..c698871 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -235,6 +235,13 @@ public class DorisConfigOptions {
                     .defaultValue(WriteMode.STREAM_LOAD.name())
                     .withDescription("Write mode, supports stream_load, stream_load_batch");
 
+    public static final ConfigOption<Boolean> SINK_IGNORE_COMMIT_ERROR =
+            ConfigOptions.key("sink.ignore.commit-error")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to ignore commit errors. Usually used when the checkpoint cannot be restored to skip the commit of txn. The default is false.");
+
     public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;
 
     public static final ConfigOption<Boolean> SINK_ENABLE_BATCH_MODE =
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index c7d13f6..bf5cd8c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -73,6 +73,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_2PC;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_BATCH_MODE;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_IGNORE_COMMIT_ERROR;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_IGNORE_UPDATE_BEFORE;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
@@ -156,6 +157,7 @@ public final class DorisDynamicTableFactory
 
         options.add(SOURCE_USE_OLD_API);
         options.add(SINK_WRITE_MODE);
+        options.add(SINK_IGNORE_COMMIT_ERROR);
         return options;
     }
 
@@ -226,6 +228,7 @@ public final class DorisDynamicTableFactory
         builder.setStreamLoadProp(streamLoadProp);
         builder.setDeletable(readableConfig.get(SINK_ENABLE_DELETE));
         builder.setIgnoreUpdateBefore(readableConfig.get(SINK_IGNORE_UPDATE_BEFORE));
+        builder.setIgnoreCommitError(readableConfig.get(SINK_IGNORE_COMMIT_ERROR));
 
         if (!readableConfig.get(SINK_ENABLE_2PC)) {
             builder.disable2PC();
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index cd5ef1a..d713355 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -290,6 +290,9 @@ public abstract class DatabaseSync {
         sinkConfig
                 .getOptional(DorisConfigOptions.SINK_WRITE_MODE)
                 .ifPresent(v -> executionBuilder.setWriteMode(WriteMode.of(v)));
+        sinkConfig
+                .getOptional(DorisConfigOptions.SINK_IGNORE_COMMIT_ERROR)
+                .ifPresent(executionBuilder::setIgnoreCommitError);
 
         DorisExecutionOptions executionOptions = executionBuilder.build();
         builder.setDorisReadOptions(DorisReadOptions.builder().build())
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
index f5c82bb..be5bb0d 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.flink.sink.committer;
 
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.exception.DorisRuntimeException;
@@ -59,6 +60,7 @@ public class TestDorisCommitter {
     public void setUp() throws Exception {
         DorisOptions dorisOptions = OptionUtils.buildDorisOptions();
         DorisReadOptions readOptions = OptionUtils.buildDorisReadOptions();
+        DorisExecutionOptions executionOptions = OptionUtils.buildExecutionOptional();
         dorisCommittable = new DorisCommittable("127.0.0.1:8710", "test", 0);
         CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
         entityMock = new HttpEntityMock();
@@ -78,7 +80,8 @@ public class TestDorisCommitter {
                                 BackendV2.BackendRowV2.of("127.0.0.1", 8040, true)));
         backendUtilMockedStatic.when(() -> BackendUtil.tryHttpConnection(any())).thenReturn(true);
 
-        dorisCommitter = new DorisCommitter(dorisOptions, readOptions, 3, httpClient);
+        dorisCommitter =
+                new DorisCommitter(dorisOptions, readOptions, executionOptions, httpClient);
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to