This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 182c274 [Improvement] add option ignore commit error (#327) 182c274 is described below commit 182c274710cbfed6686afc22de273adba2e846df Author: wudi <676366...@qq.com> AuthorDate: Mon Mar 4 17:15:26 2024 +0800 [Improvement] add option ignore commit error (#327) --- .../doris/flink/cfg/DorisExecutionOptions.java | 18 ++++++++-- .../org/apache/doris/flink/sink/DorisSink.java | 3 +- .../doris/flink/sink/committer/DorisCommitter.java | 41 ++++++++++++++++------ .../doris/flink/table/DorisConfigOptions.java | 7 ++++ .../flink/table/DorisDynamicTableFactory.java | 3 ++ .../apache/doris/flink/tools/cdc/DatabaseSync.java | 3 ++ .../flink/sink/committer/TestDorisCommitter.java | 5 ++- 7 files changed, 65 insertions(+), 15 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index a890d34..b1b49d1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -63,6 +63,7 @@ public class DorisExecutionOptions implements Serializable { private final boolean enableBatchMode; private final boolean ignoreUpdateBefore; private final WriteMode writeMode; + private final boolean ignoreCommitError; public DorisExecutionOptions( int checkInterval, @@ -81,7 +82,8 @@ public class DorisExecutionOptions implements Serializable { long bufferFlushIntervalMs, boolean ignoreUpdateBefore, boolean force2PC, - WriteMode writeMode) { + WriteMode writeMode, + boolean ignoreCommitError) { Preconditions.checkArgument(maxRetries >= 0); this.checkInterval = checkInterval; this.maxRetries = maxRetries; @@ -102,6 +104,7 @@ public class DorisExecutionOptions implements Serializable { this.ignoreUpdateBefore = ignoreUpdateBefore; this.writeMode 
= writeMode; + this.ignoreCommitError = ignoreCommitError; } public static Builder builder() { @@ -205,6 +208,10 @@ public class DorisExecutionOptions implements Serializable { return writeMode; } + public boolean ignoreCommitError() { + return ignoreCommitError; + } + /** Builder of {@link DorisExecutionOptions}. */ public static class Builder { private int checkInterval = DEFAULT_CHECK_INTERVAL; @@ -229,6 +236,7 @@ public class DorisExecutionOptions implements Serializable { private boolean ignoreUpdateBefore = true; private WriteMode writeMode = WriteMode.STREAM_LOAD; + private boolean ignoreCommitError = false; public Builder setCheckInterval(Integer checkInterval) { this.checkInterval = checkInterval; @@ -320,6 +328,11 @@ public class DorisExecutionOptions implements Serializable { return this; } + public Builder setIgnoreCommitError(boolean ignoreCommitError) { + this.ignoreCommitError = ignoreCommitError; + return this; + } + public DorisExecutionOptions build() { // If format=json is set but read_json_by_line is not set, record may not be written. 
if (streamLoadProp != null @@ -344,7 +357,8 @@ public class DorisExecutionOptions implements Serializable { bufferFlushIntervalMs, ignoreUpdateBefore, force2PC, - writeMode); + writeMode, + ignoreCommitError); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java index e9ba2eb..2f00c9c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java @@ -89,8 +89,7 @@ public class DorisSink<IN> public Committer createCommitter() throws IOException { if (WriteMode.STREAM_LOAD.equals(dorisExecutionOptions.getWriteMode()) || WriteMode.STREAM_LOAD_BATCH.equals(dorisExecutionOptions.getWriteMode())) { - return new DorisCommitter( - dorisOptions, dorisReadOptions, dorisExecutionOptions.getMaxRetries()); + return new DorisCommitter(dorisOptions, dorisReadOptions, dorisExecutionOptions); } else if (WriteMode.COPY.equals(dorisExecutionOptions.getWriteMode())) { return new DorisCopyCommitter(dorisOptions, dorisExecutionOptions.getMaxRetries()); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java index 3959c26..1fbab85 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java @@ -18,10 +18,12 @@ package org.apache.doris.flink.sink.committer; import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.util.Preconditions; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.StringUtils; +import org.apache.doris.flink.cfg.DorisExecutionOptions; 
import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisRuntimeException; @@ -58,20 +60,25 @@ public class DorisCommitter implements Committer<DorisCommittable>, Closeable { private final BackendUtil backendUtil; int maxRetry; + final boolean ignoreCommitError; public DorisCommitter( - DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, int maxRetry) { - this(dorisOptions, dorisReadOptions, maxRetry, new HttpUtil().getHttpClient()); + DorisOptions dorisOptions, + DorisReadOptions dorisReadOptions, + DorisExecutionOptions executionOptions) { + this(dorisOptions, dorisReadOptions, executionOptions, new HttpUtil().getHttpClient()); } public DorisCommitter( DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, - int maxRetry, + DorisExecutionOptions executionOptions, CloseableHttpClient client) { this.dorisOptions = dorisOptions; this.dorisReadOptions = dorisReadOptions; - this.maxRetry = maxRetry; + Preconditions.checkArgument(maxRetry >= 0); + this.maxRetry = executionOptions.getMaxRetries(); + this.ignoreCommitError = executionOptions.ignoreCommitError(); this.httpClient = client; this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) @@ -99,8 +106,8 @@ public class DorisCommitter implements Committer<DorisCommittable>, Closeable { // hostPort String hostPort = committable.getHostPort(); - LOG.info("commit txn {} to host {}", committable.getTxnID(), hostPort); + Throwable ex = new Throwable(); int retry = 0; while (retry <= maxRetry) { // get latest-url @@ -130,17 +137,31 @@ public class DorisCommitter implements Committer<DorisCommittable>, Closeable { String reasonPhrase = statusLine.getReasonPhrase(); LOG.error("commit failed with {}, reason {}", hostPort, reasonPhrase); if (retry == maxRetry) { - throw new DorisRuntimeException("commit transaction error: " + reasonPhrase); + ex = new DorisRuntimeException("commit transaction error: " + 
reasonPhrase); } hostPort = backendUtil.getAvailableBackend(); } catch (Exception e) { LOG.error("commit transaction failed, to retry, {}", e.getMessage()); - if (retry == maxRetry) { - throw new DorisRuntimeException("commit transaction error, ", e); - } + ex = e; hostPort = backendUtil.getAvailableBackend(); } - retry++; + + if (retry++ >= maxRetry) { + if (ignoreCommitError) { + // Typically used when the txn stored in the checkpoint has expired, or when + // an unexpected error occurs during commit. + + // Note: when errors are ignored, you must manually verify that the txn + // was successfully committed to Doris; otherwise data from this txn may + // be lost. + LOG.error( + "Unable to commit transaction {} and data has been potentially lost ", + committable, + ex); + } else { + throw new DorisRuntimeException("commit transaction error, ", ex); + } + } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java index 2c7c753..c698871 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java @@ -235,6 +235,13 @@ public class DorisConfigOptions { .defaultValue(WriteMode.STREAM_LOAD.name()) .withDescription("Write mode, supports stream_load, stream_load_batch"); + public static final ConfigOption<Boolean> SINK_IGNORE_COMMIT_ERROR = + ConfigOptions.key("sink.ignore.commit-error") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to ignore commit errors. Usually used when the checkpoint cannot be restored to skip the commit of txn.
The default is false."); + public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM; public static final ConfigOption<Boolean> SINK_ENABLE_BATCH_MODE = diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index c7d13f6..bf5cd8c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -73,6 +73,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_2PC; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_BATCH_MODE; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_IGNORE_COMMIT_ERROR; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_IGNORE_UPDATE_BEFORE; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES; @@ -156,6 +157,7 @@ public final class DorisDynamicTableFactory options.add(SOURCE_USE_OLD_API); options.add(SINK_WRITE_MODE); + options.add(SINK_IGNORE_COMMIT_ERROR); return options; } @@ -226,6 +228,7 @@ public final class DorisDynamicTableFactory builder.setStreamLoadProp(streamLoadProp); builder.setDeletable(readableConfig.get(SINK_ENABLE_DELETE)); builder.setIgnoreUpdateBefore(readableConfig.get(SINK_IGNORE_UPDATE_BEFORE)); + builder.setIgnoreCommitError(readableConfig.get(SINK_IGNORE_COMMIT_ERROR)); if (!readableConfig.get(SINK_ENABLE_2PC)) { builder.disable2PC(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index cd5ef1a..d713355 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -290,6 +290,9 @@ public abstract class DatabaseSync { sinkConfig .getOptional(DorisConfigOptions.SINK_WRITE_MODE) .ifPresent(v -> executionBuilder.setWriteMode(WriteMode.of(v))); + sinkConfig + .getOptional(DorisConfigOptions.SINK_IGNORE_COMMIT_ERROR) + .ifPresent(executionBuilder::setIgnoreCommitError); DorisExecutionOptions executionOptions = executionBuilder.build(); builder.setDorisReadOptions(DorisReadOptions.builder().build()) diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java index f5c82bb..be5bb0d 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java @@ -17,6 +17,7 @@ package org.apache.doris.flink.sink.committer; +import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisRuntimeException; @@ -59,6 +60,7 @@ public class TestDorisCommitter { public void setUp() throws Exception { DorisOptions dorisOptions = OptionUtils.buildDorisOptions(); DorisReadOptions readOptions = OptionUtils.buildDorisReadOptions(); + DorisExecutionOptions executionOptions = OptionUtils.buildExecutionOptional(); dorisCommittable = new DorisCommittable("127.0.0.1:8710", "test", 0); CloseableHttpClient httpClient = mock(CloseableHttpClient.class); entityMock = new HttpEntityMock(); @@ -78,7 +80,8 @@ public class 
TestDorisCommitter { BackendV2.BackendRowV2.of("127.0.0.1", 8040, true))); backendUtilMockedStatic.when(() -> BackendUtil.tryHttpConnection(any())).thenReturn(true); - dorisCommitter = new DorisCommitter(dorisOptions, readOptions, 3, httpClient); + dorisCommitter = + new DorisCommitter(dorisOptions, readOptions, executionOptions, httpClient); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org