This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c15c4f  [Improve]Added direct access to BE through the intranet (#187)
8c15c4f is described below

commit 8c15c4f0bf2d63507b8ac1fd0b8b6d00a37afb6d
Author: DongLiang-0 <46414265+donglian...@users.noreply.github.com>
AuthorDate: Tue Sep 5 15:26:20 2023 +0800

    [Improve]Added direct access to BE through the intranet (#187)
---
 flink-doris-connector/pom.xml                      |   8 +-
 .../doris/flink/cfg/DorisConnectionOptions.java    |  16 +++-
 .../org/apache/doris/flink/cfg/DorisOptions.java   |  19 +++-
 .../org/apache/doris/flink/sink/BackendUtil.java   |  32 ++++++-
 .../flink/sink/batch/DorisBatchStreamLoad.java     |   6 +-
 .../doris/flink/sink/committer/DorisCommitter.java |  10 +-
 .../doris/flink/sink/writer/DorisWriter.java       |   7 +-
 .../doris/flink/table/DorisConfigOptions.java      |   1 +
 .../flink/table/DorisDynamicTableFactory.java      |   5 +
 .../doris/flink/table/DorisDynamicTableSource.java |   1 +
 .../doris/flink/table/DorisRowDataInputFormat.java |   5 +
 .../apache/doris/flink/tools/cdc/DatabaseSync.java |   2 +
 .../flink/DorisIntranetAccessSinkExample.java      | 105 +++++++++++++++++++++
 .../flink/sink/committer/TestDorisCommitter.java   |  24 ++++-
 .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java  |   5 +-
 .../tools/cdc/CdcOraclelSyncDatabaseCase.java      |   5 +-
 .../tools/cdc/CdcPostgresSyncDatabaseCase.java     |   5 +-
 .../tools/cdc/CdcSqlServerSyncDatabaseCase.java    |   5 +-
 18 files changed, 236 insertions(+), 25 deletions(-)

diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 333a40b..e52a07a 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -224,7 +224,13 @@ under the License.
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
-            <version>2.27.0</version>
+            <version>4.2.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-inline</artifactId>
+            <version>4.2.0</version>
             <scope>test</scope>
         </dependency>
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
index 00abd52..1382dde 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
@@ -31,6 +31,7 @@ public class DorisConnectionOptions implements Serializable {
     protected final String username;
     protected final String password;
     protected String jdbcUrl;
+    protected String benodes;
 
     public DorisConnectionOptions(String fenodes, String username, String 
password) {
         this.fenodes = Preconditions.checkNotNull(fenodes, "fenodes  is 
empty");
@@ -38,8 +39,15 @@ public class DorisConnectionOptions implements Serializable {
         this.password = password;
     }
 
-    public DorisConnectionOptions(String fenodes, String username, String 
password, String jdbcUrl){
-        this(fenodes,username,password);
+    public DorisConnectionOptions(String fenodes, String username, String 
password, String jdbcUrl) {
+        this(fenodes, username, password);
+        this.jdbcUrl = jdbcUrl;
+    }
+
+    public DorisConnectionOptions(String fenodes, String benodes,  String 
username, String password,
+            String jdbcUrl) {
+        this(fenodes, username, password);
+        this.benodes = benodes;
         this.jdbcUrl = jdbcUrl;
     }
 
@@ -55,6 +63,10 @@ public class DorisConnectionOptions implements Serializable {
         return password;
     }
 
+    public String getBenodes() {
+        return benodes;
+    }
+
     public String getJdbcUrl(){
         return jdbcUrl;
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
index c9e36e3..cf7b932 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
@@ -42,6 +42,12 @@ public class DorisOptions extends DorisConnectionOptions {
         this.tableIdentifier = tableIdentifier;
     }
 
+    public DorisOptions(String fenodes, String beNodes, String username, 
String password,
+            String tableIdentifier, String jdbcUrl) {
+        super(fenodes, beNodes, username, password, jdbcUrl);
+        this.tableIdentifier = tableIdentifier;
+    }
+
     public String getTableIdentifier() {
         return tableIdentifier;
     }
@@ -60,7 +66,7 @@ public class DorisOptions extends DorisConnectionOptions {
      */
     public static class Builder {
         private String fenodes;
-
+        private String benodes;
         private String jdbcUrl;
         private String username;
         private String password;
@@ -98,6 +104,14 @@ public class DorisOptions extends DorisConnectionOptions {
             return this;
         }
 
+        /**
+         * optional, comma-separated BE http addresses (host:http_port) for accessing BE nodes directly
+         */
+        public Builder setBenodes(String benodes) {
+            this.benodes = benodes;
+            return this;
+        }
+
         /**
          * not required, fe jdbc url, for lookup query
          */
@@ -109,9 +123,8 @@ public class DorisOptions extends DorisConnectionOptions {
         public DorisOptions build() {
             checkNotNull(fenodes, "No fenodes supplied.");
             checkNotNull(tableIdentifier, "No tableIdentifier supplied.");
-            return new DorisOptions(fenodes, username, password, 
tableIdentifier, jdbcUrl);
+            return new DorisOptions(fenodes, benodes, username, password, 
tableIdentifier, jdbcUrl);
         }
     }
 
-
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
index 701cad6..9f9516a 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
@@ -19,11 +19,15 @@ package org.apache.doris.flink.sink;
 
 import org.apache.doris.flink.exception.DorisRuntimeException;
 import org.apache.doris.flink.rest.models.BackendV2;
+import org.apache.doris.flink.rest.models.BackendV2.BackendRowV2;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 public class BackendUtil {
@@ -36,13 +40,34 @@ public class BackendUtil {
         this.pos = 0;
     }
 
+    public BackendUtil(String beNodes) {
+        this.backends = initBackends(beNodes);
+        this.pos = 0;
+    }
+
+    private List<BackendV2.BackendRowV2> initBackends(String beNodes) {
+        List<BackendV2.BackendRowV2> backends = new ArrayList<>();
+        List<String> nodes = Arrays.asList(beNodes.split(","));
+        nodes.forEach(node -> {
+            if (tryHttpConnection(node)) {
+                node = node.trim();
+                String[] ipAndPort = node.split(":");
+                BackendRowV2 backendRowV2 = new BackendRowV2();
+                backendRowV2.setIp(ipAndPort[0]);
+                backendRowV2.setHttpPort(Integer.parseInt(ipAndPort[1]));
+                backendRowV2.setAlive(true);
+                backends.add(backendRowV2);
+            }
+        });
+        return backends;
+    }
+
     public String getAvailableBackend() {
         long tmp = pos + backends.size();
         while (pos < tmp) {
-            BackendV2.BackendRowV2 backend = backends.get((int) (pos % 
backends.size()));
+            BackendV2.BackendRowV2 backend = backends.get((int) (pos++ % 
backends.size()));
             String res = backend.toBackendString();
-            if(tryHttpConnection(res)){
-                pos++;
+            if (tryHttpConnection(res)) {
                 return res;
             }
         }
@@ -60,7 +85,6 @@ public class BackendUtil {
             return true;
         } catch (Exception ex) {
             LOG.warn("Failed to connect to backend:{}", backend, ex);
-            pos++;
             return false;
         }
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index a43220c..b6a3f65 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -29,6 +29,8 @@ import org.apache.doris.flink.sink.EscapeHandler;
 import org.apache.doris.flink.sink.HttpPutBuilder;
 import org.apache.doris.flink.sink.HttpUtil;
 import org.apache.doris.flink.sink.writer.LabelGenerator;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -93,7 +95,9 @@ public class DorisBatchStreamLoad implements Serializable {
                                 DorisReadOptions dorisReadOptions,
                                 DorisExecutionOptions executionOptions,
                                 LabelGenerator labelGenerator) {
-        this.backendUtil = new 
BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
+        this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? 
new BackendUtil(
+                dorisOptions.getBenodes())
+                : new BackendUtil(RestService.getBackendsV2(dorisOptions, 
dorisReadOptions, LOG));
         this.hostPort = backendUtil.getAvailableBackend();
         String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
         this.db = tableInfo[0];
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index 3b61d82..5bb1a40 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.flink.sink.committer;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.connector.sink.Committer;
 
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -25,6 +26,7 @@ import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.exception.DorisRuntimeException;
 import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.sink.BackendUtil;
 import org.apache.doris.flink.sink.DorisCommittable;
 import org.apache.doris.flink.sink.HttpPutBuilder;
 import org.apache.doris.flink.sink.HttpUtil;
@@ -55,6 +57,7 @@ public class DorisCommitter implements 
Committer<DorisCommittable> {
     private final DorisOptions dorisOptions;
     private final DorisReadOptions dorisReadOptions;
     private final ObjectMapper jsonMapper = new ObjectMapper();
+    private final BackendUtil backendUtil;
 
     int maxRetry;
 
@@ -67,6 +70,9 @@ public class DorisCommitter implements 
Committer<DorisCommittable> {
         this.dorisReadOptions = dorisReadOptions;
         this.maxRetry = maxRetry;
         this.httpClient = client;
+        this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? 
new BackendUtil(
+                dorisOptions.getBenodes())
+                : new BackendUtil(RestService.getBackendsV2(dorisOptions, 
dorisReadOptions, LOG));
     }
 
     @Override
@@ -116,13 +122,13 @@ public class DorisCommitter implements 
Committer<DorisCommittable> {
                 if (retry == maxRetry) {
                     throw new DorisRuntimeException("stream load error: " + 
reasonPhrase);
                 }
-                hostPort = RestService.getBackend(dorisOptions, 
dorisReadOptions, LOG);
+                hostPort = backendUtil.getAvailableBackend();
             } catch (IOException e) {
                 LOG.error("commit transaction failed: ", e);
                 if (retry == maxRetry) {
                     throw new IOException("commit transaction failed: {}", e);
                 }
-                hostPort = RestService.getBackend(dorisOptions, 
dorisReadOptions, LOG);
+                hostPort = backendUtil.getAvailableBackend();
             }
         }
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 7890670..1f98206 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -27,6 +27,8 @@ import org.apache.doris.flink.rest.models.RespContent;
 import org.apache.doris.flink.sink.BackendUtil;
 import org.apache.doris.flink.sink.DorisCommittable;
 import org.apache.doris.flink.sink.HttpUtil;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.api.connector.sink.SinkWriter;
@@ -99,8 +101,9 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
     }
 
     public void initializeLoad(List<DorisWriterState> state) throws 
IOException {
-        //cache backend
-        backendUtil = new BackendUtil(RestService.getBackendsV2(dorisOptions, 
dorisReadOptions, LOG));
+        this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? 
new BackendUtil(
+                dorisOptions.getBenodes())
+                : new BackendUtil(RestService.getBackendsV2(dorisOptions, 
dorisReadOptions, LOG));
         try {
             this.dorisStreamLoad = new DorisStreamLoad(
                     backendUtil.getAvailableBackend(),
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index ba408a6..98b9a78 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -40,6 +40,7 @@ public class DorisConfigOptions {
     public static final String IDENTIFIER = "doris";
     // common option
     public static final ConfigOption<String> FENODES = 
ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris
 fe http address.");
+    public static final ConfigOption<String> BENODES = 
ConfigOptions.key("benodes").stringType().noDefaultValue().withDescription("doris
 be http address.");
     public static final ConfigOption<String> TABLE_IDENTIFIER = 
ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the
 doris table name.");
     public static final ConfigOption<String> USERNAME = 
ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the
 doris user name.");
     public static final ConfigOption<String> PASSWORD = 
ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the
 doris password.");
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 66a6a19..57e4fc0 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -20,6 +20,8 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisLookupOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
+
+import static org.apache.doris.flink.table.DorisConfigOptions.BENODES;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.TableSchema;
@@ -103,6 +105,7 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
     public Set<ConfigOption<?>> optionalOptions() {
         final Set<ConfigOption<?>> options = new HashSet<>();
         options.add(FENODES);
+        options.add(BENODES);
         options.add(TABLE_IDENTIFIER);
         options.add(USERNAME);
         options.add(PASSWORD);
@@ -169,8 +172,10 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
 
     private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
         final String fenodes = readableConfig.get(FENODES);
+        final String benodes = readableConfig.get(BENODES);
         final DorisOptions.Builder builder = DorisOptions.builder()
                 .setFenodes(fenodes)
+                .setBenodes(benodes)
                 .setJdbcUrl(readableConfig.get(JDBC_URL))
                 .setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index bd04e20..35a4489 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -104,6 +104,7 @@ public final class DorisDynamicTableSource implements 
ScanTableSource, LookupTab
             }
             DorisRowDataInputFormat.Builder builder = 
DorisRowDataInputFormat.builder()
                     .setFenodes(options.getFenodes())
+                    .setBenodes(options.getBenodes())
                     .setUsername(options.getUsername())
                     .setPassword(options.getPassword())
                     .setTableIdentifier(options.getTableIdentifier())
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
index 7181ce6..47bcf69 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
@@ -198,6 +198,11 @@ public class DorisRowDataInputFormat extends 
RichInputFormat<RowData, DorisTable
             return this;
         }
 
+        public Builder setBenodes(String benodes) {
+            this.optionsBuilder.setBenodes(benodes);
+            return this;
+        }
+
         public Builder setUsername(String username) {
             this.optionsBuilder.setUsername(username);
             return this;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index bc15987..455000e 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -152,6 +152,7 @@ public abstract class DatabaseSync {
      */
     public DorisSink<String> buildDorisSink(String table) {
         String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
+        String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);
         String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
         String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
         String labelPrefix = 
sinkConfig.getString(DorisConfigOptions.SINK_LABEL_PREFIX);
@@ -159,6 +160,7 @@ public abstract class DatabaseSync {
         DorisSink.Builder<String> builder = DorisSink.builder();
         DorisOptions.Builder dorisBuilder = DorisOptions.builder();
         dorisBuilder.setFenodes(fenodes)
+                .setBenodes(benodes)
                 .setTableIdentifier(database + "." + table)
                 .setUsername(user)
                 .setPassword(passwd);
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisIntranetAccessSinkExample.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisIntranetAccessSinkExample.java
new file mode 100644
index 0000000..61ad1dd
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisIntranetAccessSinkExample.java
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.DorisSink;
+import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * When the flink connector accesses doris, it parses out all surviving BE 
nodes according to the FE address filled in.
+ * <p>
+ * However, when the BE node is deployed, most of the internal network IP is 
filled in,
+ * so the BE node parsed by FE is the internal network IP. When flink is 
deployed on a non-intranet segment,
+ * the BE node will be inaccessible on the network.
+ * <p>
+ * In this case, you can access the BE node on the intranet by directly 
configuring {@code DorisOptions.builder().setBenodes(...).build()},
+ * after you configure this parameter, Flink Connector will not parse all BE 
nodes through FE nodes.
+ */
+public class DorisIntranetAccessSinkExample {
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.enableCheckpointing(10000);
+        env.getCheckpointConfig()
+                
.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 
Time.milliseconds(30000)));
+
+        DorisSink.Builder<String> builder = DorisSink.builder();
+        final DorisReadOptions.Builder readOptionBuilder = 
DorisReadOptions.builder();
+        readOptionBuilder.setDeserializeArrowAsync(false)
+                .setDeserializeQueueSize(64)
+                .setExecMemLimit(2147483648L)
+                .setRequestQueryTimeoutS(3600)
+                .setRequestBatchSize(1000)
+                .setRequestConnectTimeoutMs(10000)
+                .setRequestReadTimeoutMs(10000)
+                .setRequestRetries(3)
+                .setRequestTabletSize(1024 * 1024);
+
+        Properties properties = new Properties();
+        properties.setProperty("column_separator", ",");
+        properties.setProperty("line_delimiter", "\n");
+        properties.setProperty("format", "csv");
+        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+        dorisBuilder.setFenodes("10.20.30.1:8030")
+                .setBenodes("10.20.30.1:8040, 10.20.30.2:8040, 
10.20.30.3:8040")
+                .setTableIdentifier("test.test_sink")
+                .setUsername("root")
+                .setPassword("");
+
+        DorisExecutionOptions.Builder executionBuilder = 
DorisExecutionOptions.builder();
+        executionBuilder
+                .disable2PC().setLabelPrefix("label-doris")
+                .setStreamLoadProp(properties)
+                .setBufferSize(8 * 1024)
+                .setBufferCount(3);
+
+        builder.setDorisReadOptions(readOptionBuilder.build())
+                .setDorisExecutionOptions(executionBuilder.build())
+                .setSerializer(new SimpleStringSerializer())
+                .setDorisOptions(dorisBuilder.build());
+
+        List<Tuple2<Integer, String>> data = new ArrayList<>();
+        data.add(new Tuple2<>(1, "zhangsan"));
+        data.add(new Tuple2<>(2, "lisi"));
+        data.add(new Tuple2<>(3, "wangwu"));
+        DataStreamSource<Tuple2<Integer, String>> source = 
env.fromCollection(data);
+        source.map((MapFunction<Tuple2<Integer, String>, String>) t -> t.f0 + 
"," + t.f1)
+                .sinkTo(builder.build());
+        env.execute("doris test");
+    }
+}
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
index e81638f..7cc2a88 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
@@ -20,14 +20,18 @@ package org.apache.doris.flink.sink.committer;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.rest.models.BackendV2.BackendRowV2;
 import org.apache.doris.flink.sink.DorisCommittable;
 import org.apache.doris.flink.sink.HttpEntityMock;
 import org.apache.doris.flink.sink.OptionUtils;
+
 import org.apache.http.ProtocolVersion;
 import org.apache.http.StatusLine;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.message.BasicStatusLine;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -35,8 +39,11 @@ import org.junit.Test;
 import java.util.Collections;
 
 import static org.mockito.ArgumentMatchers.any;
+import org.mockito.MockedStatic;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
+import org.slf4j.Logger;
 
 /**
  * Test for Doris Committer.
@@ -46,8 +53,10 @@ public class TestDorisCommitter {
     DorisCommitter dorisCommitter;
     DorisCommittable dorisCommittable;
     HttpEntityMock entityMock;
+    private MockedStatic<RestService> restServiceMockedStatic;
+
     @Before
-    public void setUp() throws Exception{
+    public void setUp() throws Exception {
         DorisOptions dorisOptions = OptionUtils.buildDorisOptions();
         DorisReadOptions readOptions = OptionUtils.buildDorisReadOptions();
         dorisCommittable = new DorisCommittable("127.0.0.1:8710", "test", 0);
@@ -55,9 +64,15 @@ public class TestDorisCommitter {
         entityMock = new HttpEntityMock();
         CloseableHttpResponse httpResponse = mock(CloseableHttpResponse.class);
         StatusLine normalLine = new BasicStatusLine(new 
ProtocolVersion("http", 1, 0), 200, "");
+        restServiceMockedStatic = mockStatic(RestService.class);
+        Logger mockLogger = mock(Logger.class);
+        mock(RestService.class);
+
         when(httpClient.execute(any())).thenReturn(httpResponse);
         when(httpResponse.getStatusLine()).thenReturn(normalLine);
         when(httpResponse.getEntity()).thenReturn(entityMock);
+        when(RestService.getBackendsV2(dorisOptions, readOptions, 
mockLogger)).thenReturn(
+                Collections.singletonList(new BackendRowV2()));
         dorisCommitter = new DorisCommitter(dorisOptions, readOptions, 2, 
httpClient);
     }
 
@@ -73,7 +88,7 @@ public class TestDorisCommitter {
     }
 
     @Test(expected = DorisRuntimeException.class)
-    public void testCommitAbort() throws Exception{
+    public void testCommitAbort() throws Exception {
         String response = "{\n" +
                 "\"status\": \"Fail\",\n" +
                 "\"msg\": \"errCode = 2, detailMessage = transaction [25] is 
already aborted. abort reason: User Abort\"\n" +
@@ -81,4 +96,9 @@ public class TestDorisCommitter {
         this.entityMock.setValue(response);
         dorisCommitter.commit(Collections.singletonList(dorisCommittable));
     }
+
+    @After
+    public void after() {
+        restServiceMockedStatic.close();
+    }
 }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index c20be39..1a205b1 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -52,10 +52,11 @@ public class CdcMysqlSyncDatabaseCase {
         Configuration config = Configuration.fromMap(mysqlConfig);
 
         Map<String,String> sinkConfig = new HashMap<>();
-        sinkConfig.put("fenodes","127.0.0.1:8030");
+        sinkConfig.put("fenodes","10.20.30.1:8030");
+        // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 
10.20.30.3:8040");
         sinkConfig.put("username","root");
         sinkConfig.put("password","");
-        sinkConfig.put("jdbc-url","jdbc:mysql://127.0.0.1:9030");
+        sinkConfig.put("jdbc-url","jdbc:mysql://10.20.30.1:9030");
         sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
         Configuration sinkConf = Configuration.fromMap(sinkConfig);
 
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index 08cf586..3a2a39e 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -58,10 +58,11 @@ public class CdcOraclelSyncDatabaseCase {
         Configuration config = Configuration.fromMap(sourceConfig);
 
         Map<String,String> sinkConfig = new HashMap<>();
-        sinkConfig.put("fenodes","127.0.0.1:8030");
+        sinkConfig.put("fenodes","10.20.30.1:8030");
+        // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 
10.20.30.3:8040");
         sinkConfig.put("username","root");
         sinkConfig.put("password","");
-        sinkConfig.put("jdbc-url","jdbc:mysql://127.0.0.1:9030");
+        sinkConfig.put("jdbc-url","jdbc:mysql://10.20.30.1:9030");
         sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
         Configuration sinkConf = Configuration.fromMap(sinkConfig);
 
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
index 4d5b485..cf5e1d8 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -61,10 +61,11 @@ public class CdcPostgresSyncDatabaseCase {
         Configuration config = Configuration.fromMap(sourceConfig);
 
         Map<String,String> sinkConfig = new HashMap<>();
-        sinkConfig.put("fenodes","127.0.0.1:8737");
+        sinkConfig.put("fenodes","10.20.30.1:8030");
+        // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 
10.20.30.3:8040");
         sinkConfig.put("username","root");
         sinkConfig.put("password","");
-        sinkConfig.put("jdbc-url","jdbc:mysql://127.0.0.1:9737");
+        sinkConfig.put("jdbc-url","jdbc:mysql://10.20.30.1:9030");
         sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
         Configuration sinkConf = Configuration.fromMap(sinkConfig);
 
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
index 96780aa..7251a7f 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
@@ -59,10 +59,11 @@ public class CdcSqlServerSyncDatabaseCase {
         Configuration config = Configuration.fromMap(sourceConfig);
 
         Map<String,String> sinkConfig = new HashMap<>();
-        sinkConfig.put("fenodes","127.0.0.1:8030");
+        sinkConfig.put("fenodes","10.20.30.1:8030");
+        // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 
10.20.30.3:8040");
         sinkConfig.put("username","root");
         sinkConfig.put("password","");
-        sinkConfig.put("jdbc-url","jdbc:mysql://127.0.0.1:9030");
+        sinkConfig.put("jdbc-url","jdbc:mysql://10.20.30.1:9030");
         sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
         Configuration sinkConf = Configuration.fromMap(sinkConfig);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to