This is an automated email from the ASF dual-hosted git repository. zykkk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 2b3dbc8 [Test] add itcases for DorisSource and Dorissink (#270) 2b3dbc8 is described below commit 2b3dbc846abeae4807458868d46a9e19ccd79275 Author: wudi <676366...@qq.com> AuthorDate: Thu Dec 14 15:30:05 2023 +0800 [Test] add itcases for DorisSource and Dorissink (#270) Add integration tests and add them to the ci --- .github/workflows/run-itcase.yml | 44 +++++ flink-doris-connector/pom.xml | 67 ++++--- .../java/org/apache/doris/flink/DorisTestBase.java | 134 ++++++++++++++ .../apache/doris/flink/sink/DorisSinkITCase.java | 203 +++++++++++++++++++++ .../doris/flink/source/DorisSourceITCase.java | 133 ++++++++++++++ 5 files changed, 553 insertions(+), 28 deletions(-) diff --git a/.github/workflows/run-itcase.yml b/.github/workflows/run-itcase.yml new file mode 100644 index 0000000..624ccaa --- /dev/null +++ b/.github/workflows/run-itcase.yml @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +--- +name: Run ITCases +on: + pull_request: + push: + +jobs: + build-extension: + name: "Run ITCases" + runs-on: ubuntu-latest + defaults: + run: + shell: bash + steps: + - name: Checkout + uses: actions/checkout@master + + - name: Setup java + uses: actions/setup-java@v2 + with: + distribution: adopt + java-version: '8' + + - name: Run ITCases + run: | + cd flink-doris-connector && mvn test -Dtest="*ITCase" + diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index 54dc7ba..5057099 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -220,34 +220,6 @@ under the License. <version>31.1-jre</version> </dependency> - <!--Test--> - <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest-core</artifactId> - <version>1.3</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - <version>4.2.0</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-inline</artifactId> - <version>4.2.0</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.11</version> - <scope>test</scope> - </dependency> - <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> @@ -309,6 +281,45 @@ under the License. 
<version>${flink.version}</version> <scope>provided</scope> </dependency> + <!--Test--> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-core</artifactId> + <version>1.3</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>4.2.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-inline</artifactId> + <version>4.2.0</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <version>5.10.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <version>1.17.6</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java new file mode 100644 index 0000000..a0b7601 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink; + +import com.google.common.collect.Lists; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerLoggerFactory; + +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import java.util.stream.Stream; + +import static org.awaitility.Awaitility.given; + +public abstract class DorisTestBase { + protected static final Logger LOG = LoggerFactory.getLogger(DorisTestBase.class); +// protected static final String DORIS_12_DOCKER_IMAGE = "adamlee489/doris:1.2.7.1_arm"; + protected static final String DORIS_12_DOCKER_IMAGE = "adamlee489/doris:1.2.7.1_x86"; + private static final String DRIVER_JAR = + "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar"; + private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + private static final String URL = "jdbc:mysql://%s:9030"; + protected static final String USERNAME = "root"; + protected static final 
String PASSWORD = ""; + protected static final GenericContainer DORIS_CONTAINER = createDorisContainer(); + protected static Connection connection; + protected static final int DEFAULT_PARALLELISM = 4; + + protected static String getFenodes(){ + return DORIS_CONTAINER.getHost() + ":8030"; + } + + @BeforeClass + public static void startContainers() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(DORIS_CONTAINER)).join(); + given().ignoreExceptions() + .await() + .atMost(120, TimeUnit.SECONDS) + .untilAsserted(DorisTestBase::initializeJdbcConnection); + LOG.info("Containers are started."); + } + + @AfterClass + public static void stopContainers() { + LOG.info("Stopping containers..."); + DORIS_CONTAINER.stop(); + LOG.info("Containers are stopped."); + } + + public static GenericContainer createDorisContainer(){ + GenericContainer container = new GenericContainer<>(DORIS_12_DOCKER_IMAGE) + .withNetwork(Network.newNetwork()) + .withNetworkAliases("DorisContainer") + .withEnv("FE_SERVERS", "fe1:127.0.0.1:9010") + .withEnv("FE_ID", "1") + .withEnv("CURRENT_BE_IP", "127.0.0.1") + .withEnv("CURRENT_BE_PORT", "9050") + .withCommand("ulimit -n 65536") + .withCreateContainerCmdModifier( + cmd -> cmd.getHostConfig().withMemorySwap(0L)) + .withPrivilegedMode(true) + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DORIS_12_DOCKER_IMAGE))); + + container.setPortBindings( + Lists.newArrayList( + String.format("%s:%s", "8030", "8030"), + String.format("%s:%s", "9030", "9030"), + String.format("%s:%s", "9060", "9060"), + String.format("%s:%s", "8040", "8040"))); + + return container; + } + + protected static void initializeJdbcConnection() + throws SQLException, MalformedURLException { + URLClassLoader urlClassLoader = + new URLClassLoader( + new URL[] {new URL(DRIVER_JAR)}, DorisTestBase.class.getClassLoader()); + LOG.info("Try to connect to Doris..."); + Thread.currentThread().setContextClassLoader(urlClassLoader); + connection = 
DriverManager.getConnection(String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); + try (Statement statement = connection.createStatement()) { + ResultSet resultSet; + do { + LOG.info("Wait for the Backend to start successfully..."); + resultSet = statement.executeQuery("show backends"); + } while (!isBeReady(resultSet, Duration.ofSeconds(1L))); + } + LOG.info("Connected to Doris successfully..."); + } + + private static boolean isBeReady(ResultSet rs, Duration duration) throws SQLException { + if (rs.next()) { + String isAlive = rs.getString(10).trim(); + String totalCap = rs.getString(16).trim(); + LockSupport.parkNanos(duration.toNanos()); + return "true".equalsIgnoreCase(isAlive) && !"0.000".equalsIgnoreCase(totalCap); + } + return false; + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java new file mode 100644 index 0000000..a0be145 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.flink.sink; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.doris.flink.DorisTestBase; +import org.apache.doris.flink.cfg.DorisExecutionOptions; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * DorisSink ITCase with csv and json format + */ +public class DorisSinkITCase extends DorisTestBase { + static final String DATABASE = "test"; + static final String TABLE_CSV = "tbl_csv"; + static final String TABLE_JSON = "tbl_json"; + static final String TABLE_JSON_TBL = "tbl_json_tbl"; + + @Test + public void testSinkCsvFormat() throws Exception { + initializeTable(TABLE_CSV); + Properties properties = new Properties(); + properties.setProperty("column_separator", ","); + properties.setProperty("line_delimiter", "\n"); + properties.setProperty("format", "csv"); + submitJob(TABLE_CSV, properties, new String[]{"doris,1"}); + + Thread.sleep(10000); + Set<List<Object>> actual = new HashSet<>(); + try (Statement sinkStatement = connection.createStatement()) { + ResultSet sinkResultSet = sinkStatement.executeQuery(String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_CSV)); + while (sinkResultSet.next()) { + List<Object> row = + Arrays.asList( + 
sinkResultSet.getString("name"), + sinkResultSet.getInt("age")); + actual.add(row); + } + } + Set<List<Object>> expected = + Stream.<List<Object>>of(Arrays.asList("doris", 1)) + .collect(Collectors.toSet()); + Assertions.assertIterableEquals(expected, actual); + } + + @Test + public void testSinkJsonFormat() throws Exception { + initializeTable(TABLE_JSON); + Properties properties = new Properties(); + properties.setProperty("read_json_by_line", "true"); + properties.setProperty("format", "json"); + + //mock data + Map<String, Object> row1 = new HashMap<>(); + row1.put("name", "doris1"); + row1.put("age", 1); + Map<String, Object> row2 = new HashMap<>(); + row2.put("name", "doris2"); + row2.put("age", 2); + + submitJob(TABLE_JSON, properties, new String[]{new ObjectMapper().writeValueAsString(row1), new ObjectMapper().writeValueAsString(row2)}); + + Thread.sleep(10000); + Set<List<Object>> actual = new HashSet<>(); + try (Statement sinkStatement = connection.createStatement()) { + ResultSet sinkResultSet = sinkStatement.executeQuery(String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_JSON)); + while (sinkResultSet.next()) { + List<Object> row = + Arrays.asList( + sinkResultSet.getString("name"), + sinkResultSet.getInt("age")); + actual.add(row); + } + } + Set<List<Object>> expected = + Stream.<List<Object>>of(Arrays.asList("doris1", 1),Arrays.asList("doris2", 2)) + .collect(Collectors.toSet()); + Assertions.assertIterableEquals(expected, actual); + } + + public void submitJob(String table, Properties properties, String[] records) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + DorisSink.Builder<String> builder = DorisSink.builder(); + final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder(); + + DorisOptions.Builder dorisBuilder = DorisOptions.builder(); + dorisBuilder.setFenodes(getFenodes()) + 
.setTableIdentifier(DATABASE + "." + table) + .setUsername(USERNAME) + .setPassword(PASSWORD); + DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); + executionBuilder.setLabelPrefix(UUID.randomUUID().toString()) + .setStreamLoadProp(properties); + + builder.setDorisReadOptions(readOptionBuilder.build()) + .setDorisExecutionOptions(executionBuilder.build()) + .setSerializer(new SimpleStringSerializer()) + .setDorisOptions(dorisBuilder.build()); + + env.fromElements(records).sinkTo(builder.build()); + env.execute(); + } + + + @Test + public void testTableSinkJsonFormat() throws Exception { + initializeTable(TABLE_JSON_TBL); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String sinkDDL = + String.format( + "CREATE TABLE doris_sink (" + + " name STRING," + + " age INT" + + ") WITH (" + + " 'connector' = 'doris'," + + " 'fenodes' = '%s'," + + " 'table.identifier' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'sink.properties.format' = 'json'," + + " 'sink.properties.read_json_by_line' = 'true'," + + " 'sink.label-prefix' = 'doris_sink'" + + ")", + getFenodes(), + DATABASE + "." 
+ TABLE_JSON_TBL, + USERNAME, + PASSWORD); + tEnv.executeSql(sinkDDL); + tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all SELECT 'flink',2"); + + Thread.sleep(10000); + Set<List<Object>> actual = new HashSet<>(); + try (Statement sinkStatement = connection.createStatement()) { + ResultSet sinkResultSet = sinkStatement.executeQuery(String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_JSON_TBL)); + while (sinkResultSet.next()) { + List<Object> row = + Arrays.asList( + sinkResultSet.getString("name"), + sinkResultSet.getInt("age")); + actual.add(row); + } + } + Set<List<Object>> expected = + Stream.<List<Object>>of(Arrays.asList("doris", 1),Arrays.asList("flink", 2)) + .collect(Collectors.toSet()); + Assertions.assertIterableEquals(expected, actual); + } + + + private void initializeTable(String table) throws Exception { + try(Statement statement = connection.createStatement()){ + statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE)); + statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table)); + statement.execute(String.format("CREATE TABLE %s.%s ( \n" + + "`name` varchar(256),\n" + + "`age` int\n" + + ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ")\n", DATABASE, table)); + } + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java new file mode 100644 index 0000000..83e959e --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.source; + +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema; +import org.apache.doris.flink.DorisTestBase; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * DorisSource ITCase + */ +public class DorisSourceITCase extends DorisTestBase { + static final String DATABASE = "test"; + static final String TABLE_READ = "tbl_read"; + static final String TABLE_READ_TBL = "tbl_read_tbl"; + + @Test + public void testSource() throws Exception { + initializeTable(TABLE_READ); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder(); + + 
DorisOptions.Builder dorisBuilder = DorisOptions.builder(); + dorisBuilder.setFenodes(getFenodes()) + .setTableIdentifier(DATABASE + "." + TABLE_READ) + .setUsername(USERNAME) + .setPassword(PASSWORD); + + DorisSource<List<?>> source = DorisSource.<List<?>>builder() + .setDorisReadOptions(readOptionBuilder.build()) + .setDorisOptions(dorisBuilder.build()) + .setDeserializer(new SimpleListDeserializationSchema()) + .build(); + List<Object> actual = new ArrayList<>(); + try(CloseableIterator<List<?>> iterator = + env.fromSource(source, WatermarkStrategy.noWatermarks(), "Doris Source") + .executeAndCollect()){ + while (iterator.hasNext()) { + actual.add(iterator.next()); + } + } + List<Object> expected = Arrays.asList(Arrays.asList("doris", 18), Arrays.asList("flink", 10)); + Assertions.assertIterableEquals(expected, actual);; + } + + @Test + public void testTableSource() throws Exception { + initializeTable(TABLE_READ_TBL); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String sourceDDL = + String.format( + "CREATE TABLE doris_source (" + + " name STRING," + + " age INT" + + ") WITH (" + + " 'connector' = 'doris'," + + " 'fenodes' = '%s'," + + " 'table.identifier' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'" + + ")", + getFenodes(), + DATABASE + "." 
+ TABLE_READ_TBL, + USERNAME, + PASSWORD); + tEnv.executeSql(sourceDDL); + TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_source"); + + List<Object> actual = new ArrayList<>(); + try(CloseableIterator<Row> iterator = tableResult.collect()){ + while (iterator.hasNext()) { + actual.add(iterator.next().toString()); + } + } + String[] expected = + new String[] { + "+I[doris, 18]", + "+I[flink, 10]" + }; + Assertions.assertIterableEquals(Arrays.asList(expected), actual); + } + + private void initializeTable(String table) throws Exception { + try(Statement statement = connection.createStatement()){ + statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE)); + statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table)); + statement.execute(String.format("CREATE TABLE %s.%s ( \n" + + "`name` varchar(256),\n" + + "`age` int\n" + + ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ")\n", DATABASE, table)); + statement.execute(String.format("insert into %s.%s values ('doris',18)", DATABASE, table)); + statement.execute(String.format("insert into %s.%s values ('flink',10)", DATABASE, table)); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org