This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 8aaaded [improve] add auto redirect and uniq default open 2pc (#202) 8aaaded is described below commit 8aaadedaf3fb3147a134e333603cb1af3d7530cf Author: wudi <676366...@qq.com> AuthorDate: Sun Oct 8 14:31:37 2023 +0800 [improve] add auto redirect and uniq default open 2pc (#202) --- .../doris/flink/cfg/DorisConnectionOptions.java | 27 +++++++++++++++++++--- .../doris/flink/cfg/DorisExecutionOptions.java | 27 +++++++++++++++++++--- .../org/apache/doris/flink/cfg/DorisOptions.java | 12 +++++++--- .../flink/exception/ConnectedFailedException.java | 4 ++-- .../org/apache/doris/flink/rest/RestService.java | 20 ++++++++++++++++ .../apache/doris/flink/rest/models/BackendV2.java | 8 +++++++ .../org/apache/doris/flink/sink/DorisSink.java | 16 +++++++++++++ .../doris/flink/table/DorisConfigOptions.java | 4 ++-- .../flink/table/DorisDynamicTableFactory.java | 9 ++++++-- .../apache/doris/flink/tools/cdc/DatabaseSync.java | 10 ++++++-- 10 files changed, 120 insertions(+), 17 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java index 1382dde..541e4e5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java @@ -32,6 +32,11 @@ public class DorisConnectionOptions implements Serializable { protected final String password; protected String jdbcUrl; protected String benodes; + /** + * Used to enable automatic redirection of fe, + * When it is not enabled, it will actively request the be list, and the polling will initiate a streamload request to be. 
+ */ + protected boolean autoRedirect; public DorisConnectionOptions(String fenodes, String username, String password) { this.fenodes = Preconditions.checkNotNull(fenodes, "fenodes is empty"); @@ -45,10 +50,11 @@ public class DorisConnectionOptions implements Serializable { } public DorisConnectionOptions(String fenodes, String benodes, String username, String password, - String jdbcUrl) { + String jdbcUrl, boolean autoRedirect) { this(fenodes, username, password); this.benodes = benodes; this.jdbcUrl = jdbcUrl; + this.autoRedirect = autoRedirect; } public String getFenodes() { @@ -71,21 +77,31 @@ public class DorisConnectionOptions implements Serializable { return jdbcUrl; } + public boolean isAutoRedirect() { + return autoRedirect; + } + /** * Builder for {@link DorisConnectionOptions}. */ public static class DorisConnectionOptionsBuilder { private String fenodes; + private String benodes; private String username; private String password; - private String jdbcUrl; + private boolean autoRedirect; public DorisConnectionOptionsBuilder withFenodes(String fenodes) { this.fenodes = fenodes; return this; } + public DorisConnectionOptionsBuilder withBenodes(String benodes) { + this.benodes = benodes; + return this; + } + public DorisConnectionOptionsBuilder withUsername(String username) { this.username = username; return this; @@ -101,8 +117,13 @@ public class DorisConnectionOptions implements Serializable { return this; } + public DorisConnectionOptionsBuilder withAutoRedirect(boolean autoRedirect) { + this.autoRedirect = autoRedirect; + return this; + } + public DorisConnectionOptions build() { - return new DorisConnectionOptions(fenodes, username, password, jdbcUrl); + return new DorisConnectionOptions(fenodes, benodes, username, password, jdbcUrl, autoRedirect); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index 
2422df8..8f7022d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -49,7 +49,8 @@ public class DorisExecutionOptions implements Serializable { */ private final Properties streamLoadProp; private final Boolean enableDelete; - private final Boolean enable2PC; + private Boolean enable2PC; + private boolean force2PC; //batch mode param private final int flushQueueSize; @@ -73,7 +74,8 @@ public class DorisExecutionOptions implements Serializable { int bufferFlushMaxRows, int bufferFlushMaxBytes, long bufferFlushIntervalMs, - boolean ignoreUpdateBefore) { + boolean ignoreUpdateBefore, + boolean force2PC) { Preconditions.checkArgument(maxRetries >= 0); this.checkInterval = checkInterval; this.maxRetries = maxRetries; @@ -84,6 +86,7 @@ public class DorisExecutionOptions implements Serializable { this.streamLoadProp = streamLoadProp; this.enableDelete = enableDelete; this.enable2PC = enable2PC; + this.force2PC = force2PC; this.enableBatchMode = enableBatchMode; this.flushQueueSize = flushQueueSize; @@ -176,6 +179,14 @@ public class DorisExecutionOptions implements Serializable { return ignoreUpdateBefore; } + public void setEnable2PC(Boolean enable2PC) { + this.enable2PC = enable2PC; + } + + public boolean force2PC() { + return force2PC; + } + /** * Builder of {@link DorisExecutionOptions}. */ @@ -190,6 +201,9 @@ public class DorisExecutionOptions implements Serializable { private boolean enableDelete = true; private boolean enable2PC = true; + //A flag used to determine whether to forcibly open 2pc. By default, the uniq model close 2pc. 
+ private boolean force2PC = false; + private int flushQueueSize = DEFAULT_FLUSH_QUEUE_SIZE; private int bufferFlushMaxRows = DEFAULT_BUFFER_FLUSH_MAX_ROWS; private int bufferFlushMaxBytes = DEFAULT_BUFFER_FLUSH_MAX_BYTES; @@ -244,6 +258,13 @@ public class DorisExecutionOptions implements Serializable { return this; } + public Builder enable2PC() { + this.enable2PC = true; + //Force open 2pc + this.force2PC = true; + return this; + } + public Builder enableBatchMode() { this.enableBatchMode = true; return this; @@ -278,7 +299,7 @@ public class DorisExecutionOptions implements Serializable { public DorisExecutionOptions build() { return new DorisExecutionOptions(checkInterval, maxRetries, bufferSize, bufferCount, labelPrefix, useCache, streamLoadProp, enableDelete, enable2PC, enableBatchMode, flushQueueSize, bufferFlushMaxRows, - bufferFlushMaxBytes, bufferFlushIntervalMs, ignoreUpdateBefore); + bufferFlushMaxBytes, bufferFlushIntervalMs, ignoreUpdateBefore, force2PC); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java index cf7b932..f560eae 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java @@ -43,8 +43,8 @@ public class DorisOptions extends DorisConnectionOptions { } public DorisOptions(String fenodes, String beNodes, String username, String password, - String tableIdentifier, String jdbcUrl) { - super(fenodes, beNodes, username, password, jdbcUrl); + String tableIdentifier, String jdbcUrl, boolean redirect) { + super(fenodes, beNodes, username, password, jdbcUrl, redirect); this.tableIdentifier = tableIdentifier; } @@ -70,6 +70,7 @@ public class DorisOptions extends DorisConnectionOptions { private String jdbcUrl; private String username; private String password; + private boolean autoRedirect; private 
String tableIdentifier; /** @@ -120,10 +121,15 @@ public class DorisOptions extends DorisConnectionOptions { return this; } + public Builder setAutoRedirect(boolean autoRedirect) { + this.autoRedirect = autoRedirect; + return this; + } + public DorisOptions build() { checkNotNull(fenodes, "No fenodes supplied."); checkNotNull(tableIdentifier, "No tableIdentifier supplied."); - return new DorisOptions(fenodes, benodes, username, password, tableIdentifier, jdbcUrl); + return new DorisOptions(fenodes, benodes, username, password, tableIdentifier, jdbcUrl, autoRedirect); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java index 6f755b7..8ab00ca 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java @@ -19,10 +19,10 @@ package org.apache.doris.flink.exception; public class ConnectedFailedException extends DorisRuntimeException { public ConnectedFailedException(String server, Throwable cause) { - super("Connect to " + server + "failed.", cause); + super("Connect to " + server + " failed.", cause); } public ConnectedFailedException(String server, int statusCode, Throwable cause) { - super("Connect to " + server + "failed, status code is " + statusCode + ".", cause); + super("Connect to " + server + " failed, status code is " + statusCode + ".", cause); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java index b75586a..1633c96 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -356,6 +356,11 @@ 
public class RestService implements Serializable { public static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions options, DorisReadOptions readOptions, Logger logger) { String feNodes = options.getFenodes(); List<String> feNodeList = allEndpoints(feNodes, logger); + + if(options.isAutoRedirect() && !feNodeList.isEmpty()){ + return convert(feNodeList); + } + for (String feNode: feNodeList) { try { String beUrl = "http://" + feNode + BACKENDS_V2; @@ -373,6 +378,21 @@ public class RestService implements Serializable { throw new DorisRuntimeException(errMsg); } + /** + * When the user turns on redirection, + * there is no need to explicitly obtain the be list, just treat the fe list as the be list. + * @param feNodeList + * @return + */ + private static List<BackendV2.BackendRowV2> convert(List<String> feNodeList){ + List<BackendV2.BackendRowV2> nodeList = new ArrayList<>(); + for(String node : feNodeList){ + String[] split = node.split(":"); + nodeList.add(BackendV2.BackendRowV2.of(split[0], Integer.valueOf(split[1]), true)); + } + return nodeList; + } + static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) { ObjectMapper mapper = new ObjectMapper(); BackendV2 backend; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java index b2f42db..98f9093 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java @@ -75,5 +75,13 @@ public class BackendV2 { return ip + ":" + httpPort; } + public static BackendRowV2 of(String ip, int httpPort, boolean alive){ + BackendRowV2 rowV2 = new BackendRowV2(); + rowV2.setIp(ip); + rowV2.setHttpPort(httpPort); + rowV2.setAlive(alive); + return rowV2; + } + } } diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java index d64e488..d1aee44 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java @@ -20,6 +20,7 @@ package org.apache.doris.flink.sink; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.sink.committer.DorisCommitter; import org.apache.doris.flink.sink.writer.DorisRecordSerializer; import org.apache.doris.flink.sink.writer.DorisWriter; @@ -31,6 +32,8 @@ import org.apache.flink.api.connector.sink.Sink; import org.apache.flink.api.connector.sink.SinkWriter; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.List; @@ -43,6 +46,7 @@ import java.util.Optional; */ public class DorisSink<IN> implements Sink<IN, DorisCommittable, DorisWriterState, DorisCommittable> { + private static final Logger LOG = LoggerFactory.getLogger(DorisSink.class); private final DorisOptions dorisOptions; private final DorisReadOptions dorisReadOptions; private final DorisExecutionOptions dorisExecutionOptions; @@ -56,6 +60,18 @@ public class DorisSink<IN> implements Sink<IN, DorisCommittable, DorisWriterStat this.dorisReadOptions = dorisReadOptions; this.dorisExecutionOptions = dorisExecutionOptions; this.serializer = serializer; + checkKeyType(); + } + + /** + * The uniq model has 2pc close by default unless 2pc is forced open + */ + private void checkKeyType() { + if (dorisExecutionOptions.enabled2PC() + && !dorisExecutionOptions.force2PC() + && 
RestService.isUniqueKeyType(dorisOptions, dorisReadOptions, LOG)){ + dorisExecutionOptions.setEnable2PC(false); + } } @Override diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java index 50b205c..af01ea5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java @@ -44,8 +44,8 @@ public class DorisConfigOptions { public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the doris table name."); public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the doris user name."); public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the doris password."); - public static final ConfigOption<String> JDBC_URL = ConfigOptions.key("jdbc-url").stringType().noDefaultValue().withDescription("doris jdbc url address."); + public static final ConfigOption<Boolean> AUTO_REDIRECT = ConfigOptions.key("auto-redirect").booleanType().defaultValue(false).withDescription("Use automatic redirection of fe without explicitly obtaining the be list"); // source config options public static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions @@ -176,7 +176,7 @@ public class DorisConfigOptions { public static final ConfigOption<Integer> SINK_BUFFER_SIZE = ConfigOptions .key("sink.buffer-size") .intType() - .defaultValue(256 * 1024) + .defaultValue(1024 * 1024) .withDescription("the buffer size to cache data for stream load."); public static final ConfigOption<Integer> SINK_BUFFER_COUNT = ConfigOptions .key("sink.buffer-count") diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index 9583236..978fa27 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -20,8 +20,6 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisLookupOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; - -import static org.apache.doris.flink.table.DorisConfigOptions.BENODES; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.TableSchema; @@ -38,6 +36,8 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import static org.apache.doris.flink.table.DorisConfigOptions.AUTO_REDIRECT; +import static org.apache.doris.flink.table.DorisConfigOptions.BENODES; import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_BATCH_SIZE; import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC; import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE; @@ -179,6 +179,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory final DorisOptions.Builder builder = DorisOptions.builder() .setFenodes(fenodes) .setBenodes(benodes) + .setAutoRedirect(readableConfig.get(AUTO_REDIRECT)) .setJdbcUrl(readableConfig.get(JDBC_URL)) .setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER)); @@ -214,8 +215,12 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory builder.setStreamLoadProp(streamLoadProp); builder.setDeletable(readableConfig.get(SINK_ENABLE_DELETE)); 
builder.setIgnoreUpdateBefore(readableConfig.get(SINK_IGNORE_UPDATE_BEFORE)); + if (!readableConfig.get(SINK_ENABLE_2PC)) { builder.disable2PC(); + } else if (readableConfig.getOptional(SINK_ENABLE_2PC).isPresent()){ + //force open 2pc + builder.enable2PC(); } if(readableConfig.get(SINK_ENABLE_BATCH_MODE)) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index a28403e..8aef65d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -137,6 +137,7 @@ public abstract class DatabaseSync { private DorisConnectionOptions getDorisConnectionOptions() { String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES); + String benodes = sinkConfig.getString(DorisConfigOptions.BENODES); String user = sinkConfig.getString(DorisConfigOptions.USERNAME); String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, ""); String jdbcUrl = sinkConfig.getString(DorisConfigOptions.JDBC_URL); @@ -145,6 +146,7 @@ public abstract class DatabaseSync { Preconditions.checkNotNull(jdbcUrl, "jdbcurl is empty in sink-conf"); DorisConnectionOptions.DorisConnectionOptionsBuilder builder = new DorisConnectionOptions.DorisConnectionOptionsBuilder() .withFenodes(fenodes) + .withBenodes(benodes) .withUsername(user) .withPassword(passwd) .withJdbcUrl(jdbcUrl); @@ -168,6 +170,7 @@ public abstract class DatabaseSync { .setTableIdentifier(database + "." 
+ table) .setUsername(user) .setPassword(passwd); + sinkConfig.getOptional(DorisConfigOptions.AUTO_REDIRECT).ifPresent(dorisBuilder::setAutoRedirect); Properties pro = new Properties(); //default json data format @@ -187,9 +190,12 @@ public abstract class DatabaseSync { sinkConfig.getOptional(DorisConfigOptions.SINK_MAX_RETRIES).ifPresent(executionBuilder::setMaxRetries); sinkConfig.getOptional(DorisConfigOptions.SINK_IGNORE_UPDATE_BEFORE).ifPresent(executionBuilder::setIgnoreUpdateBefore); - boolean enable2pc = sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC); - if(!enable2pc){ + + if(!sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC)){ executionBuilder.disable2PC(); + } else if(sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_2PC).isPresent()){ + //force open 2pc + executionBuilder.enable2PC(); } //batch option --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org