This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 8c15c4f [Improve]Added direct access to BE through the intranet (#187) 8c15c4f is described below commit 8c15c4f0bf2d63507b8ac1fd0b8b6d00a37afb6d Author: DongLiang-0 <46414265+donglian...@users.noreply.github.com> AuthorDate: Tue Sep 5 15:26:20 2023 +0800 [Improve]Added direct access to BE through the intranet (#187) --- flink-doris-connector/pom.xml | 8 +- .../doris/flink/cfg/DorisConnectionOptions.java | 16 +++- .../org/apache/doris/flink/cfg/DorisOptions.java | 19 +++- .../org/apache/doris/flink/sink/BackendUtil.java | 32 ++++++- .../flink/sink/batch/DorisBatchStreamLoad.java | 6 +- .../doris/flink/sink/committer/DorisCommitter.java | 10 +- .../doris/flink/sink/writer/DorisWriter.java | 7 +- .../doris/flink/table/DorisConfigOptions.java | 1 + .../flink/table/DorisDynamicTableFactory.java | 5 + .../doris/flink/table/DorisDynamicTableSource.java | 1 + .../doris/flink/table/DorisRowDataInputFormat.java | 5 + .../apache/doris/flink/tools/cdc/DatabaseSync.java | 2 + .../flink/DorisIntranetAccessSinkExample.java | 105 +++++++++++++++++++++ .../flink/sink/committer/TestDorisCommitter.java | 24 ++++- .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 5 +- .../tools/cdc/CdcOraclelSyncDatabaseCase.java | 5 +- .../tools/cdc/CdcPostgresSyncDatabaseCase.java | 5 +- .../tools/cdc/CdcSqlServerSyncDatabaseCase.java | 5 +- 18 files changed, 236 insertions(+), 25 deletions(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index 333a40b..e52a07a 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -224,7 +224,13 @@ under the License. <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> - <version>2.27.0</version> + <version>4.2.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-inline</artifactId> + <version>4.2.0</version> <scope>test</scope> </dependency> diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java index 00abd52..1382dde 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java @@ -31,6 +31,7 @@ public class DorisConnectionOptions implements Serializable { protected final String username; protected final String password; protected String jdbcUrl; + protected String benodes; public DorisConnectionOptions(String fenodes, String username, String password) { this.fenodes = Preconditions.checkNotNull(fenodes, "fenodes is empty"); @@ -38,8 +39,15 @@ public class DorisConnectionOptions implements Serializable { this.password = password; } - public DorisConnectionOptions(String fenodes, String username, String password, String jdbcUrl){ - this(fenodes,username,password); + public DorisConnectionOptions(String fenodes, String username, String password, String jdbcUrl) { + this(fenodes, username, password); + this.jdbcUrl = jdbcUrl; + } + + public DorisConnectionOptions(String fenodes, String benodes, String username, String password, + String jdbcUrl) { + this(fenodes, username, password); + this.benodes = benodes; this.jdbcUrl = jdbcUrl; } @@ -55,6 +63,10 @@ public class DorisConnectionOptions implements Serializable { return password; } + public String getBenodes() { + return benodes; + } + public String getJdbcUrl(){ return jdbcUrl; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java index c9e36e3..cf7b932 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java @@ -42,6 +42,12 @@ public class DorisOptions extends DorisConnectionOptions { this.tableIdentifier = tableIdentifier; } + public DorisOptions(String fenodes, String beNodes, String username, String password, + String tableIdentifier, String jdbcUrl) { + super(fenodes, beNodes, username, password, jdbcUrl); + this.tableIdentifier = tableIdentifier; + } + public String getTableIdentifier() { return tableIdentifier; } @@ -60,7 +66,7 @@ public class DorisOptions extends DorisConnectionOptions { */ public static class Builder { private String fenodes; - + private String benodes; private String jdbcUrl; private String username; private String password; @@ -98,6 +104,14 @@ public class DorisOptions extends DorisConnectionOptions { return this; } + /** + * optional, Backend Http Port + */ + public Builder setBenodes(String benodes) { + this.benodes = benodes; + return this; + } + /** * not required, fe jdbc url, for lookup query */ @@ -109,9 +123,8 @@ public class DorisOptions extends DorisConnectionOptions { public DorisOptions build() { checkNotNull(fenodes, "No fenodes supplied."); checkNotNull(tableIdentifier, "No tableIdentifier supplied."); - return new DorisOptions(fenodes, username, password, tableIdentifier, jdbcUrl); + return new DorisOptions(fenodes, benodes, username, password, tableIdentifier, jdbcUrl); } } - } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java index 701cad6..9f9516a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java @@ -19,11 +19,15 @@ package org.apache.doris.flink.sink; import org.apache.doris.flink.exception.DorisRuntimeException; import org.apache.doris.flink.rest.models.BackendV2; +import org.apache.doris.flink.rest.models.BackendV2.BackendRowV2; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.HttpURLConnection; import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; public class BackendUtil { @@ -36,13 +40,34 @@ public class BackendUtil { this.pos = 0; } + public BackendUtil(String beNodes) { + this.backends = initBackends(beNodes); + this.pos = 0; + } + + private List<BackendV2.BackendRowV2> initBackends(String beNodes) { + List<BackendV2.BackendRowV2> backends = new ArrayList<>(); + List<String> nodes = Arrays.asList(beNodes.split(",")); + nodes.forEach(node -> { + if (tryHttpConnection(node)) { + node = node.trim(); + String[] ipAndPort = node.split(":"); + BackendRowV2 backendRowV2 = new BackendRowV2(); + backendRowV2.setIp(ipAndPort[0]); + backendRowV2.setHttpPort(Integer.parseInt(ipAndPort[1])); + backendRowV2.setAlive(true); + backends.add(backendRowV2); + } + }); + return backends; + } + public String getAvailableBackend() { long tmp = pos + backends.size(); while (pos < tmp) { - BackendV2.BackendRowV2 backend = backends.get((int) (pos % backends.size())); + BackendV2.BackendRowV2 backend = backends.get((int) (pos++ % backends.size())); String res = backend.toBackendString(); - if(tryHttpConnection(res)){ - pos++; + if (tryHttpConnection(res)) { return res; } } @@ -60,7 +85,6 @@ public class BackendUtil { return true; } catch (Exception ex) { LOG.warn("Failed to connect to backend:{}", backend, ex); - pos++; return false; } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index a43220c..b6a3f65 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -29,6 +29,8 @@ import org.apache.doris.flink.sink.EscapeHandler; import org.apache.doris.flink.sink.HttpPutBuilder; import org.apache.doris.flink.sink.HttpUtil; import org.apache.doris.flink.sink.writer.LabelGenerator; + +import org.apache.commons.lang3.StringUtils; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.impl.client.CloseableHttpClient; @@ -93,7 +95,9 @@ public class DorisBatchStreamLoad implements Serializable { DorisReadOptions dorisReadOptions, DorisExecutionOptions executionOptions, LabelGenerator labelGenerator) { - this.backendUtil = new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG)); + this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? new BackendUtil( + dorisOptions.getBenodes()) + : new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG)); this.hostPort = backendUtil.getAvailableBackend(); String[] tableInfo = dorisOptions.getTableIdentifier().split("\\."); this.db = tableInfo[0]; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java index 3b61d82..5bb1a40 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java @@ -17,6 +17,7 @@ package org.apache.doris.flink.sink.committer; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.connector.sink.Committer; import com.fasterxml.jackson.core.type.TypeReference; @@ -25,6 +26,7 @@ import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisRuntimeException; import org.apache.doris.flink.rest.RestService; +import org.apache.doris.flink.sink.BackendUtil; import org.apache.doris.flink.sink.DorisCommittable; import org.apache.doris.flink.sink.HttpPutBuilder; import org.apache.doris.flink.sink.HttpUtil; @@ -55,6 +57,7 @@ public class DorisCommitter implements Committer<DorisCommittable> { private final DorisOptions dorisOptions; private final DorisReadOptions dorisReadOptions; private final ObjectMapper jsonMapper = new ObjectMapper(); + private final BackendUtil backendUtil; int maxRetry; @@ -67,6 +70,9 @@ public class DorisCommitter implements Committer<DorisCommittable> { this.dorisReadOptions = dorisReadOptions; this.maxRetry = maxRetry; this.httpClient = client; + this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? new BackendUtil( + dorisOptions.getBenodes()) + : new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG)); } @Override @@ -116,13 +122,13 @@ public class DorisCommitter implements Committer<DorisCommittable> { if (retry == maxRetry) { throw new DorisRuntimeException("stream load error: " + reasonPhrase); } - hostPort = RestService.getBackend(dorisOptions, dorisReadOptions, LOG); + hostPort = backendUtil.getAvailableBackend(); } catch (IOException e) { LOG.error("commit transaction failed: ", e); if (retry == maxRetry) { throw new IOException("commit transaction failed: {}", e); } - hostPort = RestService.getBackend(dorisOptions, dorisReadOptions, LOG); + hostPort = backendUtil.getAvailableBackend(); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index 7890670..1f98206 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -27,6 +27,8 @@ import org.apache.doris.flink.rest.models.RespContent; import org.apache.doris.flink.sink.BackendUtil; import org.apache.doris.flink.sink.DorisCommittable; import org.apache.doris.flink.sink.HttpUtil; + +import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.sink.Sink; import org.apache.flink.api.connector.sink.SinkWriter; @@ -99,8 +101,9 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr } public void initializeLoad(List<DorisWriterState> state) throws IOException { - //cache backend - backendUtil = new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG)); + this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? new BackendUtil( + dorisOptions.getBenodes()) + : new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG)); try { this.dorisStreamLoad = new DorisStreamLoad( backendUtil.getAvailableBackend(), diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java index ba408a6..98b9a78 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java @@ -40,6 +40,7 @@ public class DorisConfigOptions { public static final String IDENTIFIER = "doris"; // common option public static final ConfigOption<String> FENODES = ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris fe http address."); + public static final ConfigOption<String> BENODES = ConfigOptions.key("benodes").stringType().noDefaultValue().withDescription("doris be http address."); public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the doris table name."); public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the doris user name."); public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the doris password."); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index 66a6a19..57e4fc0 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -20,6 +20,8 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisLookupOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; + +import static org.apache.doris.flink.table.DorisConfigOptions.BENODES; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.TableSchema; @@ -103,6 +105,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory public Set<ConfigOption<?>> optionalOptions() { final Set<ConfigOption<?>> options = new HashSet<>(); options.add(FENODES); + options.add(BENODES); options.add(TABLE_IDENTIFIER); options.add(USERNAME); options.add(PASSWORD); @@ -169,8 +172,10 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory private DorisOptions getDorisOptions(ReadableConfig readableConfig) { final String fenodes = readableConfig.get(FENODES); + final String benodes = readableConfig.get(BENODES); final DorisOptions.Builder builder = DorisOptions.builder() .setFenodes(fenodes) + .setBenodes(benodes) .setJdbcUrl(readableConfig.get(JDBC_URL)) .setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER)); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java index bd04e20..35a4489 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java @@ -104,6 +104,7 @@ public final class DorisDynamicTableSource implements ScanTableSource, LookupTab } DorisRowDataInputFormat.Builder builder = DorisRowDataInputFormat.builder() .setFenodes(options.getFenodes()) + .setBenodes(options.getBenodes()) .setUsername(options.getUsername()) .setPassword(options.getPassword()) .setTableIdentifier(options.getTableIdentifier()) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java index 7181ce6..47bcf69 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java @@ -198,6 +198,11 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable return this; } + public Builder setBenodes(String benodes) { + this.optionsBuilder.setBenodes(benodes); + return this; + } + public Builder setUsername(String username) { this.optionsBuilder.setUsername(username); return this; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index bc15987..455000e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -152,6 +152,7 @@ public abstract class DatabaseSync { */ public DorisSink<String> buildDorisSink(String table) { String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES); + String benodes = sinkConfig.getString(DorisConfigOptions.BENODES); String user = sinkConfig.getString(DorisConfigOptions.USERNAME); String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, ""); String labelPrefix = sinkConfig.getString(DorisConfigOptions.SINK_LABEL_PREFIX); @@ -159,6 +160,7 @@ public abstract class DatabaseSync { DorisSink.Builder<String> builder = DorisSink.builder(); DorisOptions.Builder dorisBuilder = DorisOptions.builder(); dorisBuilder.setFenodes(fenodes) + .setBenodes(benodes) .setTableIdentifier(database + "." + table) .setUsername(user) .setPassword(passwd); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisIntranetAccessSinkExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisIntranetAccessSinkExample.java new file mode 100644 index 0000000..61ad1dd --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisIntranetAccessSinkExample.java @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink; + +import org.apache.doris.flink.cfg.DorisExecutionOptions; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.sink.DorisSink; +import org.apache.doris.flink.sink.writer.SimpleStringSerializer; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * When the flink connector accesses doris, it parses out all surviving BE nodes according to the FE address filled in. + * <p> + * However, when the BE node is deployed, most of the internal network IP is filled in, + * so the BE node parsed by FE is the internal network IP. When flink is deployed on a non-intranet segment, + * the BE node will be inaccessible on the network. + * <p> + * In this case, you can access the BE node on the intranet by directly configuring {@link DorisOptions.builder().setBenodes().build()}, + * after you configure this parameter, Flink Connector will not parse all BE nodes through FE nodes. + */ +public class DorisIntranetAccessSinkExample { + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + env.enableCheckpointing(10000); + env.getCheckpointConfig() + .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000))); + + DorisSink.Builder<String> builder = DorisSink.builder(); + final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder(); + readOptionBuilder.setDeserializeArrowAsync(false) + .setDeserializeQueueSize(64) + .setExecMemLimit(2147483648L) + .setRequestQueryTimeoutS(3600) + .setRequestBatchSize(1000) + .setRequestConnectTimeoutMs(10000) + .setRequestReadTimeoutMs(10000) + .setRequestRetries(3) + .setRequestTabletSize(1024 * 1024); + + Properties properties = new Properties(); + properties.setProperty("column_separator", ","); + properties.setProperty("line_delimiter", "\n"); + properties.setProperty("format", "csv"); + DorisOptions.Builder dorisBuilder = DorisOptions.builder(); + dorisBuilder.setFenodes("10.20.30.1:8030") + .setBenodes("10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040") + .setTableIdentifier("test.test_sink") + .setUsername("root") + .setPassword(""); + + DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); + executionBuilder + .disable2PC().setLabelPrefix("label-doris") + .setStreamLoadProp(properties) + .setBufferSize(8 * 1024) + .setBufferCount(3); + + builder.setDorisReadOptions(readOptionBuilder.build()) + .setDorisExecutionOptions(executionBuilder.build()) + .setSerializer(new SimpleStringSerializer()) + .setDorisOptions(dorisBuilder.build()); + + List<Tuple2<Integer, String>> data = new ArrayList<>(); + data.add(new Tuple2<>(1, "zhangsan")); + data.add(new Tuple2<>(2, "lisi")); + data.add(new Tuple2<>(3, "wangwu")); + DataStreamSource<Tuple2<Integer, String>> source = env.fromCollection(data); + source.map((MapFunction<Tuple2<Integer, String>, String>) t -> t.f0 + "," + t.f1) + .sinkTo(builder.build()); + env.execute("doris test"); + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java index e81638f..7cc2a88 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java @@ -20,14 +20,18 @@ package org.apache.doris.flink.sink.committer; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisRuntimeException; +import org.apache.doris.flink.rest.RestService; +import org.apache.doris.flink.rest.models.BackendV2.BackendRowV2; import org.apache.doris.flink.sink.DorisCommittable; import org.apache.doris.flink.sink.HttpEntityMock; import org.apache.doris.flink.sink.OptionUtils; + import org.apache.http.ProtocolVersion; import org.apache.http.StatusLine; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.message.BasicStatusLine; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -35,8 +39,11 @@ import org.junit.Test; import java.util.Collections; import static org.mockito.ArgumentMatchers.any; +import org.mockito.MockedStatic; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.when; +import org.slf4j.Logger; /** * Test for Doris Committer. @@ -46,8 +53,10 @@ public class TestDorisCommitter { DorisCommitter dorisCommitter; DorisCommittable dorisCommittable; HttpEntityMock entityMock; + private MockedStatic<RestService> restServiceMockedStatic; + @Before - public void setUp() throws Exception{ + public void setUp() throws Exception { DorisOptions dorisOptions = OptionUtils.buildDorisOptions(); DorisReadOptions readOptions = OptionUtils.buildDorisReadOptions(); dorisCommittable = new DorisCommittable("127.0.0.1:8710", "test", 0); @@ -55,9 +64,15 @@ public class TestDorisCommitter { entityMock = new HttpEntityMock(); CloseableHttpResponse httpResponse = mock(CloseableHttpResponse.class); StatusLine normalLine = new BasicStatusLine(new ProtocolVersion("http", 1, 0), 200, ""); + restServiceMockedStatic = mockStatic(RestService.class); + Logger mockLogger = mock(Logger.class); + mock(RestService.class); + when(httpClient.execute(any())).thenReturn(httpResponse); when(httpResponse.getStatusLine()).thenReturn(normalLine); when(httpResponse.getEntity()).thenReturn(entityMock); + when(RestService.getBackendsV2(dorisOptions, readOptions, mockLogger)).thenReturn( + Collections.singletonList(new BackendRowV2())); dorisCommitter = new DorisCommitter(dorisOptions, readOptions, 2, httpClient); } @@ -73,7 +88,7 @@ public class TestDorisCommitter { } @Test(expected = DorisRuntimeException.class) - public void testCommitAbort() throws Exception{ + public void testCommitAbort() throws Exception { String response = "{\n" + "\"status\": \"Fail\",\n" + "\"msg\": \"errCode = 2, detailMessage = transaction [25] is already aborted. abort reason: User Abort\"\n" + @@ -81,4 +96,9 @@ public class TestDorisCommitter { this.entityMock.setValue(response); dorisCommitter.commit(Collections.singletonList(dorisCommittable)); } + + @After + public void after() { + restServiceMockedStatic.close(); + } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java index c20be39..1a205b1 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java @@ -52,10 +52,11 @@ public class CdcMysqlSyncDatabaseCase { Configuration config = Configuration.fromMap(mysqlConfig); Map<String,String> sinkConfig = new HashMap<>(); - sinkConfig.put("fenodes","127.0.0.1:8030"); + sinkConfig.put("fenodes","10.20.30.1:8030"); + // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040"); sinkConfig.put("username","root"); sinkConfig.put("password",""); - sinkConfig.put("jdbc-url","jdbc:mysql://127.0.0.1:9030"); + sinkConfig.put("jdbc-url","jdbc:mysql://10.20.30.1:9030"); sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString()); Configuration sinkConf = Configuration.fromMap(sinkConfig); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java index 08cf586..3a2a39e 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java @@ -58,10 +58,11 @@ public class CdcOraclelSyncDatabaseCase { Configuration config = Configuration.fromMap(sourceConfig); Map<String,String> sinkConfig = new HashMap<>(); - sinkConfig.put("fenodes","127.0.0.1:8030"); + sinkConfig.put("fenodes","10.20.30.1:8030"); + // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040"); sinkConfig.put("username","root"); sinkConfig.put("password",""); - sinkConfig.put("jdbc-url","jdbc:mysql://127.0.0.1:9030"); + sinkConfig.put("jdbc-url","jdbc:mysql://10.20.30.1:9030"); sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString()); Configuration sinkConf = Configuration.fromMap(sinkConfig); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java index 4d5b485..cf5e1d8 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java @@ -61,10 +61,11 @@ public class CdcPostgresSyncDatabaseCase { Configuration config = Configuration.fromMap(sourceConfig); Map<String,String> sinkConfig = new HashMap<>(); - sinkConfig.put("fenodes","127.0.0.1:8737"); + sinkConfig.put("fenodes","10.20.30.1:8030"); + // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040"); sinkConfig.put("username","root"); sinkConfig.put("password",""); - sinkConfig.put("jdbc-url","jdbc:mysql://127.0.0.1:9737"); + sinkConfig.put("jdbc-url","jdbc:mysql://10.20.30.1:9030"); sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString()); Configuration sinkConf = Configuration.fromMap(sinkConfig); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java index 96780aa..7251a7f 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java @@ -59,10 +59,11 @@ public class CdcSqlServerSyncDatabaseCase { Configuration config = Configuration.fromMap(sourceConfig); Map<String,String> sinkConfig = new HashMap<>(); - sinkConfig.put("fenodes","127.0.0.1:8030"); + sinkConfig.put("fenodes","10.20.30.1:8030"); + // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040"); sinkConfig.put("username","root"); sinkConfig.put("password",""); - sinkConfig.put("jdbc-url","jdbc:mysql://127.0.0.1:9030"); + sinkConfig.put("jdbc-url","jdbc:mysql://10.20.30.1:9030"); sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString()); Configuration sinkConf = Configuration.fromMap(sinkConfig); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org