This is an automated email from the ASF dual-hosted git repository. kirs pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 82925e08 [Improve](case) add itcase for dorissource , dorissink and lookup join (#569) 82925e08 is described below commit 82925e08aa2525af9c66f36e47b7ebbce881384c Author: wudi <676366...@qq.com> AuthorDate: Wed Mar 12 15:57:26 2025 +0800 [Improve](case) add itcase for dorissource , dorissink and lookup join (#569) * add source case --- flink-doris-connector/pom.xml | 14 +- .../doris/flink/catalog/doris/DataModel.java | 1 + .../apache/doris/flink/cfg/DorisReadOptions.java | 8 + .../java/org/apache/doris/flink/lookup/Worker.java | 2 +- .../apache/doris/flink/serialization/RowBatch.java | 1 + .../flink/container/AbstractContainerTestBase.java | 2 +- .../flink/container/AbstractITCaseService.java | 14 ++ .../doris/flink/container/ContainerUtils.java | 51 +++- .../flink/container/e2e/Doris2DorisE2ECase.java | 69 +++--- .../flink/container/instance/ContainerService.java | 4 + .../flink/container/instance/DorisContainer.java | 34 ++- .../doris/flink/lookup/DorisLookupTableITCase.java | 234 +++++++++++++++++- .../doris/flink/sink/DorisSinkFailoverITCase.java | 265 +++++++++++++++++++++ .../apache/doris/flink/sink/DorisSinkITCase.java | 161 ++++++++++--- .../doris/flink/source/DorisSourceITCase.java | 189 +++++++++++---- .../org/apache/doris/flink/utils/MockSource.java | 12 +- .../doris2doris/test_doris2doris_sink_test_tbl.sql | 10 +- .../test_doris2doris_source_test_tbl.sql | 73 ++---- .../container/e2e/mysql2doris/testMySQL2Doris.txt | 1 + .../src/test/resources/docker/doris/be.conf | 99 ++++++++ .../src/test/resources/docker/doris/fe.conf | 74 ++++++ 21 files changed, 1130 insertions(+), 188 deletions(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index fde9eeb5..c584ebbf 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -572,13 +572,15 @@ under the License. <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> - <version>2.22.0</version> + <version>3.0.0-M5</version> <configuration> - <!-- skip itcase e2ecase in mvn package - <includes> - <include>**/*.java</include> - </includes> - --> + <!-- Activate the use of TCP to transmit events to the plugin --> + <forkNode implementation="org.apache.maven.plugin.surefire.extensions.SurefireForkNodeFactory"/> + <!-- skip itcase e2ecase in mvn package + <includes> + <include>**/*.java</include> + </includes> + --> </configuration> </plugin> <plugin> diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DataModel.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DataModel.java index a1085b53..f92935fa 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DataModel.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DataModel.java @@ -20,5 +20,6 @@ package org.apache.doris.flink.catalog.doris; public enum DataModel { DUPLICATE, UNIQUE, + UNIQUE_MOR, AGGREGATE } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java index 1889ace5..7e1beea2 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java @@ -17,6 +17,8 @@ package org.apache.doris.flink.cfg; +import org.apache.flink.util.Preconditions; + import java.io.Serializable; import java.util.Objects; @@ -408,6 +410,12 @@ public class DorisReadOptions implements Serializable { * @return a DorisReadOptions with the settings made for this builder. */ public DorisReadOptions build() { + + if (useFlightSql) { + Preconditions.checkNotNull( + flightSqlPort, "flight sql port must be set when use flight sql to read."); + } + return new DorisReadOptions( readFields, filterQuery, diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java index 8eef01cf..b744f454 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java @@ -107,6 +107,7 @@ public class Worker implements Runnable { recordList.size(), deduplicateList.size()); StringBuilder sb = new StringBuilder(); + sb.append("/* ApplicationName=Flink Lookup Query */ "); boolean first; for (int i = 0; i < deduplicateList.size(); i++) { if (i > 0) { @@ -169,7 +170,6 @@ public class Worker implements Runnable { private void appendSelect(StringBuilder sb, LookupSchema schema) { String[] selectFields = schema.getSelectFields(); - sb.append("/* ApplicationName=Flink Lookup Query */ "); sb.append(" select "); for (int i = 0; i < selectFields.length; i++) { if (i > 0) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java index dee9c1fc..ddaf6600 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java @@ -462,6 +462,7 @@ public class RowBatch { case "VARCHAR": case "STRING": case "JSONB": + case "JSON": case "VARIANT": if (!minorType.equals(Types.MinorType.VARCHAR)) { return false; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java index 5c7c151e..5433bbc2 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java @@ -35,7 +35,7 @@ import static org.junit.Assert.assertTrue; public abstract class AbstractContainerTestBase { private static final Logger LOG = LoggerFactory.getLogger(AbstractContainerTestBase.class); - private static ContainerService dorisContainerService; + protected static ContainerService dorisContainerService; public static final int DEFAULT_PARALLELISM = 2; @BeforeClass diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java index 6628933c..e6b8e163 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; public abstract class AbstractITCaseService extends AbstractContainerTestBase { @@ -131,4 +132,17 @@ public abstract class AbstractITCaseService extends AbstractContainerTestBase { } catch (InterruptedException ignored) { } } + + protected JobStatus getFlinkJobStatus(JobClient jobClient) { + JobStatus jobStatus; + try { + jobStatus = jobClient.getJobStatus().get(); + } catch (IllegalStateException e) { + LOG.info("Failed to get state, cause " + e.getMessage()); + jobStatus = JobStatus.FINISHED; + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + return jobStatus; + } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java index f87e498c..6a0ee947 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java @@ -34,6 +34,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.StringJoiner; import java.util.stream.Collectors; public class ContainerUtils { @@ -54,6 +55,30 @@ public class ContainerUtils { } } + public static List<String> executeSQLStatement( + Connection connection, Logger logger, String sql, int columnSize) { + List<String> result = new ArrayList<>(); + if (Objects.isNull(sql)) { + return result; + } + try (Statement statement = connection.createStatement()) { + logger.info("start to execute sql={}", sql); + ResultSet resultSet = statement.executeQuery(sql); + + while (resultSet.next()) { + StringJoiner sb = new StringJoiner(","); + for (int i = 1; i <= columnSize; i++) { + Object value = resultSet.getObject(i); + sb.add(String.valueOf(value)); + } + result.add(sb.toString()); + } + return result; + } catch (SQLException e) { + throw new DorisRuntimeException(e); + } + } + public static String loadFileContent(String resourcePath) { try (InputStream stream = ContainerUtils.class.getClassLoader().getResourceAsStream(resourcePath)) { @@ -114,6 +139,23 @@ public class ContainerUtils { String query, int columnSize, boolean ordered) { + List<String> actual = getResult(connection, logger, expected, query, columnSize); + if (ordered) { + Assert.assertArrayEquals(expected.toArray(), actual.toArray()); + } else { + Assert.assertEquals(expected.size(), actual.size()); + Assert.assertArrayEquals( + expected.stream().sorted().toArray(Object[]::new), + actual.stream().sorted().toArray(Object[]::new)); + } + } + + public static List<String> getResult( + Connection connection, + Logger logger, + List<String> expected, + String query, + int columnSize) { List<String> actual = new ArrayList<>(); try (Statement statement = connection.createStatement()) { ResultSet sinkResultSet = statement.executeQuery(query); @@ -141,13 +183,6 @@ public class ContainerUtils { "checking test result. expected={}, actual={}", String.join(",", expected), String.join(",", actual)); - if (ordered) { - Assert.assertArrayEquals(expected.toArray(), actual.toArray()); - } else { - Assert.assertEquals(expected.size(), actual.size()); - Assert.assertArrayEquals( - expected.stream().sorted().toArray(Object[]::new), - actual.stream().sorted().toArray(Object[]::new)); - } + return actual; } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java index ce25c677..1b56be22 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java @@ -19,29 +19,39 @@ package org.apache.doris.flink.container.e2e; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.types.Row; -import org.apache.flink.util.CloseableIterator; import org.apache.doris.flink.container.AbstractContainerTestBase; import org.apache.doris.flink.container.ContainerUtils; import org.apache.doris.flink.table.DorisConfigOptions; -import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.UUID; +@RunWith(Parameterized.class) public class Doris2DorisE2ECase extends AbstractContainerTestBase { private static final Logger LOG = LoggerFactory.getLogger(Doris2DorisE2ECase.class); private static final String DATABASE_SOURCE = "test_doris2doris_source"; private static final String DATABASE_SINK = "test_doris2doris_sink"; private static final String TABLE = "test_tbl"; + private final boolean useFlightRead; + + public Doris2DorisE2ECase(boolean useFlightRead) { + this.useFlightRead = useFlightRead; + } + + @Parameterized.Parameters(name = "useFlightRead: {0}") + public static Object[] parameters() { + return new Object[][] {new Object[] {false}, new Object[] {true}}; + } + @Test public void testDoris2Doris() throws Exception { LOG.info("Start executing the test case of doris to doris."); @@ -68,10 +78,12 @@ public class Doris2DorisE2ECase extends AbstractContainerTestBase { + "c11 TIMESTAMP, \n" + "c12 char(1), \n" + "c13 varchar(256), \n" - + "c14 Array<String>, \n" - + "c15 Map<String, String>, \n" - + "c16 ROW<name String, age int>, \n" - + "c17 STRING \n" + + "c14 STRING, \n" + + "c15 Array<String>, \n" + + "c16 Map<String, String>, \n" + + "c17 ROW<name String, age int>, \n" + + "c18 STRING, \n" + + "c19 STRING\n" + ") WITH (" + " 'connector' = '" + DorisConfigOptions.IDENTIFIER @@ -82,12 +94,15 @@ public class Doris2DorisE2ECase extends AbstractContainerTestBase { + UUID.randomUUID() + "'," + " 'username' = '%s'," - + " 'password' = '%s'" + + " 'password' = '%s'," + + " 'source.use-flight-sql' = '%s',\n" + + " 'source.flight-sql-port' = '9611'" + ")", getFenodes(), DATABASE_SOURCE + "." + TABLE, getDorisUsername(), - getDorisPassword()); + getDorisPassword(), + useFlightRead); tEnv.executeSql(sourceDDL); String sinkDDL = @@ -107,10 +122,12 @@ public class Doris2DorisE2ECase extends AbstractContainerTestBase { + "c11 TIMESTAMP, \n" + "c12 char(1), \n" + "c13 varchar(256), \n" - + "c14 Array<String>, \n" - + "c15 Map<String, String>, \n" - + "c16 ROW<name String, age int>, \n" - + "c17 STRING \n" + + "c14 STRING, \n" + + "c15 Array<String>, \n" + + "c16 Map<String, String>, \n" + + "c17 ROW<name String, age int>, \n" + + "c18 STRING, \n" + + "c19 STRING\n" + ") WITH (" + " 'connector' = '" + DorisConfigOptions.IDENTIFIER @@ -131,21 +148,15 @@ public class Doris2DorisE2ECase extends AbstractContainerTestBase { tEnv.executeSql("INSERT INTO doris_sink SELECT * FROM doris_source").await(); - TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_sink"); - List<Object> actual = new ArrayList<>(); - try (CloseableIterator<Row> iterator = tableResult.collect()) { - while (iterator.hasNext()) { - actual.add(iterator.next().toString()); - } - } - LOG.info("The actual data in the doris sink table is, actual={}", actual); + List<String> excepted = + Arrays.asList( + "1,true,127,32767,2147483647,9223372036854775807,170141183460469231731687303715884105727,3.14,2.71828,12345.6789,2025-03-11,2025-03-11T12:34:56,A,Hello, Doris!,This is a string,[\"Alice\", \"Bob\"],{\"key1\":\"value1\", \"key2\":\"value2\"},{\"name\": \"Tom\", \"age\": 30},{\"key\":\"value\"},{\"data\":123,\"type\":\"variant\"}", + "2,false,-128,-32768,-2147483648,-9223372036854775808,-170141183460469231731687303715884105728,-1.23,1.0E-4,-9999.9999,2024-12-25,2024-12-25T23:59:59,B,Doris Test,Another string!,[\"Charlie\", \"David\"],{\"k1\":\"v1\", \"k2\":\"v2\"},{\"name\": \"Jerry\", \"age\": 25},{\"status\":\"ok\"},{\"data\":[1,2,3]}", + "3,true,0,0,0,0,0,0.0,0.0,0.0000,2023-06-15,2023-06-15T08:00,C,Test Doris,Sample text,[\"Eve\", \"Frank\"],{\"alpha\":\"beta\"},{\"name\": \"Alice\", \"age\": 40},{\"nested\":{\"key\":\"value\"}},{\"variant\":\"test\"}", + "4,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null"); - String[] expected = - new String[] { - "+I[1, true, 127, 32767, 2147483647, 9223372036854775807, 123456789012345678901234567890, 3.14, 2.7182818284, 12345.6789, 2023-05-22, 2023-05-22T12:34:56, A, Example text, [item1, item2, item3], {key1=value1, key2=value2}, +I[John Doe, 30], {\"key\":\"value\"}]", - "+I[2, false, -128, -32768, -2147483648, -9223372036854775808, -123456789012345678901234567890, -3.14, -2.7182818284, -12345.6789, 2024-01-01, 2024-01-01T00:00, B, Another example, [item4, item5, item6], {key3=value3, key4=value4}, +I[Jane Doe, 25], {\"another_key\":\"another_value\"}]" - }; - Assert.assertArrayEquals(expected, actual.toArray(new String[0])); + String query = String.format("SELECT * FROM %s.%s", DATABASE_SINK, TABLE); + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, excepted, query, 20, false); } private void initializeDorisTable() { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java index 684de5a0..2dcbe210 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java @@ -25,6 +25,10 @@ public interface ContainerService { void startContainer(); + default void restartContainer() { + throw new DorisRuntimeException("Only doris docker container can implemented."); + } + boolean isRunning(); Connection getQueryConnection(); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java index ef399d0d..1bd483c3 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java @@ -25,6 +25,7 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.utility.DockerLoggerFactory; +import org.testcontainers.utility.MountableFile; import java.io.BufferedReader; import java.io.InputStream; @@ -73,14 +74,23 @@ public class DorisContainer implements ContainerService { .withLogConsumer( new Slf4jLogConsumer( DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE))) - .withExposedPorts(8030, 9030, 8040, 9060); + // use customer conf + .withCopyFileToContainer( + MountableFile.forClasspathResource("docker/doris/be.conf"), + "/opt/apache-doris/be/conf/be.conf") + .withCopyFileToContainer( + MountableFile.forClasspathResource("docker/doris/fe.conf"), + "/opt/apache-doris/fe/conf/fe.conf") + .withExposedPorts(8030, 9030, 8040, 9060, 9611, 9610); container.setPortBindings( Lists.newArrayList( String.format("%s:%s", "8030", "8030"), String.format("%s:%s", "9030", "9030"), String.format("%s:%s", "9060", "9060"), - String.format("%s:%s", "8040", "8040"))); + String.format("%s:%s", "8040", "8040"), + String.format("%s:%s", "9611", "9611"), + String.format("%s:%s", "9610", "9610"))); return container; } @@ -90,6 +100,7 @@ public class DorisContainer implements ContainerService { // singleton doris container dorisContainer.start(); initializeJdbcConnection(); + initializeVariables(); printClusterStatus(); } catch (Exception ex) { LOG.error("Failed to start containers doris", ex); @@ -98,6 +109,15 @@ public class DorisContainer implements ContainerService { LOG.info("Doris container started successfully."); } + @Override + public void restartContainer() { + LOG.info("Restart doris container."); + dorisContainer + .getDockerClient() + .restartContainerCmd(dorisContainer.getContainerId()) + .exec(); + } + @Override public boolean isRunning() { return dorisContainer.isRunning(); @@ -115,6 +135,16 @@ public class DorisContainer implements ContainerService { } } + private void initializeVariables() throws Exception { + try (Connection connection = getQueryConnection(); + Statement statement = connection.createStatement()) { + LOG.info("init doris cluster variables."); + // avoid arrow flight sql reading bug + statement.executeQuery("SET PROPERTY FOR 'root' 'max_user_connections' = '1024';"); + } + LOG.info("Init variables successfully."); + } + @Override public String getJdbcUrl() { return String.format(JDBC_URL, dorisContainer.getHost()); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java index 6d569bcf..8ecb87da 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java @@ -19,6 +19,7 @@ package org.apache.doris.flink.lookup; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Table; @@ -30,7 +31,10 @@ import org.apache.flink.util.CloseableIterator; import org.apache.doris.flink.container.AbstractITCaseService; import org.apache.doris.flink.container.ContainerUtils; import org.apache.doris.flink.table.DorisConfigOptions; +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,17 +42,36 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +@RunWith(Parameterized.class) public class DorisLookupTableITCase extends AbstractITCaseService { private static final Logger LOG = LoggerFactory.getLogger(DorisLookupTableITCase.class); private static final String DATABASE = "test_lookup"; private static final String TABLE_READ_TBL = "tbl_read_tbl"; + private static final String TABLE_DIM_TBL = "tbl_dim_tbl"; + + private StreamExecutionEnvironment env; + + private final boolean async; + + public DorisLookupTableITCase(boolean async) { + this.async = async; + } + + @Parameterized.Parameters(name = "async: {0}") + public static Object[] parameters() { + return new Object[][] {new Object[] {false}, new Object[] {true}}; + } + + @Before + public void before() { + env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + } @Test public void testLookupTable() throws Exception { initializeTable(); - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(DEFAULT_PARALLELISM); DataStreamSource<Integer> sourceStream = env.fromElements(1, 2, 3, 4); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); Schema schema = @@ -76,13 +99,15 @@ public class DorisLookupTableITCase extends AbstractITCaseService { + "'table.identifier' = '%s'," + "'username' = '%s'," + "'password' = '%s'," - + "'lookup.cache.max-rows' = '100'" + + "'lookup.cache.max-rows' = '100'," + + "'lookup.jdbc.async' = '%s'" + ")", getFenodes(), getDorisQueryUrl(), DATABASE + "." + TABLE_READ_TBL, getDorisUsername(), - getDorisPassword()); + getDorisPassword(), + async); tEnv.executeSql(lookupDDL); TableResult tableResult = tEnv.executeSql( @@ -138,4 +163,205 @@ public class DorisLookupTableITCase extends AbstractITCaseService { "insert into %s.%s values (3,-106,-14878,1466614815449373200)", DATABASE, DorisLookupTableITCase.TABLE_READ_TBL)); } + + @Test + public void testLookup() throws Exception { + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + initFlinkTable(tEnv); + Schema schema = + Schema.newBuilder() + .column("f0", DataTypes.INT()) + .columnByExpression("proctime", "PROCTIME()") + .build(); + + Table table = tEnv.fromDataStream(env.fromElements(1, 2, 3), schema); + tEnv.createTemporaryView("fact_table", table); + + String query = + "select fact_table.f0," + + "doris_lookup.score" + + " from fact_table" + + " left join doris_lookup FOR SYSTEM_TIME AS OF fact_table.proctime on fact_table.f0 = doris_lookup.id"; + TableResult tableResult = tEnv.executeSql(query); + CloseableIterator<Row> collectIter = tableResult.collect(); + List<String> actual = collectSize(collectIter, 3); + String[] expected = new String[] {"+I[1, 100]", "+I[2, 200]", "+I[3, null]"}; + assertEqualsInAnyOrder(Arrays.asList(expected), Arrays.asList(actual.toArray())); + collectIter.close(); + + // mock data + tEnv.dropTemporaryView("fact_table"); + tEnv.createTemporaryView( + "fact_table", tEnv.fromDataStream(env.fromElements(1, 2, 3, 4), schema)); + + ContainerUtils.executeSQLStatement( + getDorisQueryConnection(), + LOG, + String.format( + "insert into %s.%s values (3,300)", + DATABASE, DorisLookupTableITCase.TABLE_DIM_TBL)); + + TableResult tableResult2 = tEnv.executeSql(query); + CloseableIterator<Row> collectIter2 = tableResult2.collect(); + actual = collectSize(collectIter2, 4); + expected = new String[] {"+I[1, 100]", "+I[2, 200]", "+I[3, 300]", "+I[4, null]"}; + assertEqualsInAnyOrder(Arrays.asList(expected), Arrays.asList(actual.toArray())); + collectIter2.close(); + } + + @Test + public void testLookupCache() throws Exception { + // Asynchronous data may be found before the cache expires, and may be out of order. + if (async) { + return; + } + + Schema schema = + Schema.newBuilder() + .column("f0", DataTypes.INT()) + .columnByExpression("proctime", "PROCTIME()") + .build(); + + DataStreamSource<Integer> mockSource = + env.addSource( + new SourceFunction<Integer>() { + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + ctx.collect(1); + ctx.collect(2); + // wait change table value to verify cache + Thread.sleep(10000); + ctx.collect(3); + + // query from cache + ctx.collect(2); + ctx.collect(1); + + // wait cache expire for max-rows + ctx.collect(4); + ctx.collect(5); + ctx.collect(6); + + // query from table get new value + ctx.collect(2); + ctx.collect(1); + } + + @Override + public void cancel() {} + }); + + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + initFlinkTable(tEnv); + Table table = tEnv.fromDataStream(mockSource, schema); + tEnv.createTemporaryView("fact_table", table); + + String query = + "select fact_table.f0," + + "doris_lookup.score" + + " from fact_table" + + " left join doris_lookup FOR SYSTEM_TIME AS OF fact_table.proctime on fact_table.f0 = doris_lookup.id"; + TableResult tableResult = tEnv.executeSql(query); + CloseableIterator<Row> collectIter = tableResult.collect(); + + List<String> actual = new ArrayList<>(); + int index = 0; + while (actual.size() < 10 && collectIter.hasNext()) { + String row = collectIter.next().toString(); + if ("+I[2, 200]".equals(row) && index == 1) { + LOG.info( + "update table values {}.{}", + DATABASE, + DorisLookupTableITCase.TABLE_DIM_TBL); + ContainerUtils.executeSQLStatement( + getDorisQueryConnection(), + LOG, + String.format( + "insert into %s.%s values (1,1111),(2,2222)", + DATABASE, DorisLookupTableITCase.TABLE_DIM_TBL)); + } + actual.add(row); + index++; + } + + String[] expected = + new String[] { + "+I[1, 100]", + "+I[2, 200]", + "+I[3, null]", + "+I[2, 200]", + "+I[1, 100]", + "+I[4, null]", + "+I[5, null]", + "+I[6, null]", + "+I[2, 2222]", + "+I[1, 1111]" + }; + assertEqualsInAnyOrder(Arrays.asList(expected), Arrays.asList(actual.toArray())); + collectIter.close(); + } + + private static List<String> collectSize(CloseableIterator<Row> iterator, int rows) + throws Exception { + List<String> result = new ArrayList<>(); + while (result.size() < rows && iterator.hasNext()) { + result.add(iterator.next().toString()); + } + return result; + } + + private void initFlinkTable(StreamTableEnvironment tEnv) { + ContainerUtils.executeSQLStatement( + getDorisQueryConnection(), + LOG, + String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE), + String.format( + "DROP TABLE IF EXISTS %s.%s", + DATABASE, DorisLookupTableITCase.TABLE_DIM_TBL), + String.format( + "CREATE TABLE %s.%s ( \n" + + "`id` int(11),\n" + + "`score` int(11)\n" + + ") UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 4\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ")\n", + DATABASE, DorisLookupTableITCase.TABLE_DIM_TBL), + String.format( + "insert into %s.%s values (1,100)", + DATABASE, DorisLookupTableITCase.TABLE_DIM_TBL), + String.format( + "insert into %s.%s values (2,200)", + DATABASE, DorisLookupTableITCase.TABLE_DIM_TBL)); + + String lookupDDL = + String.format( + "CREATE TABLE `doris_lookup`(" + + " `id` INTEGER," + + " `score` INTEGER," + + " PRIMARY KEY (`id`) NOT ENFORCED" + + ") WITH (" + + "'connector' = '" + + DorisConfigOptions.IDENTIFIER + + "'," + + "'fenodes' = '%s'," + + "'jdbc-url' = '%s'," + + "'table.identifier' = '%s'," + + "'username' = '%s'," + + "'password' = '%s'," + + "'lookup.jdbc.async' = '%s'," + + "'lookup.cache.ttl' = '10m'," + + "'lookup.cache.max-rows' = '3'" + + ")", + getFenodes(), + getDorisQueryUrl(), + DATABASE + "." + TABLE_DIM_TBL, + getDorisUsername(), + getDorisPassword(), + async); + + tEnv.executeSql(lookupDDL); + } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java new file mode 100644 index 00000000..95156d38 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java @@ -0,0 +1,265 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.apache.commons.codec.binary.Base64; +import org.apache.doris.flink.cfg.DorisExecutionOptions; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.container.AbstractITCaseService; +import org.apache.doris.flink.container.ContainerUtils; +import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer; +import org.apache.doris.flink.utils.MockSource; +import org.apache.http.HttpHeaders; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +/** DorisSink ITCase failover case */ +@RunWith(Parameterized.class) +public class DorisSinkFailoverITCase extends AbstractITCaseService { + private static final Logger LOG = LoggerFactory.getLogger(DorisSinkFailoverITCase.class); + static final String DATABASE = "test_failover_sink"; + static final String TABLE_JSON_TBL_RESTART_DORIS = "tbl_json_tbl_restart_doris"; + static final String TABLE_JSON_TBL_LOAD_FAILURE = "tbl_json_tbl_load_failure"; + static final String TABLE_JSON_TBL_CKPT_FAILURE = "tbl_json_tbl_ckpt_failure"; + + private final boolean batchMode; + + public DorisSinkFailoverITCase(boolean batchMode) { + this.batchMode = batchMode; + } + + @Parameterized.Parameters(name = "batchMode: {0}") + public static Object[] parameters() { + return new Object[][] {new Object[] {false}, new Object[] {true}}; + } + + /** test doris cluster failover */ + @Test + public void testDorisClusterFailoverSink() throws Exception { + LOG.info("start to test testDorisClusterFailoverSink."); + makeFailoverTest(TABLE_JSON_TBL_RESTART_DORIS, FaultType.RESTART_FAILURE, 40); + } + + /** mock precommit failure */ + @Test + public void testStreamLoadFailoverSink() throws Exception { + LOG.info("start to test testStreamLoadFailoverSink."); + makeFailoverTest(TABLE_JSON_TBL_LOAD_FAILURE, FaultType.STREAM_LOAD_FAILURE, 20); + } + + /** mock checkpoint failure when precommit or streamload successful */ + @Test + public void testCheckpointFailoverSink() throws Exception { + LOG.info("start to test testCheckpointFailoverSink."); + makeFailoverTest(TABLE_JSON_TBL_CKPT_FAILURE, FaultType.CHECKPOINT_FAILURE, 20); + } + + public void makeFailoverTest(String tableName, FaultType faultType, int totalRecords) + throws Exception { + initializeTable(tableName); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + int checkpointInterval = 5000; + env.enableCheckpointing(checkpointInterval); + + Properties properties = new Properties(); + properties.setProperty("column_separator", ","); + properties.setProperty("format", "csv"); + DorisSink.Builder<String> builder = DorisSink.builder(); + DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); + executionBuilder + .setLabelPrefix(UUID.randomUUID().toString()) + .enable2PC() + .setBatchMode(batchMode) + .setFlushQueueSize(4) + .setStreamLoadProp(properties); + DorisOptions.Builder dorisBuilder = DorisOptions.builder(); + dorisBuilder + .setFenodes(getFenodes()) + .setTableIdentifier(DATABASE + "." + tableName) + .setUsername(getDorisUsername()) + .setPassword(getDorisPassword()); + + builder.setDorisReadOptions(DorisReadOptions.builder().build()) + .setDorisExecutionOptions(executionBuilder.build()) + .setSerializer(new SimpleStringSerializer()) + .setDorisOptions(dorisBuilder.build()); + + int triggerCkptError = -1; + if (FaultType.CHECKPOINT_FAILURE.equals(faultType)) { + triggerCkptError = 7; + } + DataStreamSource<String> mockSource = + env.addSource(new MockSource(totalRecords, triggerCkptError)); + mockSource.sinkTo(builder.build()); + JobClient jobClient = env.executeAsync(); + CompletableFuture<JobStatus> jobStatus = jobClient.getJobStatus(); + LOG.info("Job status: {}", jobStatus); + + String query = String.format("select * from %s", DATABASE + "." + tableName); + List<String> result; + int maxRestart = 5; + Random random = new Random(); + while (true) { + result = ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG, query, 2); + + if (result.size() >= totalRecords * DEFAULT_PARALLELISM + && getFlinkJobStatus(jobClient).equals(JobStatus.FINISHED)) { + // Batch mode can only achieve at least once + break; + } + + // Wait until write is successful, then trigger error + if (result.size() > 1 && maxRestart-- >= 0) { + // trigger error random + int randomSleepMs = random.nextInt(30); + if (FaultType.STREAM_LOAD_FAILURE.equals(faultType)) { + faultInjectionOpen(); + randomSleepMs = randomSleepMs + 20; + LOG.info("Injecting fault, sleep {}s before recover", randomSleepMs); + Thread.sleep(randomSleepMs * 1000); + faultInjectionClear(); + } else if (FaultType.RESTART_FAILURE.equals(faultType)) { + // docker image restart time is about 60s + randomSleepMs = randomSleepMs + 60; + dorisContainerService.restartContainer(); + LOG.info( + "Restarting doris cluster, sleep {}s before next restart", + randomSleepMs); + Thread.sleep(randomSleepMs * 1000); + } + } else { + // Avoid frequent queries + Thread.sleep(checkpointInterval); + } + } + + // concat expect value + List<String> expected = new ArrayList<>(); + for (int i = 1; i <= totalRecords; i++) { + for (int j = 0; j < DEFAULT_PARALLELISM; j++) { + expected.add(i + "," + j); + } + } + if (!batchMode) { + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2, false); + } else { + List<String> actualResult = + ContainerUtils.getResult(getDorisQueryConnection(), LOG, expected, query, 2); + Assert.assertTrue( + actualResult.size() >= expected.size() && actualResult.containsAll(expected)); + } + } + + public void faultInjectionOpen() throws IOException { + String pointName = "FlushToken.submit_flush_error"; + String apiUrl = + String.format( + "http://%s/api/debug_point/add/%s", + dorisContainerService.getBenodes(), pointName); + HttpPost httpPost = new HttpPost(apiUrl); + httpPost.addHeader( + HttpHeaders.AUTHORIZATION, + auth(dorisContainerService.getUsername(), dorisContainerService.getPassword())); + try (CloseableHttpClient httpClient = HttpClients.custom().build()) { + try (CloseableHttpResponse response = httpClient.execute(httpPost)) { + int statusCode = response.getStatusLine().getStatusCode(); + String reason = response.getStatusLine().toString(); + if (statusCode == 200 && response.getEntity() != null) { + LOG.info("Debug point response {}", EntityUtils.toString(response.getEntity())); + } else { + LOG.info("Debug point failed, statusCode: {}, reason: {}", statusCode, reason); + } + } + } + } + + public void faultInjectionClear() throws IOException { + String apiUrl = + String.format( + "http://%s/api/debug_point/clear", dorisContainerService.getBenodes()); + HttpPost httpPost = new HttpPost(apiUrl); + httpPost.addHeader( + HttpHeaders.AUTHORIZATION, + auth(dorisContainerService.getUsername(), dorisContainerService.getPassword())); + try (CloseableHttpClient httpClient = HttpClients.custom().build()) { + try (CloseableHttpResponse response = httpClient.execute(httpPost)) { + int statusCode = response.getStatusLine().getStatusCode(); + String reason = response.getStatusLine().toString(); + if (statusCode == 200 && response.getEntity() != null) { + LOG.info("Debug point response {}", EntityUtils.toString(response.getEntity())); + } else { + LOG.info("Debug point failed, statusCode: {}, reason: {}", statusCode, reason); + } + } + } + } + + private String auth(String user, String password) { + final String authInfo = user + ":" + password; + byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8)); + return "Basic " + new String(encoded); + } + + private void initializeTable(String table) { + ContainerUtils.executeSQLStatement( + getDorisQueryConnection(), + LOG, + String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE), + String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table), + String.format( + "CREATE TABLE %s.%s ( \n" + + "`id` int,\n" + + "`task_id` int\n" + + ") DISTRIBUTED BY HASH(`id`) BUCKETS 1\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ")\n", + DATABASE, table)); + } + + enum FaultType { + RESTART_FAILURE, + STREAM_LOAD_FAILURE, + CHECKPOINT_FAILURE + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java index dd74bb1c..0ebc48fc 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java @@ -31,6 +31,7 @@ import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.StringUtils; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.doris.flink.catalog.doris.DataModel; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; @@ -41,8 +42,11 @@ import org.apache.doris.flink.sink.batch.DorisBatchSink; import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer; import org.apache.doris.flink.table.DorisConfigOptions; import org.apache.doris.flink.utils.MockSource; +import org.junit.Assert; import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,12 +58,12 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.UUID; -import java.util.concurrent.TimeUnit; import static org.apache.flink.api.common.JobStatus.FINISHED; import static org.apache.flink.api.common.JobStatus.RUNNING; /** DorisSink ITCase with csv and arrow format. */ +@RunWith(Parameterized.class) public class DorisSinkITCase extends AbstractITCaseService { private static final Logger LOG = LoggerFactory.getLogger(DorisSinkITCase.class); static final String DATABASE = "test_sink"; @@ -75,6 +79,17 @@ public class DorisSinkITCase extends AbstractITCaseService { static final String TABLE_CSV_JM = "tbl_csv_jm"; static final String TABLE_CSV_TM = "tbl_csv_tm"; + private final boolean batchMode; + + public DorisSinkITCase(boolean batchMode) { + this.batchMode = batchMode; + } + + @Parameterized.Parameters(name = "batchMode: {0}") + public static Object[] parameters() { + return new Object[][] {new Object[] {false}, new Object[] {true}}; + } + @Rule public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource( @@ -87,13 +102,19 @@ public class DorisSinkITCase extends AbstractITCaseService { @Test public void testSinkCsvFormat() throws Exception { - initializeTable(TABLE_CSV); + initializeTable(TABLE_CSV, DataModel.UNIQUE); Properties properties = new Properties(); properties.setProperty("column_separator", ","); properties.setProperty("line_delimiter", "\n"); properties.setProperty("format", "csv"); DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); - executionBuilder.setLabelPrefix(UUID.randomUUID().toString()).setStreamLoadProp(properties); + executionBuilder + .setLabelPrefix(UUID.randomUUID().toString()) + .setStreamLoadProp(properties) + .setDeletable(false) + .setBufferCount(4) + .setBufferSize(5 * 1024 * 1024) + .setBatchMode(batchMode); DorisOptions.Builder dorisBuilder = DorisOptions.builder(); dorisBuilder .setFenodes(getFenodes()) @@ -110,7 +131,7 @@ public class DorisSinkITCase extends AbstractITCaseService { @Test public void testSinkJsonFormat() throws Exception { - initializeTable(TABLE_JSON); + initializeTable(TABLE_JSON, DataModel.UNIQUE); Properties properties = new Properties(); properties.setProperty("read_json_by_line", "true"); properties.setProperty("format", "json"); @@ -124,7 +145,12 @@ public class DorisSinkITCase extends AbstractITCaseService { row2.put("age", 2); DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); - executionBuilder.setLabelPrefix(UUID.randomUUID().toString()).setStreamLoadProp(properties); + executionBuilder + .setLabelPrefix(UUID.randomUUID().toString()) + .setBatchMode(batchMode) + .setStreamLoadProp(properties) + // uniq need to be false + .setDeletable(false); DorisOptions.Builder dorisBuilder = DorisOptions.builder(); dorisBuilder .setFenodes(getFenodes()) @@ -166,7 +192,7 @@ public class DorisSinkITCase extends AbstractITCaseService { @Test public void testTableSinkJsonFormat() throws Exception { - initializeTable(TABLE_JSON_TBL); + initializeTable(TABLE_JSON_TBL, DataModel.DUPLICATE); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); env.setRuntimeMode(RuntimeExecutionMode.BATCH); @@ -191,6 +217,7 @@ public class DorisSinkITCase extends AbstractITCaseService { + " 'sink.enable-2pc' = 'true'," + " 'sink.use-cache' = 'true'," + " 'sink.enable-delete' = 'false'," + + " 'sink.enable.batch-mode' = '%s'," + " 'sink.ignore.update-before' = 'true'," + " 'sink.properties.format' = 'json'," + " 'sink.properties.read_json_by_line' = 'true'," @@ -201,7 +228,8 @@ public class DorisSinkITCase extends AbstractITCaseService { getFenodes(), DATABASE + "." + TABLE_JSON_TBL, getDorisUsername(), - getDorisPassword()); + getDorisPassword(), + batchMode); tEnv.executeSql(sinkDDL); tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all SELECT 'flink',2"); @@ -218,7 +246,7 @@ public class DorisSinkITCase extends AbstractITCaseService { LOG.info("benodes is empty, skip the test."); return; } - initializeTable(TABLE_TBL_AUTO_REDIRECT); + initializeTable(TABLE_TBL_AUTO_REDIRECT, DataModel.AGGREGATE); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); env.setRuntimeMode(RuntimeExecutionMode.BATCH); @@ -239,6 +267,7 @@ public class DorisSinkITCase extends AbstractITCaseService { + " 'table.identifier' = '%s'," + " 'username' = '%s'," + " 'password' = '%s'," + + " 'sink.enable.batch-mode' = '%s'," + " 'sink.label-prefix' = 'doris_sink" + UUID.randomUUID() + "'" @@ -247,7 +276,8 @@ public class DorisSinkITCase extends AbstractITCaseService { getBenodes(), DATABASE + "." + TABLE_TBL_AUTO_REDIRECT, getDorisUsername(), - getDorisPassword()); + getDorisPassword(), + batchMode); tEnv.executeSql(sinkDDL); tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all SELECT 'flink',2"); @@ -260,7 +290,7 @@ public class DorisSinkITCase extends AbstractITCaseService { @Test public void testTableBatch() throws Exception { - initializeTable(TABLE_CSV_BATCH_TBL); + initializeTable(TABLE_CSV_BATCH_TBL, DataModel.UNIQUE_MOR); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); env.setRuntimeMode(RuntimeExecutionMode.BATCH); @@ -285,7 +315,7 @@ public class DorisSinkITCase extends AbstractITCaseService { + " 'sink.properties.column_separator' = '\\x01'," + " 'sink.properties.line_delimiter' = '\\x02'," + " 'sink.ignore.update-before' = 'false'," - + " 'sink.enable.batch-mode' = 'true'," + + " 'sink.enable.batch-mode' = '%s'," + " 'sink.enable-delete' = 'true'," + " 'sink.flush.queue-size' = '2'," + " 'sink.buffer-flush.max-rows' = '10000'," @@ -295,7 +325,8 @@ public class DorisSinkITCase extends AbstractITCaseService { getFenodes(), DATABASE + "." + TABLE_CSV_BATCH_TBL, getDorisUsername(), - getDorisPassword()); + getDorisPassword(), + batchMode); tEnv.executeSql(sinkDDL); tEnv.executeSql("INSERT INTO doris_sink_batch SELECT 'doris',1 union all SELECT 'flink',2"); @@ -309,7 +340,7 @@ public class DorisSinkITCase extends AbstractITCaseService { @Test public void testDataStreamBatch() throws Exception { - initializeTable(TABLE_CSV_BATCH_DS); + initializeTable(TABLE_CSV_BATCH_DS, DataModel.AGGREGATE); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); env.setParallelism(DEFAULT_PARALLELISM); @@ -328,6 +359,8 @@ public class DorisSinkITCase extends AbstractITCaseService { DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); executionBuilder .setLabelPrefix(UUID.randomUUID().toString()) + .setFlushQueueSize(3) + .setBatchMode(batchMode) .setStreamLoadProp(properties) .setBufferFlushMaxBytes(10485760) .setBufferFlushMaxRows(10000) @@ -350,7 +383,7 @@ public class DorisSinkITCase extends AbstractITCaseService { @Test public void testTableGroupCommit() throws Exception { - initializeTable(TABLE_GROUP_COMMIT); + initializeTable(TABLE_GROUP_COMMIT, DataModel.DUPLICATE); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); env.setRuntimeMode(RuntimeExecutionMode.BATCH); @@ -376,7 +409,7 @@ public class DorisSinkITCase extends AbstractITCaseService { + " 'sink.properties.line_delimiter' = '\\x02'," + " 'sink.properties.group_commit' = 'sync_mode'," + " 'sink.ignore.update-before' = 'false'," - + " 'sink.enable.batch-mode' = 'true'," + + " 'sink.enable.batch-mode' = '%s'," + " 'sink.enable-delete' = 'true'," + " 'sink.flush.queue-size' = '2'," + " 'sink.buffer-flush.max-rows' = '10000'," @@ -386,7 +419,8 @@ public class DorisSinkITCase extends AbstractITCaseService { getFenodes(), DATABASE + "." + TABLE_GROUP_COMMIT, getDorisUsername(), - getDorisPassword()); + getDorisPassword(), + batchMode); tEnv.executeSql(sinkDDL); tEnv.executeSql( "INSERT INTO doris_group_commit_sink SELECT 'doris',1 union all SELECT 'group_commit',2 union all SELECT 'flink',3"); @@ -401,7 +435,7 @@ public class DorisSinkITCase extends AbstractITCaseService { @Test public void testTableGzFormat() throws Exception { - initializeTable(TABLE_GZ_FORMAT); + initializeTable(TABLE_GZ_FORMAT, DataModel.UNIQUE); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); env.setRuntimeMode(RuntimeExecutionMode.BATCH); @@ -420,6 +454,8 @@ public class DorisSinkITCase extends AbstractITCaseService { + " 'table.identifier' = '%s'," + " 'username' = '%s'," + " 'password' = '%s'," + + " 'sink.enable.batch-mode' = '%s'," + + " 'sink.enable-delete' = 'false'," + " 'sink.label-prefix' = '" + UUID.randomUUID() + "'," @@ -430,7 +466,8 @@ public class DorisSinkITCase extends AbstractITCaseService { getFenodes(), DATABASE + "." + TABLE_GZ_FORMAT, getDorisUsername(), - getDorisPassword()); + getDorisPassword(), + batchMode); tEnv.executeSql(sinkDDL); tEnv.executeSql( "INSERT INTO doris_gz_format_sink SELECT 'doris',1 union all SELECT 'flink',2"); @@ -445,7 +482,7 @@ public class DorisSinkITCase extends AbstractITCaseService { @Test public void testJobManagerFailoverSink() throws Exception { LOG.info("start to test JobManagerFailoverSink."); - initializeFailoverTable(TABLE_CSV_JM); + initializeFailoverTable(TABLE_CSV_JM, DataModel.DUPLICATE); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); env.enableCheckpointing(10000); @@ -468,6 +505,7 @@ public class DorisSinkITCase extends AbstractITCaseService { executionBuilder .setLabelPrefix(UUID.randomUUID().toString()) .setStreamLoadProp(properties) + .setBatchMode(batchMode) .setUseCache(true); builder.setDorisReadOptions(readOptionBuilder.build()) @@ -499,13 +537,17 @@ public class DorisSinkITCase extends AbstractITCaseService { Arrays.asList("1,0", "1,1", "2,0", "2,1", "3,0", "3,1", "4,0", "4,1", "5,0", "5,1"); String query = String.format("select id,task_id from %s.%s order by 1,2", DATABASE, TABLE_CSV_JM); - ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2); + + List<String> actualResult = + ContainerUtils.getResult(getDorisQueryConnection(), LOG, expected, query, 2); + Assert.assertTrue( + actualResult.size() >= expected.size() && actualResult.containsAll(expected)); } @Test public void testTaskManagerFailoverSink() throws Exception { LOG.info("start to test TaskManagerFailoverSink."); - initializeFailoverTable(TABLE_CSV_TM); + initializeFailoverTable(TABLE_CSV_TM, DataModel.DUPLICATE); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); env.enableCheckpointing(10000); @@ -525,7 +567,10 @@ public class DorisSinkITCase extends AbstractITCaseService { properties.setProperty("column_separator", ","); properties.setProperty("line_delimiter", "\n"); properties.setProperty("format", "csv"); - executionBuilder.setLabelPrefix(UUID.randomUUID().toString()).setStreamLoadProp(properties); + executionBuilder + .setLabelPrefix(UUID.randomUUID().toString()) + .setBatchMode(batchMode) + .setStreamLoadProp(properties); builder.setDorisReadOptions(readOptionBuilder.build()) .setDorisExecutionOptions(executionBuilder.build()) @@ -556,12 +601,17 @@ public class DorisSinkITCase extends AbstractITCaseService { Arrays.asList("1,0", "1,1", "2,0", "2,1", "3,0", "3,1", "4,0", "4,1", "5,0", "5,1"); String query = String.format("select id,task_id from %s.%s order by 1,2", DATABASE, TABLE_CSV_TM); - ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2); + + List<String> actualResult = + ContainerUtils.getResult(getDorisQueryConnection(), LOG, expected, query, 2); + Assert.assertTrue( + actualResult.size() >= expected.size() && actualResult.containsAll(expected)); } @Test public void testTableOverwrite() throws Exception { - initializeTable(TABLE_OVERWRITE); + LOG.info("start to test testTableOverwrite."); + initializeTable(TABLE_OVERWRITE, DataModel.AGGREGATE); // mock data ContainerUtils.executeSQLStatement( getDorisQueryConnection(), @@ -593,6 +643,7 @@ public class DorisSinkITCase extends AbstractITCaseService { + " 'jdbc-url' = '%s'," + " 'username' = '%s'," + " 'password' = '%s'," + + " 'sink.enable.batch-mode' = '%s'," + " 'sink.label-prefix' = '" + UUID.randomUUID() + "'" @@ -601,18 +652,33 @@ public class DorisSinkITCase extends AbstractITCaseService { DATABASE + "." + TABLE_OVERWRITE, getDorisQueryUrl(), getDorisUsername(), - getDorisPassword()); + getDorisPassword(), + batchMode); tEnv.executeSql(sinkDDL); TableResult tableResult = tEnv.executeSql( "INSERT OVERWRITE doris_overwrite_sink SELECT 'doris',1 union all SELECT 'overwrite',2 union all SELECT 'flink',3"); - tableResult.await(25000, TimeUnit.MILLISECONDS); + JobClient jobClient = tableResult.getJobClient().get(); + waitForJobStatus( + jobClient, + Collections.singletonList(FINISHED), + Deadline.fromNow(Duration.ofSeconds(120))); + List<String> expected = Arrays.asList("doris,1", "flink,3", "overwrite,2"); - ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2); + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2, false); } - private void initializeTable(String table) { + private void initializeTable(String table, DataModel dataModel) { + String max = DataModel.AGGREGATE.equals(dataModel) ? "MAX" : ""; + String morProps = + !DataModel.UNIQUE_MOR.equals(dataModel) + ? "" + : ",\"enable_unique_key_merge_on_write\" = \"false\""; + String model = + dataModel.equals(DataModel.UNIQUE_MOR) + ? DataModel.UNIQUE.toString() + : dataModel.toString(); ContainerUtils.executeSQLStatement( getDorisQueryConnection(), LOG, @@ -621,15 +687,30 @@ public class DorisSinkITCase extends AbstractITCaseService { String.format( "CREATE TABLE %s.%s ( \n" + "`name` varchar(256),\n" - + "`age` int\n" - + ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" - + "PROPERTIES (\n" + + "`age` int %s\n" + + ") " + + " %s KEY(`name`) " + + " DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" + + "PROPERTIES (" + "\"replication_num\" = \"1\"\n" - + ")\n", - DATABASE, table)); + + morProps + + ")", + DATABASE, + table, + max, + model)); } - private void initializeFailoverTable(String table) { + private void initializeFailoverTable(String table, DataModel dataModel) { + String max = DataModel.AGGREGATE.equals(dataModel) ? "MAX" : ""; + String morProps = + !DataModel.UNIQUE_MOR.equals(dataModel) + ? "" + : ",\"enable_unique_key_merge_on_write\" = \"false\""; + String model = + dataModel.equals(DataModel.UNIQUE_MOR) + ? DataModel.UNIQUE.toString() + : dataModel.toString(); ContainerUtils.executeSQLStatement( getDorisQueryConnection(), LOG, @@ -638,11 +719,17 @@ public class DorisSinkITCase extends AbstractITCaseService { String.format( "CREATE TABLE %s.%s ( \n" + "`id` int,\n" - + "`task_id` int\n" - + ") DISTRIBUTED BY HASH(`id`) BUCKETS 1\n" + + "`task_id` int %s\n" + + ") " + + " %s KEY(`id`) " + + "DISTRIBUTED BY HASH(`id`) BUCKETS 1\n" + "PROPERTIES (\n" + "\"replication_num\" = \"1\"\n" + + morProps + ")\n", - DATABASE, table)); + DATABASE, + table, + max, + model)); } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index 3eb96597..1421c953 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -31,7 +31,9 @@ import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; +import org.apache.doris.flink.catalog.doris.DataModel; import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.cfg.DorisStreamOptions; import org.apache.doris.flink.container.AbstractITCaseService; import org.apache.doris.flink.container.ContainerUtils; @@ -41,6 +43,8 @@ import org.apache.doris.flink.table.DorisConfigOptions; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +57,7 @@ import java.util.List; import java.util.Properties; /** DorisSource ITCase. */ +@RunWith(Parameterized.class) public class DorisSourceITCase extends AbstractITCaseService { private static final Logger LOG = LoggerFactory.getLogger(DorisSourceITCase.class); private static final String DATABASE = "test_source"; @@ -73,6 +78,24 @@ public class DorisSourceITCase extends AbstractITCaseService { private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY = "tbl_read_tbl_push_down_with_filter_query"; + private final boolean useFlightSql; + private final int flightSqlPort; + + public DorisSourceITCase(boolean useFlightSql, int flightSqlPort) { + this.useFlightSql = useFlightSql; + this.flightSqlPort = flightSqlPort; + } + + @Parameterized.Parameters(name = "useFlightSql: {0}, flightSqlPort: {1}") + public static Object[] parameters() { + return new Object[][] { + new Object[] { + false, -1, + }, + new Object[] {true, 9611} + }; + } + @Rule public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource( @@ -85,7 +108,7 @@ public class DorisSourceITCase extends AbstractITCaseService { @Test public void testSource() throws Exception { - initializeTable(TABLE_READ); + initializeTable(TABLE_READ, DataModel.AGGREGATE); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); env.setParallelism(DEFAULT_PARALLELISM); @@ -99,8 +122,14 @@ public class DorisSourceITCase extends AbstractITCaseService { DorisSource<List<?>> source = DorisSource.<List<?>>builder() .setDorisOptions(dorisBuilder.build()) + .setDorisReadOptions( + DorisReadOptions.builder() + .setFlightSqlPort(flightSqlPort) + .setUseFlightSql(useFlightSql) + .build()) .setDeserializer(new SimpleListDeserializationSchema()) .build(); + List<String> actual = new ArrayList<>(); try (CloseableIterator<List<?>> iterator = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Doris Source") @@ -115,7 +144,7 @@ public class DorisSourceITCase extends AbstractITCaseService { @Test public void testOldSourceApi() throws Exception { - initializeTable(TABLE_READ_OLD_API); + initializeTable(TABLE_READ_OLD_API, DataModel.UNIQUE); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); Properties properties = new Properties(); @@ -141,7 +170,7 @@ public class DorisSourceITCase extends AbstractITCaseService { @Test public void testTableSource() throws Exception { - initializeTable(TABLE_READ_TBL); + initializeTable(TABLE_READ_TBL, DataModel.DUPLICATE); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); env.setRuntimeMode(RuntimeExecutionMode.BATCH); @@ -159,12 +188,16 @@ public class DorisSourceITCase extends AbstractITCaseService { + " 'fenodes' = '%s'," + " 'table.identifier' = '%s'," + " 'username' = '%s'," - + " 'password' = '%s'" + + " 'password' = '%s'," + + " 'source.use-flight-sql' = '%s'," + + " 'source.flight-sql-port' = '%s'" + ")", getFenodes(), DATABASE + "." + TABLE_READ_TBL, getDorisUsername(), - getDorisPassword()); + getDorisPassword(), + useFlightSql, + flightSqlPort); tEnv.executeSql(sourceDDL); TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_source"); @@ -192,7 +225,7 @@ public class DorisSourceITCase extends AbstractITCaseService { @Test public void testTableSourceOldApi() throws Exception { - initializeTable(TABLE_READ_TBL_OLD_API); + initializeTable(TABLE_READ_TBL_OLD_API, DataModel.AGGREGATE); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); @@ -231,7 +264,7 @@ public class DorisSourceITCase extends AbstractITCaseService { @Test public void testTableSourceAllOptions() throws Exception { - initializeTable(TABLE_READ_TBL_ALL_OPTIONS); + initializeTable(TABLE_READ_TBL_ALL_OPTIONS, DataModel.DUPLICATE); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); @@ -258,12 +291,16 @@ public class DorisSourceITCase extends AbstractITCaseService { + " 'doris.batch.size' = '1024'," + " 'doris.exec.mem.limit' = '2048mb'," + " 'doris.deserialize.arrow.async' = 'true'," - + " 'doris.deserialize.queue.size' = '32'" + + " 'doris.deserialize.queue.size' = '32'," + + " 'source.use-flight-sql' = '%s'," + + " 'source.flight-sql-port' = '%s'" + ")", getFenodes(), DATABASE + "." + TABLE_READ_TBL_ALL_OPTIONS, getDorisUsername(), - getDorisPassword()); + getDorisPassword(), + useFlightSql, + flightSqlPort); tEnv.executeSql(sourceDDL); TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_source_all_options"); @@ -279,7 +316,7 @@ public class DorisSourceITCase extends AbstractITCaseService { @Test public void testTableSourceFilterAndProjectionPushDown() throws Exception { - initializeTable(TABLE_READ_TBL_PUSH_DOWN); + initializeTable(TABLE_READ_TBL_PUSH_DOWN, DataModel.UNIQUE_MOR); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); @@ -296,12 +333,16 @@ public class DorisSourceITCase extends AbstractITCaseService { + " 'fenodes' = '%s'," + " 'table.identifier' = '%s'," + " 'username' = '%s'," - + " 'password' = '%s'" + + " 'password' = '%s'," + + " 'source.use-flight-sql' = '%s'," + + " 'source.flight-sql-port' = '%s'" + ")", getFenodes(), DATABASE + "." + TABLE_READ_TBL_PUSH_DOWN, getDorisUsername(), - getDorisPassword()); + getDorisPassword(), + useFlightSql, + flightSqlPort); tEnv.executeSql(sourceDDL); TableResult tableResult = tEnv.executeSql( @@ -320,7 +361,7 @@ public class DorisSourceITCase extends AbstractITCaseService { @Test public void testTableSourceTimestampFilterAndProjectionPushDown() throws Exception { - initializeTimestampTable(TABLE_READ_TBL_TIMESTAMP_PUSH_DOWN); + initializeTimestampTable(TABLE_READ_TBL_TIMESTAMP_PUSH_DOWN, DataModel.UNIQUE); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); @@ -341,12 +382,16 @@ public class DorisSourceITCase extends AbstractITCaseService { + " 'fenodes' = '%s'," + " 'table.identifier' = '%s'," + " 'username' = '%s'," - + " 'password' = '%s'" + + " 'password' = '%s'," + + " 'source.use-flight-sql' = '%s'," + + " 'source.flight-sql-port' = '%s'" + ")", getFenodes(), DATABASE + "." + TABLE_READ_TBL_TIMESTAMP_PUSH_DOWN, getDorisUsername(), - getDorisPassword()); + getDorisPassword(), + useFlightSql, + flightSqlPort); tEnv.executeSql(sourceDDL); List<String> actualProjectionResult = @@ -453,7 +498,7 @@ public class DorisSourceITCase extends AbstractITCaseService { @Test public void testTableSourceFilterWithUnionAll() throws Exception { LOG.info("starting to execute testTableSourceFilterWithUnionAll case."); - initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL); + initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL, DataModel.UNIQUE_MOR); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); @@ -470,12 +515,16 @@ public class DorisSourceITCase extends AbstractITCaseService { + " 'fenodes' = '%s'," + " 'table.identifier' = '%s'," + " 'username' = '%s'," - + " 'password' = '%s'" + + " 'password' = '%s'," + + " 'source.use-flight-sql' = '%s'," + + " 'source.flight-sql-port' = '%s'" + ")", getFenodes(), DATABASE + "." + TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL, getDorisUsername(), - getDorisPassword()); + getDorisPassword(), + useFlightSql, + flightSqlPort); tEnv.executeSql(sourceDDL); String querySql = " SELECT * FROM doris_source_filter_with_union_all where age = '18'" @@ -508,13 +557,19 @@ public class DorisSourceITCase extends AbstractITCaseService { String.format( "CREATE TABLE %s.%s ( \n" + "`name` varchar(256),\n" - + "`dt` date,\n" - + "`age` int\n" - + ") DISTRIBUTED BY HASH(`name`) BUCKETS 10\n" + + "`dt` date %s,\n" + + "`age` int %s\n" + + ") " + + " %s KEY(`name`) " + + "DISTRIBUTED BY HASH(`name`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_num\" = \"1\"\n" + ")\n", - DATABASE, TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY), + DATABASE, + TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY, + "", + "", + DataModel.UNIQUE), String.format( "insert into %s.%s values ('doris',date_sub(now(),INTERVAL 7 DAY), 18)", DATABASE, TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY), @@ -543,12 +598,16 @@ public class DorisSourceITCase extends AbstractITCaseService { + " 'table.identifier' = '%s'," + " 'username' = '%s'," + " 'password' = '%s'," - + " 'doris.filter.query' = ' (dt = DATE_FORMAT(TIMESTAMPADD(DAY , -7, NOW()), ''yyyy-MM-dd'')) '" + + " 'doris.filter.query' = ' (dt = DATE_FORMAT(TIMESTAMPADD(DAY , -7, NOW()), ''yyyy-MM-dd'')) '," + + " 'source.use-flight-sql' = '%s'," + + " 'source.flight-sql-port' = '%s'" + ")", getFenodes(), DATABASE + "." + TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY, getDorisUsername(), - getDorisPassword()); + getDorisPassword(), + useFlightSql, + flightSqlPort); tEnv.executeSql(sourceDDL); String querySql = " SELECT * FROM doris_source_filter_with_filter_query where name = 'doris' and age > 2"; @@ -571,7 +630,7 @@ public class DorisSourceITCase extends AbstractITCaseService { @Test public void testTableSourceFilterWithUnionAllNotEqualFilter() throws Exception { LOG.info("starting to execute testTableSourceFilterWithUnionAllNotEqualFilter case."); - initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER); + initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER, DataModel.AGGREGATE); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); @@ -588,12 +647,16 @@ public class DorisSourceITCase extends AbstractITCaseService { + " 'fenodes' = '%s'," + " 'table.identifier' = '%s'," + " 'username' = '%s'," - + " 'password' = '%s'" + + " 'password' = '%s'," + + " 'source.use-flight-sql' = '%s'," + + " 'source.flight-sql-port' = '%s'" + ")", getFenodes(), DATABASE + "." + TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER, getDorisUsername(), - getDorisPassword()); + getDorisPassword(), + useFlightSql, + flightSqlPort); tEnv.executeSql(sourceDDL); String querySql = " SELECT * FROM doris_source_filter_with_union_all where name = 'doris'" @@ -616,7 +679,7 @@ public class DorisSourceITCase extends AbstractITCaseService { @Test public void testJobManagerFailoverSource() throws Exception { LOG.info("start to test JobManagerFailoverSource."); - initializeTableWithData(TABLE_CSV_JM); + initializeTableWithData(TABLE_CSV_JM, DataModel.DUPLICATE); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); env.enableCheckpointing(200L); @@ -634,12 +697,16 @@ public class DorisSourceITCase extends AbstractITCaseService { + " 'fenodes' = '%s'," + " 'table.identifier' = '%s'," + " 'username' = '%s'," - + " 'password' = '%s'" + + " 'password' = '%s'," + + " 'source.use-flight-sql' = '%s'," + + " 'source.flight-sql-port' = '%s'" + ")", getFenodes(), DATABASE + "." + TABLE_CSV_JM, getDorisUsername(), - getDorisPassword()); + getDorisPassword(), + useFlightSql, + flightSqlPort); tEnv.executeSql(sourceDDL); TableResult tableResult = tEnv.executeSql("select * from doris_source_jm"); CloseableIterator<Row> iterator = tableResult.collect(); @@ -697,7 +764,7 @@ public class DorisSourceITCase extends AbstractITCaseService { @Test public void testTaskManagerFailoverSource() throws Exception { LOG.info("start to test TaskManagerFailoverSource."); - initializeTableWithData(TABLE_CSV_TM); + initializeTableWithData(TABLE_CSV_TM, DataModel.UNIQUE); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); env.enableCheckpointing(200L); @@ -715,12 +782,16 @@ public class DorisSourceITCase extends AbstractITCaseService { + " 'fenodes' = '%s'," + " 'table.identifier' = '%s'," + " 'username' = '%s'," - + " 'password' = '%s'" + + " 'password' = '%s'," + + " 'source.use-flight-sql' = '%s'," + + " 'source.flight-sql-port' = '%s'" + ")", getFenodes(), DATABASE + "." + TABLE_CSV_TM, getDorisUsername(), - getDorisPassword()); + getDorisPassword(), + useFlightSql, + flightSqlPort); tEnv.executeSql(sourceDDL); TableResult tableResult = tEnv.executeSql("select * from doris_source_tm"); CloseableIterator<Row> iterator = tableResult.collect(); @@ -759,7 +830,15 @@ public class DorisSourceITCase extends AbstractITCaseService { assertEqualsInAnyOrder(Arrays.asList(expected), Arrays.asList(actual)); } - private void initializeTable(String table) { + private void initializeTable(String table, DataModel dataModel) { + String morProps = + !DataModel.UNIQUE_MOR.equals(dataModel) + ? "" + : ",\"enable_unique_key_merge_on_write\" = \"false\""; + String model = + dataModel.equals(DataModel.UNIQUE_MOR) + ? DataModel.UNIQUE.toString() + : dataModel.toString(); ContainerUtils.executeSQLStatement( getDorisQueryConnection(), LOG, @@ -768,18 +847,25 @@ public class DorisSourceITCase extends AbstractITCaseService { String.format( "CREATE TABLE %s.%s ( \n" + "`name` varchar(256),\n" - + "`age` int\n" - + ") DISTRIBUTED BY HASH(`name`) BUCKETS 10\n" + + "`age` int %s\n" + + ") " + + " %s KEY(`name`) " + + " DISTRIBUTED BY HASH(`name`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_num\" = \"1\"\n" - + ")\n", - DATABASE, table), + + morProps + + ")", + DATABASE, + table, + DataModel.AGGREGATE.equals(dataModel) ? "MAX" : "", + model), String.format("insert into %s.%s values ('doris',18)", DATABASE, table), String.format("insert into %s.%s values ('flink',10)", DATABASE, table), String.format("insert into %s.%s values ('apache',12)", DATABASE, table)); } - private void initializeTimestampTable(String table) { + private void initializeTimestampTable(String table, DataModel dataModel) { + String max = DataModel.AGGREGATE.equals(dataModel) ? "MAX" : ""; ContainerUtils.executeSQLStatement( getDorisQueryConnection(), LOG, @@ -789,14 +875,16 @@ public class DorisSourceITCase extends AbstractITCaseService { "CREATE TABLE %s.%s ( \n" + "`id` int,\n" + "`name` varchar(256),\n" - + "`age` int,\n" - + "`birthday` datetime,\n" - + "`brilliant_time` datetime(6),\n" - + ") DISTRIBUTED BY HASH(`id`) BUCKETS 3\n" + + "`age` int %s,\n" + + "`birthday` datetime %s,\n" + + "`brilliant_time` datetime(6) %s,\n" + + ") " + + " %s KEY(`id`, `name`) " + + " DISTRIBUTED BY HASH(`id`) BUCKETS 3\n" + "PROPERTIES (\n" + "\"replication_num\" = \"1\"\n" + ")\n", - DATABASE, table), + DATABASE, table, max, max, max, dataModel.toString()), String.format( "insert into %s.%s values (1,'Kevin',54,'2023-01-01T00:00:00','2023-01-01T00:00:00.000001')", DATABASE, table), @@ -817,7 +905,7 @@ public class DorisSourceITCase extends AbstractITCaseService { DATABASE, table)); } - private void initializeTableWithData(String table) { + private void initializeTableWithData(String table, DataModel dataModel) { ContainerUtils.executeSQLStatement( getDorisQueryConnection(), LOG, @@ -826,12 +914,17 @@ public class DorisSourceITCase extends AbstractITCaseService { String.format( "CREATE TABLE %s.%s ( \n" + "`name` varchar(256),\n" - + "`age` int\n" - + ") DISTRIBUTED BY HASH(`name`) BUCKETS 10\n" + + "`age` int %s\n" + + ") " + + " %s KEY(`name`) " + + "DISTRIBUTED BY HASH(`name`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_num\" = \"1\"\n" + ")\n", - DATABASE, table), + DATABASE, + table, + DataModel.AGGREGATE.equals(dataModel) ? "MAX" : "", + dataModel.toString()), String.format( "insert into %s.%s values ('101',1),('102',1),('103',1)", DATABASE, table), String.format( diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java index a87e2205..fbbc0863 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java @@ -40,6 +40,7 @@ public class MockSource extends RichParallelSourceFunction<String> private transient ListState<Long> state; private Long id = 0L; private int numEventsTotal; + private int failCheckpointId = -1; private volatile boolean running = true; private volatile long waitNextCheckpoint = 0L; private volatile long lastCheckpointConfirmed = 0L; @@ -48,6 +49,11 @@ public class MockSource extends RichParallelSourceFunction<String> this.numEventsTotal = numEventsTotal; } + public MockSource(int numEventsTotal, int failCheckpointId) { + this.numEventsTotal = numEventsTotal; + this.failCheckpointId = failCheckpointId; + } + @Override public void run(SourceContext<String> ctx) throws Exception { @@ -73,7 +79,11 @@ public class MockSource extends RichParallelSourceFunction<String> @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { state.update(Collections.singletonList(id)); - LOG.info("snapshot state to {}", id); + if (failCheckpointId > 0 && context.getCheckpointId() % failCheckpointId == 0) { + throw new RuntimeException( + "Trigger fail for testing, checkpointId = " + context.getCheckpointId()); + } + LOG.info("snapshot state to {} for checkpoint {}", id, context.getCheckpointId()); } @Override diff --git a/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_sink_test_tbl.sql b/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_sink_test_tbl.sql index 20ffb525..b098f58a 100644 --- a/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_sink_test_tbl.sql +++ b/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_sink_test_tbl.sql @@ -17,10 +17,12 @@ CREATE TABLE test_doris2doris_sink.test_tbl ( `c11` datetime, `c12` char(1), `c13` varchar(256), - `c14` Array<String>, - `c15` Map<String, String>, - `c16` Struct<name: String, age: int>, - `c17` JSON + `c14` string, + `c15` Array<String>, + `c16` Map<String, String>, + `c17` Struct<name: String, age: int>, + `c18` JSON, + `c19` VARIANT ) DUPLICATE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 diff --git a/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_source_test_tbl.sql b/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_source_test_tbl.sql index 5e57b50b..568a4684 100644 --- a/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_source_test_tbl.sql +++ b/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_source_test_tbl.sql @@ -16,11 +16,13 @@ CREATE TABLE test_doris2doris_source.test_tbl ( `c10` date, `c11` datetime, `c12` char(1), - `c13` varchar(256), - `c14` Array<String>, - `c15` Map<String, String>, - `c16` Struct<name: String, age: int>, - `c17` JSON + `c13` varchar(16), + `c14` string, + `c15` Array<String>, + `c16` Map<String, String>, + `c17` Struct<name: String, age: int>, + `c18` JSON, + `c19` JSON -- doris2.1.0 can not read VARIANT ) DUPLICATE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 @@ -29,45 +31,22 @@ PROPERTIES ( "light_schema_change" = "true" ); -INSERT INTO test_doris2doris_source.test_tbl -VALUES - ( - 1, - TRUE, - 127, - 32767, - 2147483647, - 9223372036854775807, - 123456789012345678901234567890, - 3.14, - 2.7182818284, - 12345.6789, - '2023-05-22', - '2023-05-22 12:34:56', - 'A', - 'Example text', - ['item1', 'item2', 'item3'], - {'key1': 'value1', 'key2': 'value2'}, - STRUCT('John Doe', 30), - '{"key": "value"}' - ), - ( - 2, - FALSE, - -128, - -32768, - -2147483648, - -9223372036854775808, - -123456789012345678901234567890, - -3.14, - -2.7182818284, - -12345.6789, - '2024-01-01', - '2024-01-01 00:00:00', - 'B', - 'Another example', - ['item4', 'item5', 'item6'], - {'key3': 'value3', 'key4': 'value4'}, - STRUCT('Jane Doe', 25), - '{"another_key": "another_value"}' -); \ No newline at end of file +INSERT INTO test_doris2doris_source.test_tbl VALUES + (1, true, 127, 32767, 2147483647, 9223372036854775807, 170141183460469231731687303715884105727, + 3.14, 2.71828, 12345.6789, '2025-03-11', '2025-03-11 12:34:56', 'A', 'Hello, Doris!', 'This is a string', + ['Alice', 'Bob'], {'key1': 'value1', 'key2': 'value2'}, STRUCT('Tom', 30), '{"key": "value"}', '{"type": "variant", "data": 123}'); + +INSERT INTO test_doris2doris_source.test_tbl VALUES + (2, false, -128, -32768, -2147483648, -9223372036854775808, -170141183460469231731687303715884105728, + -1.23, 0.0001, -9999.9999, '2024-12-25', '2024-12-25 23:59:59', 'B', 'Doris Test', 'Another string!', + ['Charlie', 'David'], {'k1': 'v1', 'k2': 'v2'}, STRUCT('Jerry', 25), '{"status": "ok"}', '{"data": [1, 2, 3]}' ); + +INSERT INTO test_doris2doris_source.test_tbl VALUES + (3, true, 0, 0, 0, 0, 0, + 0.0, 0.0, 0.0000, '2023-06-15', '2023-06-15 08:00:00', 'C', 'Test Doris', 'Sample text', + ['Eve', 'Frank'], {'alpha': 'beta'}, STRUCT('Alice', 40), '{"nested": {"key": "value"}}', '{"variant": "test"}'); + +INSERT INTO test_doris2doris_source.test_tbl VALUES + (4, NULL, NULL, NULL, NULL, NULL, NULL, + NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, + NULL, NULL, NULL, NULL, NULL); \ No newline at end of file diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt index d88b2088..90a0eddc 100644 --- a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt @@ -2,6 +2,7 @@ mysql-sync-database --database test_e2e_mysql --mysql-conf database-name=test_e2e_mysql --including-tables "tbl.*" + --sink-conf sink.ignore.update-before=false --table-conf replication_num=1 --single-sink true --ignore-default-value false \ No newline at end of file diff --git a/flink-doris-connector/src/test/resources/docker/doris/be.conf b/flink-doris-connector/src/test/resources/docker/doris/be.conf new file mode 100644 index 00000000..94b76e09 --- /dev/null +++ b/flink-doris-connector/src/test/resources/docker/doris/be.conf @@ -0,0 +1,99 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +CUR_DATE=`date +%Y%m%d-%H%M%S` + +PPROF_TMPDIR="$DORIS_HOME/log/" + +JAVA_OPTS="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives" + +# For jdk 9+, this JAVA_OPTS will be used as default JVM options +JAVA_OPTS_FOR_JDK_9="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives" + +# For jdk 17+, this JAVA_OPTS will be used as default JVM options +JAVA_OPTS_FOR_JDK_17="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives --add-opens=java.base/java.net=ALL-UNNAMED" + +# since 1.2, the JAVA_HOME need to be set to run BE process. +# JAVA_HOME=/path/to/jdk/ + +# https://github.com/apache/doris/blob/master/docs/zh-CN/community/developer-guide/debug-tool.md#jemalloc-heap-profile +# https://jemalloc.net/jemalloc.3.html +JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:15000,dirty_decay_ms:15000,oversize_threshold:0,prof:false,lg_prof_interval:32,lg_prof_sample:19,prof_gdump:false,prof_accum:false,prof_leak:false,prof_final:false" +JEMALLOC_PROF_PRFIX="" + +# INFO, WARNING, ERROR, FATAL +sys_log_level = INFO + +# ports for admin, web, heartbeat service +be_port = 9060 +webserver_port = 8040 +heartbeat_service_port = 9050 +brpc_port = 8060 +arrow_flight_sql_port = 9610 +enable_debug_points = true + +# HTTPS configures +enable_https = false +# path of certificate in PEM format. +ssl_certificate_path = "$DORIS_HOME/conf/cert.pem" +# path of private key in PEM format. +ssl_private_key_path = "$DORIS_HOME/conf/key.pem" + + +# Choose one if there are more than one ip except loopback address. +# Note that there should at most one ip match this list. +# If no ip match this rule, will choose one randomly. +# use CIDR format, e.g. 10.10.10.0/24 or IP format, e.g. 10.10.10.1 +# Default value is empty. +# priority_networks = 10.10.10.0/24;192.168.0.0/16 + +# data root path, separate by ';' +# You can specify the storage type for each root path, HDD (cold data) or SSD (hot data) +# eg: +# storage_root_path = /home/disk1/doris;/home/disk2/doris;/home/disk2/doris +# storage_root_path = /home/disk1/doris,medium:SSD;/home/disk2/doris,medium:SSD;/home/disk2/doris,medium:HDD +# /home/disk2/doris,medium:HDD(default) +# +# you also can specify the properties by setting '<property>:<value>', separate by ',' +# property 'medium' has a higher priority than the extension of path +# +# Default value is ${DORIS_HOME}/storage, you should create it by hand. +# storage_root_path = ${DORIS_HOME}/storage + +# Default dirs to put jdbc drivers,default value is ${DORIS_HOME}/jdbc_drivers +# jdbc_drivers_dir = ${DORIS_HOME}/jdbc_drivers + +# Advanced configurations +# sys_log_dir = ${DORIS_HOME}/log +# sys_log_roll_mode = SIZE-MB-1024 +# sys_log_roll_num = 10 +# sys_log_verbose_modules = * +# log_buffer_level = -1 +# palo_cgroups + +# aws sdk log level +# Off = 0, +# Fatal = 1, +# Error = 2, +# Warn = 3, +# Info = 4, +# Debug = 5, +# Trace = 6 +# Default to turn off aws sdk log, because aws sdk errors that need to be cared will be output through Doris logs +aws_log_level=0 +## If you are not running in aws cloud, you can disable EC2 metadata +AWS_EC2_METADATA_DISABLED=true \ No newline at end of file diff --git a/flink-doris-connector/src/test/resources/docker/doris/fe.conf b/flink-doris-connector/src/test/resources/docker/doris/fe.conf new file mode 100644 index 00000000..a45fb535 --- /dev/null +++ b/flink-doris-connector/src/test/resources/docker/doris/fe.conf @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +##################################################################### +## The uppercase properties are read and exported by bin/start_fe.sh. +## To see all Frontend configurations, +## see fe/src/org/apache/doris/common/Config.java +##################################################################### + +CUR_DATE=`date +%Y%m%d-%H%M%S` + +# Log dir +LOG_DIR = ${DORIS_HOME}/log + +# For jdk 17, this JAVA_OPTS will be used as default JVM options +JAVA_OPTS_FOR_JDK_17="-Dfile.encoding=UTF-8 -Djavax.security.auth.useSubjectCredsOnly=false -Xmx8192m -Xms8192m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$LOG_DIR -Xlog:gc*,classhisto*=trace:$LOG_DIR/fe.gc.log.$CUR_DATE:time,uptime:filecount=10,filesize=50M --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens java.base/jdk.internal.ref=ALL-UNNAMED" + +# Set your own JAVA_HOME +# JAVA_HOME=/path/to/jdk/ + +## +## the lowercase properties are read by main program. +## + +# store metadata, must be created before start FE. +# Default value is ${DORIS_HOME}/doris-meta +# meta_dir = ${DORIS_HOME}/doris-meta + +# Default dirs to put jdbc drivers,default value is ${DORIS_HOME}/jdbc_drivers +# jdbc_drivers_dir = ${DORIS_HOME}/jdbc_drivers + +http_port = 8030 +rpc_port = 9020 +query_port = 9030 +edit_log_port = 9010 +arrow_flight_sql_port = 9611 +enable_debug_points = true +arrow_flight_token_cache_size = 50 +# Choose one if there are more than one ip except loopback address. +# Note that there should at most one ip match this list. +# If no ip match this rule, will choose one randomly. +# use CIDR format, e.g. 10.10.10.0/24 or IP format, e.g. 10.10.10.1 +# Default value is empty. +# priority_networks = 10.10.10.0/24;192.168.0.0/16 + +# Advanced configurations +# log_roll_size_mb = 1024 +# INFO, WARN, ERROR, FATAL +sys_log_level = INFO +# NORMAL, BRIEF, ASYNC +sys_log_mode = ASYNC +# sys_log_roll_num = 10 +# sys_log_verbose_modules = org.apache.doris +# audit_log_dir = $LOG_DIR +# audit_log_modules = slow_query, query +# audit_log_roll_num = 10 +# meta_delay_toleration_second = 10 +# qe_max_connection = 1024 +# qe_query_timeout_second = 300 +# qe_slow_log_ms = 5000 \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org