This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 8aaaded  [improve] add auto redirect and uniq default open 2pc (#202)
8aaaded is described below

commit 8aaadedaf3fb3147a134e333603cb1af3d7530cf
Author: wudi <676366...@qq.com>
AuthorDate: Sun Oct 8 14:31:37 2023 +0800

    [improve] add auto redirect and uniq default open 2pc (#202)
---
 .../doris/flink/cfg/DorisConnectionOptions.java    | 27 +++++++++++++++++++---
 .../doris/flink/cfg/DorisExecutionOptions.java     | 27 +++++++++++++++++++---
 .../org/apache/doris/flink/cfg/DorisOptions.java   | 12 +++++++---
 .../flink/exception/ConnectedFailedException.java  |  4 ++--
 .../org/apache/doris/flink/rest/RestService.java   | 20 ++++++++++++++++
 .../apache/doris/flink/rest/models/BackendV2.java  |  8 +++++++
 .../org/apache/doris/flink/sink/DorisSink.java     | 16 +++++++++++++
 .../doris/flink/table/DorisConfigOptions.java      |  4 ++--
 .../flink/table/DorisDynamicTableFactory.java      |  9 ++++++--
 .../apache/doris/flink/tools/cdc/DatabaseSync.java | 10 ++++++--
 10 files changed, 120 insertions(+), 17 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
index 1382dde..541e4e5 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
@@ -32,6 +32,11 @@ public class DorisConnectionOptions implements Serializable {
     protected final String password;
     protected String jdbcUrl;
     protected String benodes;
+    /**
+     * Used to enable automatic redirection of fe,
+     * When it is not enabled, it will actively request the be list, and the 
polling will initiate a streamload request to be.
+     */
+    protected boolean autoRedirect;
 
     public DorisConnectionOptions(String fenodes, String username, String 
password) {
         this.fenodes = Preconditions.checkNotNull(fenodes, "fenodes  is 
empty");
@@ -45,10 +50,11 @@ public class DorisConnectionOptions implements Serializable 
{
     }
 
     public DorisConnectionOptions(String fenodes, String benodes,  String 
username, String password,
-            String jdbcUrl) {
+            String jdbcUrl, boolean autoRedirect) {
         this(fenodes, username, password);
         this.benodes = benodes;
         this.jdbcUrl = jdbcUrl;
+        this.autoRedirect = autoRedirect;
     }
 
     public String getFenodes() {
@@ -71,21 +77,31 @@ public class DorisConnectionOptions implements Serializable 
{
         return jdbcUrl;
     }
 
+    public boolean isAutoRedirect() {
+        return autoRedirect;
+    }
+
     /**
      * Builder for {@link DorisConnectionOptions}.
      */
     public static class DorisConnectionOptionsBuilder {
         private String fenodes;
+        private String benodes;
         private String username;
         private String password;
-
         private String jdbcUrl;
+        private boolean autoRedirect;
 
         public DorisConnectionOptionsBuilder withFenodes(String fenodes) {
             this.fenodes = fenodes;
             return this;
         }
 
+        public DorisConnectionOptionsBuilder withBenodes(String benodes) {
+            this.benodes = benodes;
+            return this;
+        }
+
         public DorisConnectionOptionsBuilder withUsername(String username) {
             this.username = username;
             return this;
@@ -101,8 +117,13 @@ public class DorisConnectionOptions implements 
Serializable {
             return this;
         }
 
+        public DorisConnectionOptionsBuilder withAutoRedirect(boolean 
autoRedirect) {
+            this.autoRedirect = autoRedirect;
+            return this;
+        }
+
         public DorisConnectionOptions build() {
-            return new DorisConnectionOptions(fenodes, username, password, 
jdbcUrl);
+            return new DorisConnectionOptions(fenodes, benodes, username, 
password, jdbcUrl, autoRedirect);
         }
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 2422df8..8f7022d 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -49,7 +49,8 @@ public class DorisExecutionOptions implements Serializable {
      */
     private final Properties streamLoadProp;
     private final Boolean enableDelete;
-    private final Boolean enable2PC;
+    private Boolean enable2PC;
+    private boolean force2PC;
 
     //batch mode param
     private final int flushQueueSize;
@@ -73,7 +74,8 @@ public class DorisExecutionOptions implements Serializable {
                                  int bufferFlushMaxRows,
                                  int bufferFlushMaxBytes,
                                  long bufferFlushIntervalMs,
-                                 boolean ignoreUpdateBefore) {
+                                 boolean ignoreUpdateBefore,
+                                 boolean force2PC) {
         Preconditions.checkArgument(maxRetries >= 0);
         this.checkInterval = checkInterval;
         this.maxRetries = maxRetries;
@@ -84,6 +86,7 @@ public class DorisExecutionOptions implements Serializable {
         this.streamLoadProp = streamLoadProp;
         this.enableDelete = enableDelete;
         this.enable2PC = enable2PC;
+        this.force2PC = force2PC;
 
         this.enableBatchMode = enableBatchMode;
         this.flushQueueSize = flushQueueSize;
@@ -176,6 +179,14 @@ public class DorisExecutionOptions implements Serializable 
{
         return ignoreUpdateBefore;
     }
 
+    public void setEnable2PC(Boolean enable2PC) {
+        this.enable2PC = enable2PC;
+    }
+
+    public boolean force2PC() {
+        return force2PC;
+    }
+
     /**
      * Builder of {@link DorisExecutionOptions}.
      */
@@ -190,6 +201,9 @@ public class DorisExecutionOptions implements Serializable {
         private boolean enableDelete = true;
         private boolean enable2PC = true;
 
+        //A flag used to determine whether to forcibly open 2pc. By default, 
the uniq model close 2pc.
+        private boolean force2PC = false;
+
         private int flushQueueSize = DEFAULT_FLUSH_QUEUE_SIZE;
         private int bufferFlushMaxRows = DEFAULT_BUFFER_FLUSH_MAX_ROWS;
         private int bufferFlushMaxBytes = DEFAULT_BUFFER_FLUSH_MAX_BYTES;
@@ -244,6 +258,13 @@ public class DorisExecutionOptions implements Serializable 
{
             return this;
         }
 
+        public Builder enable2PC() {
+            this.enable2PC = true;
+            //Force open 2pc
+            this.force2PC = true;
+            return this;
+        }
+
         public Builder enableBatchMode() {
             this.enableBatchMode = true;
             return this;
@@ -278,7 +299,7 @@ public class DorisExecutionOptions implements Serializable {
         public DorisExecutionOptions build() {
             return new DorisExecutionOptions(checkInterval, maxRetries, 
bufferSize, bufferCount, labelPrefix, useCache,
                     streamLoadProp, enableDelete, enable2PC, enableBatchMode, 
flushQueueSize, bufferFlushMaxRows,
-                    bufferFlushMaxBytes, bufferFlushIntervalMs, 
ignoreUpdateBefore);
+                    bufferFlushMaxBytes, bufferFlushIntervalMs, 
ignoreUpdateBefore, force2PC);
         }
     }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
index cf7b932..f560eae 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
@@ -43,8 +43,8 @@ public class DorisOptions extends DorisConnectionOptions {
     }
 
     public DorisOptions(String fenodes, String beNodes, String username, 
String password,
-            String tableIdentifier, String jdbcUrl) {
-        super(fenodes, beNodes, username, password, jdbcUrl);
+            String tableIdentifier, String jdbcUrl, boolean redirect) {
+        super(fenodes, beNodes, username, password, jdbcUrl, redirect);
         this.tableIdentifier = tableIdentifier;
     }
 
@@ -70,6 +70,7 @@ public class DorisOptions extends DorisConnectionOptions {
         private String jdbcUrl;
         private String username;
         private String password;
+        private boolean autoRedirect;
         private String tableIdentifier;
 
         /**
@@ -120,10 +121,15 @@ public class DorisOptions extends DorisConnectionOptions {
             return this;
         }
 
+        public Builder setAutoRedirect(boolean autoRedirect) {
+            this.autoRedirect = autoRedirect;
+            return this;
+        }
+
         public DorisOptions build() {
             checkNotNull(fenodes, "No fenodes supplied.");
             checkNotNull(tableIdentifier, "No tableIdentifier supplied.");
-            return new DorisOptions(fenodes, benodes, username, password, 
tableIdentifier, jdbcUrl);
+            return new DorisOptions(fenodes, benodes, username, password, 
tableIdentifier, jdbcUrl, autoRedirect);
         }
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java
index 6f755b7..8ab00ca 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java
@@ -19,10 +19,10 @@ package org.apache.doris.flink.exception;
 
 public class ConnectedFailedException extends DorisRuntimeException {
     public ConnectedFailedException(String server, Throwable cause) {
-        super("Connect to " + server + "failed.", cause);
+        super("Connect to " + server + " failed.", cause);
     }
 
     public ConnectedFailedException(String server, int statusCode, Throwable 
cause) {
-        super("Connect to " + server + "failed, status code is " + statusCode 
+ ".", cause);
+        super("Connect to " + server + " failed, status code is " + statusCode 
+ ".", cause);
     }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index b75586a..1633c96 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -356,6 +356,11 @@ public class RestService implements Serializable {
     public static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions 
options, DorisReadOptions readOptions, Logger logger) {
         String feNodes = options.getFenodes();
         List<String> feNodeList = allEndpoints(feNodes, logger);
+
+        if(options.isAutoRedirect() && !feNodeList.isEmpty()){
+            return convert(feNodeList);
+        }
+
         for (String feNode: feNodeList) {
             try {
                 String beUrl = "http://"; + feNode + BACKENDS_V2;
@@ -373,6 +378,21 @@ public class RestService implements Serializable {
         throw new DorisRuntimeException(errMsg);
     }
 
+    /**
+     * When the user turns on redirection,
+     * there is no need to explicitly obtain the be list, just treat the fe 
list as the be list.
+     * @param feNodeList
+     * @return
+     */
+    private static List<BackendV2.BackendRowV2> convert(List<String> 
feNodeList){
+        List<BackendV2.BackendRowV2> nodeList = new ArrayList<>();
+        for(String node : feNodeList){
+            String[] split = node.split(":");
+            nodeList.add(BackendV2.BackendRowV2.of(split[0], 
Integer.valueOf(split[1]), true));
+        }
+        return nodeList;
+    }
+
     static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger 
logger) {
         ObjectMapper mapper = new ObjectMapper();
         BackendV2 backend;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
index b2f42db..98f9093 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
@@ -75,5 +75,13 @@ public class BackendV2 {
             return ip + ":" + httpPort;
         }
 
+        public static BackendRowV2 of(String ip, int httpPort, boolean alive){
+            BackendRowV2 rowV2 = new BackendRowV2();
+            rowV2.setIp(ip);
+            rowV2.setHttpPort(httpPort);
+            rowV2.setAlive(alive);
+            return rowV2;
+        }
+
     }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
index d64e488..d1aee44 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
@@ -20,6 +20,7 @@ package org.apache.doris.flink.sink;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.sink.committer.DorisCommitter;
 import org.apache.doris.flink.sink.writer.DorisRecordSerializer;
 import org.apache.doris.flink.sink.writer.DorisWriter;
@@ -31,6 +32,8 @@ import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.api.connector.sink.SinkWriter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
@@ -43,6 +46,7 @@ import java.util.Optional;
  */
 public class DorisSink<IN> implements Sink<IN, DorisCommittable, 
DorisWriterState, DorisCommittable> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(DorisSink.class);
     private final DorisOptions dorisOptions;
     private final DorisReadOptions dorisReadOptions;
     private final DorisExecutionOptions dorisExecutionOptions;
@@ -56,6 +60,18 @@ public class DorisSink<IN> implements Sink<IN, 
DorisCommittable, DorisWriterStat
         this.dorisReadOptions = dorisReadOptions;
         this.dorisExecutionOptions = dorisExecutionOptions;
         this.serializer = serializer;
+        checkKeyType();
+    }
+
+    /**
+     * The uniq model has 2pc close by default unless 2pc is forced open
+     */
+    private void checkKeyType() {
+        if (dorisExecutionOptions.enabled2PC()
+                && !dorisExecutionOptions.force2PC()
+                && RestService.isUniqueKeyType(dorisOptions, dorisReadOptions, 
LOG)){
+            dorisExecutionOptions.setEnable2PC(false);
+        }
     }
 
     @Override
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 50b205c..af01ea5 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -44,8 +44,8 @@ public class DorisConfigOptions {
     public static final ConfigOption<String> TABLE_IDENTIFIER = 
ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the
 doris table name.");
     public static final ConfigOption<String> USERNAME = 
ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the
 doris user name.");
     public static final ConfigOption<String> PASSWORD = 
ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the
 doris password.");
-
     public static final ConfigOption<String> JDBC_URL = 
ConfigOptions.key("jdbc-url").stringType().noDefaultValue().withDescription("doris
 jdbc url address.");
+    public static final ConfigOption<Boolean> AUTO_REDIRECT = 
ConfigOptions.key("auto-redirect").booleanType().defaultValue(false).withDescription("Use
 automatic redirection of fe without explicitly obtaining the be list");
 
     // source config options
     public static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
@@ -176,7 +176,7 @@ public class DorisConfigOptions {
     public static final ConfigOption<Integer> SINK_BUFFER_SIZE = ConfigOptions
             .key("sink.buffer-size")
             .intType()
-            .defaultValue(256 * 1024)
+            .defaultValue(1024 * 1024)
             .withDescription("the buffer size to cache data for stream load.");
     public static final ConfigOption<Integer> SINK_BUFFER_COUNT = ConfigOptions
             .key("sink.buffer-count")
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 9583236..978fa27 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -20,8 +20,6 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisLookupOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
-
-import static org.apache.doris.flink.table.DorisConfigOptions.BENODES;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.TableSchema;
@@ -38,6 +36,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import static org.apache.doris.flink.table.DorisConfigOptions.AUTO_REDIRECT;
+import static org.apache.doris.flink.table.DorisConfigOptions.BENODES;
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_BATCH_SIZE;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
@@ -179,6 +179,7 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
         final DorisOptions.Builder builder = DorisOptions.builder()
                 .setFenodes(fenodes)
                 .setBenodes(benodes)
+                .setAutoRedirect(readableConfig.get(AUTO_REDIRECT))
                 .setJdbcUrl(readableConfig.get(JDBC_URL))
                 .setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));
 
@@ -214,8 +215,12 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
         builder.setStreamLoadProp(streamLoadProp);
         builder.setDeletable(readableConfig.get(SINK_ENABLE_DELETE));
         
builder.setIgnoreUpdateBefore(readableConfig.get(SINK_IGNORE_UPDATE_BEFORE));
+
         if (!readableConfig.get(SINK_ENABLE_2PC)) {
             builder.disable2PC();
+        } else if (readableConfig.getOptional(SINK_ENABLE_2PC).isPresent()){
+            //force open 2pc
+            builder.enable2PC();
         }
 
         if(readableConfig.get(SINK_ENABLE_BATCH_MODE)) {
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index a28403e..8aef65d 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -137,6 +137,7 @@ public abstract class DatabaseSync {
 
     private DorisConnectionOptions getDorisConnectionOptions() {
         String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
+        String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);
         String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
         String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
         String jdbcUrl = sinkConfig.getString(DorisConfigOptions.JDBC_URL);
@@ -145,6 +146,7 @@ public abstract class DatabaseSync {
         Preconditions.checkNotNull(jdbcUrl, "jdbcurl is empty in sink-conf");
         DorisConnectionOptions.DorisConnectionOptionsBuilder builder = new 
DorisConnectionOptions.DorisConnectionOptionsBuilder()
                 .withFenodes(fenodes)
+                .withBenodes(benodes)
                 .withUsername(user)
                 .withPassword(passwd)
                 .withJdbcUrl(jdbcUrl);
@@ -168,6 +170,7 @@ public abstract class DatabaseSync {
                 .setTableIdentifier(database + "." + table)
                 .setUsername(user)
                 .setPassword(passwd);
+        
sinkConfig.getOptional(DorisConfigOptions.AUTO_REDIRECT).ifPresent(dorisBuilder::setAutoRedirect);
 
         Properties pro = new Properties();
         //default json data format
@@ -187,9 +190,12 @@ public abstract class DatabaseSync {
         
sinkConfig.getOptional(DorisConfigOptions.SINK_MAX_RETRIES).ifPresent(executionBuilder::setMaxRetries);
         
sinkConfig.getOptional(DorisConfigOptions.SINK_IGNORE_UPDATE_BEFORE).ifPresent(executionBuilder::setIgnoreUpdateBefore);
 
-        boolean enable2pc = 
sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC);
-        if(!enable2pc){
+
+        if(!sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC)){
             executionBuilder.disable2PC();
+        } else 
if(sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_2PC).isPresent()){
+            //force open 2pc
+            executionBuilder.enable2PC();
         }
 
         //batch option


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to