This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new cdbeabd [Improve](case) add doris source it case (#296) cdbeabd is described below commit cdbeabd1f12d0b0577d32fe64c57b735669f4a94 Author: wudi <676366...@qq.com> AuthorDate: Wed Mar 26 16:30:24 2025 +0800 [Improve](case) add doris source it case (#296) --- .github/workflows/run-itcase-12.yml | 51 --- .../{run-itcase-20.yml => run-itcase.yml} | 10 +- .licenserc.yaml | 28 +- .../spark/client/read/AbstractThriftReader.java | 3 +- .../spark/client/read/DorisFlightSqlReader.java | 27 +- .../client/read/ReaderPartitionGenerator.java | 8 +- .../apache/doris/spark/client/read/RowBatch.java | 16 +- .../spark/exception/DorisRuntimeException.java | 45 +++ .../apache/doris/spark/rest/models/DataModel.java | 25 ++ .../apache/doris/spark/util/SchemaConvertors.scala | 2 +- .../doris/spark/client/read/RowBatchTest.java | 15 +- .../java/org/apache/doris/spark/DorisTestBase.java | 137 -------- .../spark/container/AbstractContainerTestBase.java | 118 +++++++ .../doris/spark/container/ContainerUtils.java | 188 ++++++++++ .../spark/container/instance/ContainerService.java | 52 +++ .../spark/container/instance/DorisContainer.java | 276 +++++++++++++++ .../container/instance/DorisCustomerContainer.java | 138 ++++++++ .../apache/doris/spark/sql/DorisReaderITCase.scala | 386 +++++++++++++++++---- .../apache/doris/spark/sql/DorisWriterITCase.scala | 100 +++--- .../test/resources/container/ddl/read_all_type.sql | 50 +++ .../test/resources/container/ddl/read_bitmap.sql | 17 + .../src/test/resources/docker/doris/be.conf | 99 ++++++ .../src/test/resources/docker/doris/fe.conf | 74 ++++ .../src/test/resources/log4j.properties | 23 ++ .../read/expression/V2ExpressionBuilder.scala | 15 +- 25 files changed, 1540 insertions(+), 363 deletions(-) diff --git a/.github/workflows/run-itcase-12.yml b/.github/workflows/run-itcase-12.yml deleted file mode 100644 index fd28357..0000000 --- a/.github/workflows/run-itcase-12.yml +++ /dev/null @@ -1,51 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# ---- -name: Run ITCases 1.2 -on: - pull_request: - push: - -jobs: - build-extension: - name: "Run ITCases 1.2" - runs-on: ubuntu-latest - defaults: - run: - shell: bash - steps: - - name: Checkout - uses: actions/checkout@master - - - name: Setup java - uses: actions/setup-java@v2 - with: - distribution: adopt - java-version: '8' - - - name: Run ITCases for spark 2 - run: | - cd spark-doris-connector && mvn clean test -Pspark-2-it,spark-2.4_2.11 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:1.2.7.1_x86" - - - name: Run ITCases for spark 3.1 - run: | - cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.1 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:1.2.7.1_x86" - - - name: Run ITCases for spark 3.3 - run: | - cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.3 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:1.2.7.1_x86" diff --git a/.github/workflows/run-itcase-20.yml b/.github/workflows/run-itcase.yml similarity index 87% rename from .github/workflows/run-itcase-20.yml rename to .github/workflows/run-itcase.yml index b0f31c0..10be11e 100644 --- a/.github/workflows/run-itcase-20.yml +++ b/.github/workflows/run-itcase.yml @@ -16,14 +16,14 @@ # under the License. # --- -name: Run ITCases 2.0 +name: Run ITCases on: pull_request: push: jobs: build-extension: - name: "Run ITCases 2.0" + name: "Run ITCases" runs-on: ubuntu-latest defaults: run: @@ -40,13 +40,13 @@ jobs: - name: Run ITCases for spark 2 run: | - cd spark-doris-connector && mvn clean test -Pspark-2-it,spark-2.4_2.11 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:2.0.3" + cd spark-doris-connector && mvn clean test -Pspark-2-it,spark-2.4_2.11 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="apache/doris:doris-all-in-one-2.1.0" - name: Run ITCases for spark 3.1 run: | - cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.1 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:2.0.3" + cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.1 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="apache/doris:doris-all-in-one-2.1.0" - name: Run ITCases for spark 3.3 run: | - cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.3 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:2.0.3" + cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.3 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="apache/doris:doris-all-in-one-2.1.0" \ No newline at end of file diff --git a/.licenserc.yaml b/.licenserc.yaml index e5af614..4feebdc 100644 --- a/.licenserc.yaml +++ b/.licenserc.yaml @@ -5,33 +5,15 @@ header: paths-ignore: - 'dist' - - 'licenses' - - '**/*.md' - - 'LICENSE' + - 'LICENSE.txt' + - 'NOTICE.txt' - 'NOTICE' - - 'DISCLAIMER' - - '.clang-format' - - '.clang-format-ignore' - - '.gitattributes' - '.gitignore' - - '.gitmodules' + - '.github/PULL_REQUEST_TEMPLATE.md' - '.licenserc.yaml' - - '.rat-excludes' - - 'be/src/common/status.cpp' - - 'be/src/common/status.h' - - 'be/src/env/env.h' - - 'be/src/env/env_posix.cpp' - - '**/glibc-compatibility/**' - - '**/gutil/**' - - '**/test_data/**' - - '**/jmockit/**' - - '**/*.json' - - '**/*.dat' - - '**/*.svg' - - '**/*.md5' - - '**/*.patch' - - 
'**/*.log' - 'custom_env.sh.tpl' - '**/*.csv' + - '**/jmockit/**' + - 'spark-doris-connector/spark-doris-connector-it/src/test/resources/container/' comment: on-failure diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java index 7dbb72b..373910c 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java @@ -32,6 +32,7 @@ import org.apache.doris.spark.exception.DorisException; import org.apache.doris.spark.exception.OptionRequiredException; import org.apache.doris.spark.rest.models.Field; import org.apache.doris.spark.rest.models.Schema; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -259,7 +260,7 @@ public abstract class AbstractThriftReader extends DorisReader { if (readColumn.contains(" AS ")) { int asIdx = readColumn.indexOf(" AS "); String realColumn = readColumn.substring(asIdx + 4).trim().replaceAll("`", ""); - if (fieldTypeMap.containsKey(realColumn) && scanTypeMap.containsKey(realColumn) + if (fieldTypeMap.containsKey(realColumn) && ("BITMAP".equalsIgnoreCase(fieldTypeMap.get(realColumn).getType()) || "HLL".equalsIgnoreCase(fieldTypeMap.get(realColumn).getType()))) { newFieldList.add(new Field(realColumn, TPrimitiveType.VARCHAR.name(), null, 0, 0, null)); diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java index 4623d65..779d622 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java @@ -17,17 +17,6 @@ package org.apache.doris.spark.client.read; -import org.apache.arrow.adbc.core.AdbcConnection; -import org.apache.arrow.adbc.core.AdbcDatabase; -import org.apache.arrow.adbc.core.AdbcDriver; -import org.apache.arrow.adbc.core.AdbcException; -import org.apache.arrow.adbc.core.AdbcStatement; -import org.apache.arrow.adbc.driver.flightsql.FlightSqlDriver; -import org.apache.arrow.flight.Location; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.ipc.ArrowReader; -import org.apache.commons.lang3.StringUtils; import org.apache.doris.sdk.thrift.TPrimitiveType; import org.apache.doris.spark.client.DorisFrontendClient; import org.apache.doris.spark.client.entity.DorisReaderPartition; @@ -39,6 +28,18 @@ import org.apache.doris.spark.exception.OptionRequiredException; import org.apache.doris.spark.exception.ShouldNeverHappenException; import org.apache.doris.spark.rest.models.Field; import org.apache.doris.spark.rest.models.Schema; + +import org.apache.arrow.adbc.core.AdbcConnection; +import org.apache.arrow.adbc.core.AdbcDatabase; +import org.apache.arrow.adbc.core.AdbcDriver; +import org.apache.arrow.adbc.core.AdbcException; +import org.apache.arrow.adbc.core.AdbcStatement; +import org.apache.arrow.adbc.driver.flightsql.FlightSqlDriver; +import org.apache.arrow.flight.Location; 
+import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +55,7 @@ import java.util.stream.Collectors; public class DorisFlightSqlReader extends DorisReader { private static final Logger log = LoggerFactory.getLogger(DorisFlightSqlReader.class); + private static final String PREFIX = "/* ApplicationName=Spark ArrowFlightSQL Query */"; private final AtomicBoolean endOfStream = new AtomicBoolean(false); private final DorisFrontendClient frontendClient; private final Schema schema; @@ -136,6 +138,7 @@ public class DorisFlightSqlReader extends DorisReader { private ArrowReader executeQuery() throws AdbcException, OptionRequiredException { AdbcStatement statement = connection.createStatement(); String flightSql = generateQuerySql(partition); + log.info("Query SQL Sending to Doris FE is: {}", flightSql); statement.setSqlQuery(flightSql); AdbcStatement.QueryResult queryResult = statement.executeQuery(); return queryResult.getReader(); @@ -147,7 +150,7 @@ public class DorisFlightSqlReader extends DorisReader { String tablets = String.format("TABLET(%s)", StringUtils.join(partition.getTablets(), ",")); String predicates = partition.getFilters().length == 0 ? "" : " WHERE " + String.join(" AND ", partition.getFilters()); String limit = partition.getLimit() > 0 ? " LIMIT " + partition.getLimit() : ""; - return String.format("SELECT %s FROM %s %s%s%s", columns, fullTableName, tablets, predicates, limit); + return PREFIX + String.format("SELECT %s FROM %s %s%s%s", columns, fullTableName, tablets, predicates, limit); } protected Schema processDorisSchema(DorisReaderPartition partition) throws Exception { diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java index 002b58b..bc6c410 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java @@ -17,7 +17,6 @@ package org.apache.doris.spark.client.read; -import com.google.common.annotations.VisibleForTesting; import org.apache.doris.spark.client.DorisFrontendClient; import org.apache.doris.spark.client.entity.Backend; import org.apache.doris.spark.client.entity.DorisReaderPartition; @@ -27,6 +26,8 @@ import org.apache.doris.spark.rest.models.Field; import org.apache.doris.spark.rest.models.QueryPlan; import org.apache.doris.spark.rest.models.Schema; import org.apache.doris.spark.util.DorisDialects; + +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +55,7 @@ public class ReaderPartitionGenerator { originReadCols = new String[0]; } String[] filters = config.contains(DorisOptions.DORIS_FILTER_QUERY) ? 
- config.getValue(DorisOptions.DORIS_FILTER_QUERY).split("\\.") : new String[0]; + new String[]{config.getValue(DorisOptions.DORIS_FILTER_QUERY)} : new String[0]; return generatePartitions(config, originReadCols, filters, -1, datetimeJava8ApiEnabled); } @@ -131,7 +132,8 @@ public class ReaderPartitionGenerator { protected static String[] getFinalReadColumns(DorisConfig config, DorisFrontendClient frontendClient, String db, String table, String[] readFields) throws Exception { Schema tableSchema = frontendClient.getTableSchema(db, table); - Map<String, String> fieldTypeMap = tableSchema.getProperties().stream().collect(Collectors.toMap(Field::getName, Field::getType)); + Map<String, String> fieldTypeMap = tableSchema.getProperties().stream().collect( + Collectors.toMap(Field::getName, Field::getType)); Boolean bitmapToString = config.getValue(DorisOptions.DORIS_READ_BITMAP_TO_STRING); Boolean bitmapToBase64 = config.getValue(DorisOptions.DORIS_READ_BITMAP_TO_BASE64); return Arrays.stream(readFields).filter(fieldTypeMap::containsKey).map(readField -> { diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java index bf576f1..d937edd 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java @@ -17,6 +17,11 @@ package org.apache.doris.spark.client.read; +import org.apache.doris.sdk.thrift.TScanBatchResult; +import org.apache.doris.spark.exception.DorisException; +import org.apache.doris.spark.rest.models.Schema; +import org.apache.doris.spark.util.IPUtils; + import com.google.common.base.Preconditions; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BaseIntVector; @@ -44,10 +49,6 @@ import org.apache.arrow.vector.ipc.ArrowReader; import org.apache.arrow.vector.ipc.ArrowStreamReader; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.commons.lang3.ArrayUtils; -import org.apache.doris.sdk.thrift.TScanBatchResult; -import org.apache.doris.spark.exception.DorisException; -import org.apache.doris.spark.rest.models.Schema; -import org.apache.doris.spark.util.IPUtils; import org.apache.spark.sql.types.Decimal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,7 +65,6 @@ import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.ZoneId; -import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; import java.time.temporal.ChronoField; @@ -74,7 +74,6 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; -import java.util.TimeZone; /** * row batch data container. 
@@ -266,7 +265,7 @@ public class RowBatch implements Serializable { byte[] bytes = largeIntVector.get(rowIndex); ArrayUtils.reverse(bytes); BigInteger largeInt = new BigInteger(bytes); - addValueToRow(rowIndex, Decimal.apply(largeInt)); + addValueToRow(rowIndex, largeInt.toString()); } } else { VarCharVector largeIntVector = (VarCharVector) curFieldVector; @@ -276,8 +275,7 @@ public class RowBatch implements Serializable { continue; } String stringValue = new String(largeIntVector.get(rowIndex)); - BigInteger largeInt = new BigInteger(stringValue); - addValueToRow(rowIndex, Decimal.apply(largeInt)); + addValueToRow(rowIndex, stringValue); } } break; diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/exception/DorisRuntimeException.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/exception/DorisRuntimeException.java new file mode 100644 index 0000000..848ac5f --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/exception/DorisRuntimeException.java @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.exception; + +/** Doris runtime exception. */ +public class DorisRuntimeException extends RuntimeException { + public DorisRuntimeException() { + super(); + } + + public DorisRuntimeException(String message) { + super(message); + } + + public DorisRuntimeException(String message, Throwable cause) { + super(message, cause); + } + + public DorisRuntimeException(Throwable cause) { + super(cause); + } + + protected DorisRuntimeException( + String message, + Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/rest/models/DataModel.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/rest/models/DataModel.java new file mode 100644 index 0000000..a13181e --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/rest/models/DataModel.java @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.rest.models; + +public enum DataModel { + DUPLICATE, + UNIQUE, + UNIQUE_MOR, + AGGREGATE +} diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/SchemaConvertors.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/SchemaConvertors.scala index e694083..91b8317 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/SchemaConvertors.scala +++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/SchemaConvertors.scala @@ -42,7 +42,7 @@ object SchemaConvertors { case "BINARY" => DataTypes.BinaryType case "DECIMAL" => DecimalType(precision, scale) case "CHAR" => DataTypes.StringType - case "LARGEINT" => DecimalType(38, 0) + case "LARGEINT" => DataTypes.StringType case "VARCHAR" => DataTypes.StringType case "JSON" => DataTypes.StringType case "JSONB" => DataTypes.StringType diff --git a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java index 5a55a95..09f2e07 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java +++ b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java @@ -17,6 +17,12 @@ package org.apache.doris.spark.client.read; +import org.apache.doris.sdk.thrift.TScanBatchResult; +import org.apache.doris.sdk.thrift.TStatus; +import org.apache.doris.sdk.thrift.TStatusCode; +import org.apache.doris.spark.exception.DorisException; +import org.apache.doris.spark.rest.models.Schema; + import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -51,11 +57,6 @@ import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.commons.lang3.ArrayUtils; -import org.apache.doris.sdk.thrift.TScanBatchResult; -import org.apache.doris.sdk.thrift.TStatus; -import org.apache.doris.sdk.thrift.TStatusCode; -import org.apache.doris.spark.exception.DorisException; -import org.apache.doris.spark.rest.models.Schema; import org.apache.spark.sql.types.Decimal; import static org.hamcrest.core.StringStartsWith.startsWith; import org.junit.Assert; @@ -610,8 +611,8 @@ public class RowBatchTest { Assert.assertTrue(rowBatch.hasNext()); List<Object> actualRow0 = rowBatch.next(); - Assert.assertEquals(Decimal.apply(new BigInteger("9223372036854775808")), actualRow0.get(0)); - Assert.assertEquals(Decimal.apply(new BigInteger("9223372036854775809")), actualRow0.get(1)); + Assert.assertEquals("9223372036854775808", actualRow0.get(0)); + Assert.assertEquals("9223372036854775809", actualRow0.get(1)); Assert.assertFalse(rowBatch.hasNext()); 
thrown.expect(NoSuchElementException.class); diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/DorisTestBase.java b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/DorisTestBase.java deleted file mode 100644 index f7c6f0b..0000000 --- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/DorisTestBase.java +++ /dev/null @@ -1,137 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.spark; - -import static org.awaitility.Awaitility.given; -import static org.awaitility.Durations.ONE_SECOND; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.DockerLoggerFactory; - -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.time.Duration; -import java.util.Arrays; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.LockSupport; -import java.util.stream.Stream; - -public abstract class DorisTestBase { - public static final String PASSWORD = ""; - protected static final Logger LOG = LoggerFactory.getLogger(DorisTestBase.class); - protected static final String DORIS_DOCKER_IMAGE = System.getProperty("image"); - protected static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; - protected static final String URL = "jdbc:mysql://%s:9030"; - protected static final String USERNAME = "root"; - protected static final GenericContainer DORIS_CONTAINER = createDorisContainer(); - private static final String DRIVER_JAR = - "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar"; - protected static Connection connection; - - protected static String getFenodes() { - return DORIS_CONTAINER.getHost() + ":8030"; - } - - @BeforeClass - public static void startContainers() { - LOG.info("Starting containers..."); - Startables.deepStart(Stream.of(DORIS_CONTAINER)).join(); - given().ignoreExceptions() - .await() - .atMost(300, TimeUnit.SECONDS) - .pollInterval(ONE_SECOND) - .untilAsserted(DorisTestBase::initializeJdbcConnection); - LOG.info("Containers are started."); - } - - @AfterClass - public static void stopContainers() { - LOG.info("Stopping containers..."); - DORIS_CONTAINER.stop(); - LOG.info("Containers are stopped."); - } - - public static 
GenericContainer createDorisContainer() { - GenericContainer container = - new GenericContainer<>(DORIS_DOCKER_IMAGE) - .withNetwork(Network.newNetwork()) - .withNetworkAliases("DorisContainer") - .withEnv("FE_SERVERS", "fe1:127.0.0.1:9010") - .withEnv("FE_ID", "1") - .withEnv("CURRENT_BE_IP", "127.0.0.1") - .withEnv("CURRENT_BE_PORT", "9050") - .withCommand("ulimit -n 65536") - .withCreateContainerCmdModifier( - cmd -> cmd.getHostConfig().withMemorySwap(0L)) - .withPrivilegedMode(true) - .withLogConsumer( - new Slf4jLogConsumer( - DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE))); - - container.setPortBindings( - Arrays.asList( - String.format("%s:%s", "8030", "8030"), - String.format("%s:%s", "9030", "9030"), - String.format("%s:%s", "9060", "9060"), - String.format("%s:%s", "8040", "8040"))); - - return container; - } - - protected static void initializeJdbcConnection() throws SQLException, MalformedURLException { - URLClassLoader urlClassLoader = - new URLClassLoader( - new URL[]{new URL(DRIVER_JAR)}, DorisTestBase.class.getClassLoader()); - LOG.info("Try to connect to Doris..."); - Thread.currentThread().setContextClassLoader(urlClassLoader); - connection = - DriverManager.getConnection( - String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); - try (Statement statement = connection.createStatement()) { - ResultSet resultSet; - do { - LOG.info("Wait for the Backend to start successfully..."); - resultSet = statement.executeQuery("show backends"); - } while (!isBeReady(resultSet, Duration.ofSeconds(1L))); - } - LOG.info("Connected to Doris successfully..."); - } - - private static boolean isBeReady(ResultSet rs, Duration duration) throws SQLException { - LockSupport.parkNanos(duration.toNanos()); - if (rs.next()) { - String isAlive = rs.getString("Alive").trim(); - String totalCap = rs.getString("TotalCapacity").trim(); - return "true".equalsIgnoreCase(isAlive) && !"0.000".equalsIgnoreCase(totalCap); - } - return false; - } - -} diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/AbstractContainerTestBase.java b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/AbstractContainerTestBase.java new file mode 100644 index 0000000..97e7e26 --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/AbstractContainerTestBase.java @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.spark.container; + +import org.apache.doris.spark.container.instance.ContainerService; +import org.apache.doris.spark.container.instance.DorisContainer; +import org.apache.doris.spark.container.instance.DorisCustomerContainer; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +public abstract class AbstractContainerTestBase { + private static final Logger LOG = LoggerFactory.getLogger(AbstractContainerTestBase.class); + protected static ContainerService dorisContainerService; + public static final int DEFAULT_PARALLELISM = 2; + + @BeforeClass + public static void initContainers() { + LOG.info("Trying to start doris containers."); + initDorisContainer(); + } + + private static void initDorisContainer() { + if (Objects.nonNull(dorisContainerService) && dorisContainerService.isRunning()) { + LOG.info("The doris container has already been started and is running."); + return; + } + Boolean customerEnv = Boolean.valueOf(System.getProperty("customer_env", "false")); + dorisContainerService = customerEnv ? new DorisCustomerContainer() : new DorisContainer(); + dorisContainerService.startContainer(); + LOG.info("Doris container was started."); + } + + protected static Connection getDorisQueryConnection() { + return dorisContainerService.getQueryConnection(); + } + + protected static Connection getDorisQueryConnection(String database) { + return dorisContainerService.getQueryConnection(database); + } + + protected String getFenodes() { + return dorisContainerService.getFenodes(); + } + + protected String getBenodes() { + return dorisContainerService.getBenodes(); + } + + protected String getDorisUsername() { + return dorisContainerService.getUsername(); + } + + protected String getDorisPassword() { + return dorisContainerService.getPassword(); + } + + protected String getDorisQueryUrl() { + return dorisContainerService.getJdbcUrl(); + } + + protected String getDorisInstanceHost() { + return dorisContainerService.getInstanceHost(); + } + + public static void closeContainers() { + LOG.info("Starting to close containers."); + closeDorisContainer(); + } + + private static void closeDorisContainer() { + if (Objects.isNull(dorisContainerService)) { + return; + } + dorisContainerService.close(); + LOG.info("Doris container was closed."); + } + + // ------------------------------------------------------------------------ + // test utilities + // ------------------------------------------------------------------------ + public static void assertEqualsInAnyOrder(List<Object> expected, List<Object> actual) { + assertTrue(expected != null && actual != null); + assertEqualsInOrder( + expected.stream().sorted().collect(Collectors.toList()), + actual.stream().sorted().collect(Collectors.toList())); + } + + public static void assertEqualsInOrder(List<Object> expected, List<Object> actual) { + assertTrue(expected != null && actual != null); + assertEquals(expected.size(), actual.size()); + assertArrayEquals(expected.toArray(new Object[0]), actual.toArray(new Object[0])); + } +} diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/ContainerUtils.java
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/ContainerUtils.java new file mode 100644 index 0000000..1b54c69 --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/ContainerUtils.java @@ -0,0 +1,188 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.container; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.StringJoiner; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.doris.spark.exception.DorisRuntimeException; +import org.junit.Assert; +import org.slf4j.Logger; + +public class ContainerUtils { + + public static void executeSQLStatement(Connection connection, Logger logger, String... 
sql) { + if (Objects.isNull(sql) || sql.length == 0) { + return; + } + try (Statement statement = connection.createStatement()) { + for (String s : sql) { + if (StringUtils.isNotEmpty(s)) { + logger.info("start to execute sql={}", s); + statement.execute(s); + } + } + } catch (SQLException e) { + throw new DorisRuntimeException(e); + } + } + + public static List<String> executeSQLStatement( + Connection connection, Logger logger, String sql, int columnSize) { + List<String> result = new ArrayList<>(); + if (Objects.isNull(sql)) { + return result; + } + try (Statement statement = connection.createStatement()) { + logger.info("start to execute sql={}", sql); + ResultSet resultSet = statement.executeQuery(sql); + + while (resultSet.next()) { + StringJoiner sb = new StringJoiner(","); + for (int i = 1; i <= columnSize; i++) { + Object value = resultSet.getObject(i); + sb.add(String.valueOf(value)); + } + result.add(sb.toString()); + } + return result; + } catch (SQLException e) { + throw new DorisRuntimeException(e); + } + } + + public static String loadFileContent(String resourcePath) { + try (InputStream stream = + ContainerUtils.class.getClassLoader().getResourceAsStream(resourcePath)) { + return new BufferedReader(new InputStreamReader(Objects.requireNonNull(stream))) + .lines() + .collect(Collectors.joining("\n")); + } catch (IOException e) { + throw new DorisRuntimeException("Failed to read " + resourcePath + " file.", e); + } + } + + public static List<String> parseFileArgs(String resourcePath) { + String fileContent = ContainerUtils.loadFileContent(resourcePath); + String[] args = fileContent.split("\n"); + List<String> argList = new ArrayList<>(); + for (String arg : args) { + String[] split = arg.trim().split("\\s+"); + List<String> stringList = + Arrays.stream(split) + .map(ContainerUtils::removeQuotes) + .collect(Collectors.toList()); + argList.addAll(stringList); + } + return argList; + } + + private static String removeQuotes(String str) { + if (str == null || str.length() < 2) { + return str; + } + if (str.startsWith("\"") && str.endsWith("\"")) { + return str.substring(1, str.length() - 1); + } + if (str.startsWith("\\'") && str.endsWith("\\'")) { + return str.substring(1, str.length() - 1); + } + return str; + } + + public static String[] parseFileContentSQL(String resourcePath) { + String fileContent = loadFileContent(resourcePath); + return Arrays.stream(fileContent.split(";")).map(String::trim).toArray(String[]::new); + } + + public static void checkResult( + Connection connection, + Logger logger, + List<String> expected, + String query, + int columnSize) { + checkResult(connection, logger, expected, query, columnSize, true); + } + + public static void checkResult( + Connection connection, + Logger logger, + List<String> expected, + String query, + int columnSize, + boolean ordered) { + List<String> actual = getResult(connection, logger, expected, query, columnSize); + if (ordered) { + Assert.assertArrayEquals(expected.toArray(), actual.toArray()); + } else { + Assert.assertEquals(expected.size(), actual.size()); + Assert.assertArrayEquals( + expected.stream().sorted().toArray(Object[]::new), + actual.stream().sorted().toArray(Object[]::new)); + } + } + + public static List<String> getResult( + Connection connection, + Logger logger, + List<String> expected, + String query, + int columnSize) { + List<String> actual = new ArrayList<>(); + try (Statement statement = connection.createStatement()) { + ResultSet sinkResultSet = statement.executeQuery(query); + while 
(sinkResultSet.next()) { + List<String> row = new ArrayList<>(); + for (int i = 1; i <= columnSize; i++) { + Object value = sinkResultSet.getObject(i); + if (value == null) { + row.add("null"); + } else { + row.add(value.toString()); + } + } + actual.add(StringUtils.join(row, ",")); + } + } catch (SQLException e) { + logger.info( + "Failed to check query result. expected={}, actual={}", + String.join(",", expected), + String.join(",", actual), + e); + throw new DorisRuntimeException(e); + } + logger.info( + "checking test result. expected={}, actual={}", + String.join(",", expected), + String.join(",", actual)); + return actual; + } +} diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/ContainerService.java b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/ContainerService.java new file mode 100644 index 0000000..3ec7ee5 --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/ContainerService.java @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.container.instance; + +import org.apache.doris.spark.exception.DorisRuntimeException; + +import java.sql.Connection; + +public interface ContainerService { + void startContainer(); + + default void restartContainer() { + throw new DorisRuntimeException("Only the doris docker container can implement restart."); + }; + + boolean isRunning(); + + Connection getQueryConnection(); + + Connection getQueryConnection(String database); + + String getJdbcUrl(); + + String getInstanceHost(); + + Integer getMappedPort(int originalPort); + + String getUsername(); + + String getPassword(); + + String getFenodes(); + + String getBenodes(); + + void close(); +} diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisContainer.java b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisContainer.java new file mode 100644 index 0000000..7c9297e --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisContainer.java @@ -0,0 +1,276 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.container.instance; + +import org.apache.doris.spark.exception.DorisRuntimeException; + +import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerLoggerFactory; +import org.testcontainers.utility.MountableFile; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.LockSupport; + +public class DorisContainer implements ContainerService { + private static final Logger LOG = LoggerFactory.getLogger(DorisContainer.class); + private static final String DEFAULT_DOCKER_IMAGE = "apache/doris:doris-all-in-one-2.1.0"; + private static final String DORIS_DOCKER_IMAGE = + System.getProperty("image") == null + ? 
DEFAULT_DOCKER_IMAGE + : System.getProperty("image"); + private static final String DRIVER_JAR = + "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar"; + private static final String JDBC_URL = "jdbc:mysql://%s:9030"; + private static final String USERNAME = "root"; + private static final String PASSWORD = ""; + private final GenericContainer dorisContainer; + + public DorisContainer() { + dorisContainer = createDorisContainer(); + } + + public GenericContainer createDorisContainer() { + LOG.info("Will create doris containers."); + GenericContainer container = + new GenericContainer<>(DORIS_DOCKER_IMAGE) + .withNetwork(Network.newNetwork()) + .withNetworkAliases("DorisContainer") + .withPrivilegedMode(true) + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE))) + // use custom conf + .withCopyFileToContainer( + MountableFile.forClasspathResource("docker/doris/be.conf"), + "/opt/apache-doris/be/conf/be.conf") + .withCopyFileToContainer( + MountableFile.forClasspathResource("docker/doris/fe.conf"), + "/opt/apache-doris/fe/conf/fe.conf") + .withExposedPorts(8030, 9030, 8040, 9060, 9611, 9610); + + container.setPortBindings( + Lists.newArrayList( + String.format("%s:%s", "8030", "8030"), + String.format("%s:%s", "9030", "9030"), + String.format("%s:%s", "9060", "9060"), + String.format("%s:%s", "8040", "8040"), + String.format("%s:%s", "9611", "9611"), + String.format("%s:%s", "9610", "9610"))); + return container; + } + + public void startContainer() { + try { + LOG.info("Starting doris containers."); + // singleton doris container + dorisContainer.start(); + initializeJdbcConnection(); + initializeVariables(); + printClusterStatus(); + } catch (Exception ex) { + LOG.error("Failed to start doris containers", ex); + throw new DorisRuntimeException("Failed to start doris containers", ex); + } + LOG.info("Doris container started successfully."); + } + + @Override + public void restartContainer() { + LOG.info("Restart doris container."); + dorisContainer + .getDockerClient() + .restartContainerCmd(dorisContainer.getContainerId()) + .exec(); + } + + @Override + public boolean isRunning() { + return dorisContainer.isRunning(); + } + + @Override + public Connection getQueryConnection() { + return getQueryConnection(""); + } + + @Override + public Connection getQueryConnection(String database) { + LOG.info("Try to get query connection from doris."); + String jdbcUrl = String.format(JDBC_URL, dorisContainer.getHost()); + jdbcUrl = jdbcUrl + "/" + database; + try { + return DriverManager.getConnection(jdbcUrl, USERNAME, PASSWORD); + } catch (SQLException e) { + LOG.info("Failed to get doris query connection.
jdbcUrl={}", jdbcUrl, e); + throw new DorisRuntimeException(e); + } + } + + private void initializeVariables() throws Exception { + try (Connection connection = getQueryConnection(); + Statement statement = connection.createStatement()) { + LOG.info("init doris cluster variables."); + // avoid arrow flight sql reading bug + statement.execute("SET PROPERTY FOR 'root' 'max_user_connections' = '1024';"); + } + LOG.info("Init variables successfully."); + } + + @Override + public String getJdbcUrl() { + return String.format(JDBC_URL, dorisContainer.getHost()); + } + + @Override + public String getInstanceHost() { + return dorisContainer.getHost(); + } + + @Override + public Integer getMappedPort(int originalPort) { + return dorisContainer.getMappedPort(originalPort); + } + + @Override + public String getUsername() { + return USERNAME; + } + + @Override + public String getPassword() { + return PASSWORD; + } + + @Override + public String getFenodes() { + return dorisContainer.getHost() + ":8030"; + } + + @Override + public String getBenodes() { + return dorisContainer.getHost() + ":8040"; + } + + public void close() { + LOG.info("Doris container is about to be close."); + dorisContainer.close(); + LOG.info("Doris container closed successfully."); + } + + private void initializeJDBCDriver() throws MalformedURLException { + URLClassLoader urlClassLoader = + new URLClassLoader( + new URL[] {new URL(DRIVER_JAR)}, DorisContainer.class.getClassLoader()); + LOG.info("Try to connect to Doris."); + Thread.currentThread().setContextClassLoader(urlClassLoader); + } + + private void initializeJdbcConnection() throws Exception { + initializeJDBCDriver(); + try (Connection connection = getQueryConnection(); + Statement statement = connection.createStatement()) { + ResultSet resultSet; + do { + LOG.info("Waiting for the Backend to start successfully."); + resultSet = statement.executeQuery("show backends"); + } while (!isBeReady(resultSet, Duration.ofSeconds(1L))); + } + LOG.info("Connected to Doris successfully."); + } + + private boolean isBeReady(ResultSet rs, Duration duration) throws SQLException { + LockSupport.parkNanos(duration.toNanos()); + if (rs.next()) { + String isAlive = rs.getString("Alive").trim(); + String totalCap = rs.getString("TotalCapacity").trim(); + return Boolean.toString(true).equalsIgnoreCase(isAlive) + && !"0.000".equalsIgnoreCase(totalCap); + } + return false; + } + + private void printClusterStatus() throws Exception { + LOG.info("Current machine IP: {}", dorisContainer.getHost()); + echo("sh", "-c", "cat /proc/cpuinfo | grep 'cpu cores' | uniq"); + echo("sh", "-c", "free -h"); + try (Connection connection = + DriverManager.getConnection( + String.format(JDBC_URL, dorisContainer.getHost()), + USERNAME, + PASSWORD); + Statement statement = connection.createStatement()) { + ResultSet showFrontends = statement.executeQuery("show frontends"); + LOG.info("Frontends status: {}", convertList(showFrontends)); + ResultSet showBackends = statement.executeQuery("show backends"); + LOG.info("Backends status: {}", convertList(showBackends)); + } + } + + private void echo(String... 
cmd) { + try { + Process p = Runtime.getRuntime().exec(cmd); + InputStream is = p.getInputStream(); + BufferedReader reader = new BufferedReader(new InputStreamReader(is)); + String line; + while ((line = reader.readLine()) != null) { + System.out.println(line); + } + p.waitFor(); + is.close(); + reader.close(); + p.destroy(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private List<Map> convertList(ResultSet rs) throws SQLException { + List<Map> list = new ArrayList<>(); + ResultSetMetaData metaData = rs.getMetaData(); + int columnCount = metaData.getColumnCount(); + while (rs.next()) { + Map<String, Object> rowData = new HashMap<>(); + for (int i = 1; i <= columnCount; i++) { + rowData.put(metaData.getColumnName(i), rs.getObject(i)); + } + list.add(rowData); + } + return list; + } +} diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisCustomerContainer.java b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisCustomerContainer.java new file mode 100644 index 0000000..4ba4e74 --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisCustomerContainer.java @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.container.instance; + +import org.apache.doris.spark.exception.DorisRuntimeException; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +/** Using a custom Doris environment */ +public class DorisCustomerContainer implements ContainerService { + private static final Logger LOG = LoggerFactory.getLogger(DorisCustomerContainer.class); + private static final String JDBC_URL = "jdbc:mysql://%s:%s"; + + @Override + public void startContainer() { + LOG.info("Using customer doris environment."); + checkParams(); + if (!isRunning()) { + throw new DorisRuntimeException( + "Backend is not alive.
Please check the doris cluster."); + } + } + + private void checkParams() { + Preconditions.checkArgument( + System.getProperty("doris_host") != null, "doris_host is required."); + Preconditions.checkArgument( + System.getProperty("doris_query_port") != null, "doris_query_port is required."); + Preconditions.checkArgument( + System.getProperty("doris_http_port") != null, "doris_http_port is required."); + Preconditions.checkArgument( + System.getProperty("doris_user") != null, "doris_user is required."); + Preconditions.checkArgument( + System.getProperty("doris_passwd") != null, "doris_passwd is required."); + } + + @Override + public boolean isRunning() { + try (Connection conn = getQueryConnection(); + Statement stmt = conn.createStatement()) { + ResultSet showBackends = stmt.executeQuery("show backends"); + while (showBackends.next()) { + String isAlive = showBackends.getString("Alive").trim(); + if (Boolean.toString(true).equalsIgnoreCase(isAlive)) { + return true; + } + } + } catch (SQLException e) { + LOG.error("Failed to connect doris cluster.", e); + return false; + } + return false; + } + + @Override + public Connection getQueryConnection() { + return getQueryConnection(""); + } + + @Override + public Connection getQueryConnection(String database) { + LOG.info("Try to get query connection from doris."); + String jdbcUrl = + String.format( + JDBC_URL, + System.getProperty("doris_host"), + System.getProperty("doris_query_port")); + jdbcUrl = jdbcUrl + "/" + database; + try { + return DriverManager.getConnection(jdbcUrl, getUsername(), getPassword()); + } catch (SQLException e) { + LOG.info("Failed to get doris query connection. jdbcUrl={}", jdbcUrl, e); + throw new DorisRuntimeException(e); + } + } + + @Override + public String getJdbcUrl() { + return String.format( + JDBC_URL, System.getProperty("doris_host"), System.getProperty("doris_query_port")); + } + + @Override + public String getInstanceHost() { + return System.getProperty("doris_host"); + } + + @Override + public Integer getMappedPort(int originalPort) { + return originalPort; + } + + @Override + public String getUsername() { + return System.getProperty("doris_user"); + } + + @Override + public String getPassword() { + return System.getProperty("doris_passwd"); + } + + @Override + public String getFenodes() { + return System.getProperty("doris_host") + ":" + System.getProperty("doris_http_port"); + } + + @Override + public String getBenodes() { + return null; + } + + @Override + public void close() {} +} diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala index 2d7930a..67a0688 100644 --- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala @@ -17,34 +17,66 @@ package org.apache.doris.spark.sql -import org.apache.doris.spark.{DorisTestBase, sparkContextFunctions} -import org.apache.spark.sql.SparkSession +import org.apache.doris.spark.container.AbstractContainerTestBase.getDorisQueryConnection +import org.apache.doris.spark.container.{AbstractContainerTestBase, ContainerUtils} +import org.apache.doris.spark.rest.models.DataModel +import org.apache.doris.spark.sparkContextFunctions +import org.apache.spark.sql.{Row, SparkSession} import 
org.apache.spark.{SparkConf, SparkContext} -import org.junit.Test +import org.junit.Assert.fail +import org.junit.{Before, Test} +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.slf4j.LoggerFactory -import java.sql.Statement +import java.sql.{Date, Timestamp} -class DorisReaderITCase extends DorisTestBase { +object DorisReaderITCase { + @Parameterized.Parameters(name = "readMode: {0}, flightSqlPort: {1}") + def parameters(): java.util.Collection[Array[AnyRef]] = { + import java.util.Arrays + Arrays.asList( + Array("thrift": java.lang.String, -1: java.lang.Integer), + Array("arrow": java.lang.String, 9611: java.lang.Integer) + ) + } +} + +@RunWith(classOf[Parameterized]) +class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractContainerTestBase { - val DATABASE: String = "test" - val TABLE_READ: String = "tbl_read" - val TABLE_READ_TBL: String = "tbl_read_tbl" + private val LOG = LoggerFactory.getLogger(classOf[DorisReaderITCase]) + + val DATABASE = "test_doris_read" + val TABLE_READ = "tbl_read" + val TABLE_READ_TBL = "tbl_read_tbl" + val TABLE_READ_TBL_ALL_TYPES = "tbl_read_tbl_all_types" + val TABLE_READ_TBL_BIT_MAP = "tbl_read_tbl_bitmap" + + @Before + def setUp(): Unit = { + ContainerUtils.executeSQLStatement(getDorisQueryConnection, + LOG, + String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE)) + } @Test @throws[Exception] def testRddSource(): Unit = { - initializeTable(TABLE_READ) - val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("rddSource") + initializeTable(TABLE_READ, DataModel.DUPLICATE) + val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rddSource") val sc = new SparkContext(sparkConf) // sc.setLogLevel("DEBUG") val dorisSparkRDD = sc.dorisRDD( tableIdentifier = Some(DATABASE + "." + TABLE_READ), cfg = Some(Map( - "doris.fenodes" -> DorisTestBase.getFenodes, - "doris.request.auth.user" -> DorisTestBase.USERNAME, - "doris.request.auth.password" -> DorisTestBase.PASSWORD, - "doris.fe.init.fetch" -> "false" + "doris.fenodes" -> getFenodes, + "doris.request.auth.user" -> getDorisUsername, + "doris.request.auth.password" -> getDorisPassword, + "doris.fe.init.fetch" -> "false", + "doris.read.mode" -> readMode, + "doris.read.arrow-flight-sql.port" -> flightSqlPort.toString )) ) val result = dorisSparkRDD.collect() @@ -56,15 +88,17 @@ class DorisReaderITCase extends DorisTestBase { @Test @throws[Exception] def testDataFrameSource(): Unit = { - initializeTable(TABLE_READ_TBL) + initializeTable(TABLE_READ_TBL, DataModel.UNIQUE) val session = SparkSession.builder().master("local[*]").getOrCreate() val dorisSparkDF = session.read .format("doris") - .option("doris.fenodes", DorisTestBase.getFenodes) + .option("doris.fenodes", getFenodes) .option("doris.table.identifier", DATABASE + "." 
+ TABLE_READ_TBL) - .option("doris.user", DorisTestBase.USERNAME) - .option("doris.password", DorisTestBase.PASSWORD) + .option("doris.user", getDorisUsername) + .option("doris.password", getDorisPassword) + .option("doris.read.mode", readMode) + .option("doris.read.arrow-flight-sql.port", flightSqlPort.toString) .load() val result = dorisSparkDF.collect().toList.toString() @@ -75,7 +109,7 @@ class DorisReaderITCase extends DorisTestBase { @Test @throws[Exception] def testSQLSource(): Unit = { - initializeTable(TABLE_READ_TBL) + initializeTable(TABLE_READ_TBL, DataModel.UNIQUE_MOR) val session = SparkSession.builder().master("local[*]").getOrCreate() session.sql( s""" @@ -83,9 +117,11 @@ class DorisReaderITCase extends DorisTestBase { |USING doris |OPTIONS( | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL}", - | "fenodes"="${DorisTestBase.getFenodes}", - | "user"="${DorisTestBase.USERNAME}", - | "password"="${DorisTestBase.PASSWORD}" + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.read.mode"="${readMode}", + | "doris.read.arrow-flight-sql.port"="${flightSqlPort}" |) |""".stripMargin) @@ -98,53 +134,44 @@ class DorisReaderITCase extends DorisTestBase { assert("List([doris,18], [spark,10])".equals(result)) } - @throws[Exception] - private def initializeTable(table: String): Unit = { - try { - val statement: Statement = DorisTestBase.connection.createStatement - try { - statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE)) - statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table)) - statement.execute(String.format("CREATE TABLE %s.%s ( \n" + - "`name` varchar(256),\n" + - "`age` int\n" + - ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" + - "PROPERTIES (\n" + - "\"replication_num\" = \"1\"\n" + - ")\n", DATABASE, table)) - statement.execute(String.format("insert into %s.%s values ('doris',18)", DATABASE, table)) - statement.execute(String.format("insert into %s.%s values ('spark',10)", DATABASE, table)) - } finally { - if (statement != null) statement.close() - } - } + private def initializeTable(table: String, dataModel: DataModel): Unit = { + val max = if (DataModel.AGGREGATE == dataModel) "MAX" else "" + val morProps = if (!(DataModel.UNIQUE_MOR == dataModel)) "" else ",\"enable_unique_key_merge_on_write\" = \"false\"" + val model = if (dataModel == DataModel.UNIQUE_MOR) DataModel.UNIQUE.toString else dataModel.toString + ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE), + String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table), + String.format("CREATE TABLE %s.%s ( \n" + + "`name` varchar(256),\n" + + "`age` int %s\n" + + ") " + + " %s KEY(`name`) " + + " DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" + + "PROPERTIES (" + + "\"replication_num\" = \"1\"\n" + morProps + ")", DATABASE, table, max, model), + String.format("insert into %s.%s values ('doris',18)", DATABASE, table), + String.format("insert into %s.%s values ('spark',10)", DATABASE, table)) } - private def compareCollectResult(a1: Array[AnyRef], a2: Array[AnyRef]): Boolean = { - if (a1.length == a2.length) { - for (idx <- 0 until a1.length) { - if (!a1(idx).isInstanceOf[Array[AnyRef]] || !a2(idx).isInstanceOf[Array[AnyRef]]) { - return false - } - val arr1 = a1(idx).asInstanceOf[Array[AnyRef]] - val arr2 = a2(idx).asInstanceOf[Array[AnyRef]] - if (arr1.length != arr2.length) { - return false - } - for (idx2 <- 0 until arr2.length) { - 
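
initializeTable now derives the DDL from the Doris data model under test: the model picks the key clause, AGGREGATE adds a MAX aggregation on the value column, and UNIQUE_MOR is a UNIQUE table with merge-on-write disabled. Pulled out of the diff into a standalone sketch (string-typed for brevity; the patch uses the DataModel enum):

    object DdlTemplate {
      def createTableSql(db: String, table: String, dataModel: String): String = {
        val agg = if (dataModel == "AGGREGATE") "MAX" else ""
        val morProps =
          if (dataModel == "UNIQUE_MOR") ",\"enable_unique_key_merge_on_write\" = \"false\"" else ""
        val model = if (dataModel == "UNIQUE_MOR") "UNIQUE" else dataModel
        s"""CREATE TABLE $db.$table (
           |  `name` varchar(256),
           |  `age` int $agg
           |) $model KEY(`name`)
           |DISTRIBUTED BY HASH(`name`) BUCKETS 1
           |PROPERTIES ("replication_num" = "1"$morProps)""".stripMargin
      }
    }
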
if (arr1(idx2) != arr2(idx2)) { - return false - } - } + private def compareCollectResult(a1: Array[AnyRef], a2: Array[AnyRef]): Boolean = if (a1.length == a2.length) { + for (idx <- 0 until a1.length) { + if (!a1(idx).isInstanceOf[Array[AnyRef]] || !a2(idx).isInstanceOf[Array[AnyRef]]) return false + val arr1 = a1(idx).asInstanceOf[Array[AnyRef]] + val arr2 = a2(idx).asInstanceOf[Array[AnyRef]] + if (arr1.length != arr2.length) return false + for (idx2 <- 0 until arr2.length) { + if (arr1(idx2) != arr2(idx2)) return false } - true - } else false - } + } + true + } else false @Test @throws[Exception] def testSQLSourceWithCondition(): Unit = { - initializeTable(TABLE_READ_TBL) + initializeTable(TABLE_READ_TBL, DataModel.AGGREGATE) val session = SparkSession.builder().master("local[*]").getOrCreate() session.sql( s""" @@ -152,9 +179,11 @@ class DorisReaderITCase extends DorisTestBase { |USING doris |OPTIONS( | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL}", - | "fenodes"="${DorisTestBase.getFenodes}", - | "user"="${DorisTestBase.USERNAME}", - | "password"="${DorisTestBase.PASSWORD}" + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.read.mode"="${readMode}", + | "doris.read.arrow-flight-sql.port"="${flightSqlPort}" |) |""".stripMargin) @@ -167,5 +196,232 @@ class DorisReaderITCase extends DorisTestBase { assert("List([doris,18])".equals(result)) } + @Test + def testReadAllType(): Unit = { + val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql") + ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*) + + val session = SparkSession.builder().master("local[*]").getOrCreate() + session.sql( + s""" + |CREATE TEMPORARY VIEW test_source + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." 
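
Side note: for well-formed inputs (every element a nested Array[AnyRef], as the RDD test produces), the hand-rolled compareCollectResult is essentially java.util.Arrays.deepEquals; the extra isInstanceOf checks only make non-array elements compare as unequal. For reference:

    // Equivalent check for the nested-array case compareCollectResult handles.
    val a = Array[AnyRef](Array[AnyRef]("doris", Integer.valueOf(18)))
    val b = Array[AnyRef](Array[AnyRef]("doris", Integer.valueOf(18)))
    assert(java.util.Arrays.deepEquals(a, b))
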
+ TABLE_READ_TBL_ALL_TYPES}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.read.mode"="${readMode}", + | "doris.read.arrow-flight-sql.port"="${flightSqlPort}" + |) + |""".stripMargin) + session.sql("desc test_source").show(true); + val actualData = session.sql( + """ + |select * from test_source order by id + |""".stripMargin).collect() + session.stop() + + val expectedData = Array( + Row(1, true, 127, 32767, 2147483647, 9223372036854775807L, "170141183460469231731687303715884105727", + 3.14f, 2.71828, new java.math.BigDecimal("12345.6789"), Date.valueOf("2025-03-11"), Timestamp.valueOf("2025-03-11 12:34:56"), "A", "Hello, Doris!", "This is a string", + """["Alice","Bob"]""", Map("key1" -> "value1", "key2" -> "value2"), """{"name":"Tom","age":30}""", + """{"key":"value"}""", """{"type":"variant","data":123}"""), + Row(2, false, -128, -32768, -2147483648, -9223372036854775808L, "-170141183460469231731687303715884105728", + -1.23f, 0.0001, new java.math.BigDecimal("-9999.9999"), Date.valueOf("2024-12-25"), Timestamp.valueOf("2024-12-25 23:59:59"), "B", "Doris Test", "Another string!", + """["Charlie","David"]""", Map("k1" -> "v1", "k2" -> "v2"), """{"name":"Jerry","age":25}""", + """{"status":"ok"}""", """{"data":[1,2,3]}"""), + Row(3, true, 0, 0, 0, 0, "0", + 0.0f, 0.0, new java.math.BigDecimal("0.0000"), Date.valueOf("2023-06-15"), Timestamp.valueOf("2023-06-15 08:00:00"), "C", "Test Doris", "Sample text", + """["Eve","Frank"]""", Map("alpha" -> "beta"), """{"name":"Alice","age":40}""", + """{"nested":{"key":"value"}}""", """{"variant":"test"}"""), + Row(4, null, null, null, null, null, null, + null, null, null, null, null, null, null, null, + null, null, null, null, null) + ) + + val differences = actualData.zip(expectedData).zipWithIndex.flatMap { + case ((actualRow, expectedRow), rowIndex) => + actualRow.toSeq.zip(expectedRow.toSeq).zipWithIndex.collect { + case ((actualValue, expectedValue), colIndex) + if actualValue != expectedValue => + s"Row $rowIndex, Column $colIndex: actual=$actualValue, expected=$expectedValue" + } + } + + if (differences.nonEmpty) { + fail(s"Data mismatch found:\n${differences.mkString("\n")}") + } + } + + @Test + def testBitmapRead(): Unit = { + val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_bitmap.sql") + ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*) + val session = SparkSession.builder().master("local[*]").getOrCreate() + session.sql( + s""" + |CREATE TEMPORARY VIEW test_source + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." 
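
testReadAllType collects every mismatching cell before failing, which makes type-mapping regressions far easier to diagnose than a single assert on the stringified result. The comparison, extracted as a reusable helper (a sketch, not part of the patch):

    import org.apache.spark.sql.Row

    object RowDiff {
      // Returns one human-readable line per differing cell.
      def diff(actual: Array[Row], expected: Array[Row]): Seq[String] =
        actual.zip(expected).zipWithIndex.flatMap { case ((a, e), rowIdx) =>
          a.toSeq.zip(e.toSeq).zipWithIndex.collect {
            case ((av, ev), colIdx) if av != ev =>
              s"Row $rowIdx, Column $colIdx: actual=$av, expected=$ev"
          }
        }
    }
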
+ TABLE_READ_TBL_BIT_MAP}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.read.mode"="${readMode}", + | "doris.read.arrow-flight-sql.port"="${flightSqlPort}" + |) + |""".stripMargin) + session.sql("desc test_source").show(true); + val actualData = session.sql( + """ + |select * from test_source order by hour + |""".stripMargin).collect() + session.stop() + + assert("List([20200622,1,Read unsupported], [20200622,2,Read unsupported], [20200622,3,Read unsupported])".equals(actualData.toList.toString())) + } + + @Test + def testBitmapRead2String(): Unit = { + if(readMode.equals("thrift")){ + return + } + val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_bitmap.sql") + ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*) + val session = SparkSession.builder().master("local[*]").getOrCreate() + session.sql( + s""" + |CREATE TEMPORARY VIEW test_source + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_BIT_MAP}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.read.mode"="${readMode}", + | "doris.read.arrow-flight-sql.port"="${flightSqlPort}", + | "doris.read.bitmap-to-string"="true" + |) + |""".stripMargin) + session.sql("desc test_source").show(true); + val actualData = session.sql( + """ + |select * from test_source order by hour + |""".stripMargin).collect() + session.stop() + + assert("List([20200622,1,243], [20200622,2,1,2,3,4,5,434543], [20200622,3,287667876573])" + .equals(actualData.toList.toString())) + } + + @Test + def testBitmapRead2Base64(): Unit = { + if(readMode.equals("thrift")){ + return + } + val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_bitmap.sql") + ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*) + val session = SparkSession.builder().master("local[*]").getOrCreate() + session.sql( + s""" + |CREATE TEMPORARY VIEW test_source + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_BIT_MAP}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.read.mode"="${readMode}", + | "doris.read.arrow-flight-sql.port"="${flightSqlPort}", + | "doris.read.bitmap-to-base64"="true" + |) + |""".stripMargin) + session.sql("desc test_source").show(true); + val actualData = session.sql( + """ + |select * from test_source order by hour + |""".stripMargin).collect() + session.stop() + + assert("List([20200622,1,AfMAAAA=], [20200622,2,AjswAQABAAAEAAYAAAABAAEABABvoQ==], [20200622,3,A91yV/pCAAAA])" + .equals(actualData.toList.toString())) + } + @Test + def testReadPushDownProject(): Unit = { + val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql") + ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*) + val session = SparkSession.builder().master("local[*]").getOrCreate() + session.sql( + s""" + |CREATE TEMPORARY VIEW test_source + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." 
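
Taken together, the three bitmap tests document the contract: BITMAP columns come back as the literal string "Read unsupported" by default, while doris.read.bitmap-to-string and doris.read.bitmap-to-base64 ask Doris to serialize the bitmap into a readable id list or a base64 blob; per the early returns above, both conversions are exercised only in arrow read mode. A sketch with placeholder endpoint and credentials:

    import org.apache.spark.sql.SparkSession

    object BitmapReadExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        val df = spark.read
          .format("doris")
          .option("doris.table.identifier", "test_doris_read.tbl_read_tbl_bitmap")
          .option("doris.fenodes", "127.0.0.1:8030") // placeholder
          .option("doris.user", "root")              // placeholder
          .option("doris.password", "")              // placeholder
          .option("doris.read.mode", "arrow")
          .option("doris.read.arrow-flight-sql.port", "9611")
          // Without one of these, bitmap cells read as "Read unsupported".
          .option("doris.read.bitmap-to-string", "true") // or doris.read.bitmap-to-base64
          .load()
        df.show(truncate = false)
        spark.stop()
      }
    }
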
+ TABLE_READ_TBL_ALL_TYPES}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.read.mode"="${readMode}", + | "doris.read.arrow-flight-sql.port"="${flightSqlPort}" + |) + |""".stripMargin) + + val intFilter = session.sql( + """ + |select id,c1,c2 from test_source where id = 2 and c1 = false and c4 != 3 + |""".stripMargin).collect() + + assert("List([2,false,-128])".equals(intFilter.toList.toString())) + + val floatFilter = session.sql( + """ + |select id,c3,c4,c7,c9 from test_source where c7 > 0 and c7 < 3.15 + |""".stripMargin).collect() + + assert("List([1,32767,2147483647,3.14,12345.6789])".equals(floatFilter.toList.toString())) + + val dateFilter = session.sql( + """ + |select id,c10,c11 from test_source where c10 = '2025-03-11' and c13 like 'Hello%' + |""".stripMargin).collect() + + assert("List([1,2025-03-11,2025-03-11 12:34:56.0])".equals(dateFilter.toList.toString())) + + val datetimeFilter = session.sql( + """ + |select id,c11,c12 from test_source where c10 < '2025-03-11' and c11 = '2024-12-25 23:59:59' + |""".stripMargin).collect() + + assert("List([2,2024-12-25 23:59:59.0,B])".equals(datetimeFilter.toList.toString())) + + val stringFilter = session.sql( + """ + |select id,c13,c14 from test_source where c11 >= '2024-12-25 23:59:59' and c13 = 'Hello, Doris!' + |""".stripMargin).collect() + + assert("List([1,Hello, Doris!,This is a string])".equals(stringFilter.toList.toString())) + + val nullFilter = session.sql( + """ + |select id,c13,c14 from test_source where c14 is null + |""".stripMargin).collect() + + assert("List([4,null,null])".equals(nullFilter.toList.toString())) + + val notNullFilter = session.sql( + """ + |select id from test_source where c15 is not null and c12 in ('A', 'B') + |""".stripMargin).collect() + + assert("List([1], [2])".equals(notNullFilter.toList.toString())) + + val likeFilter = session.sql( + """ + |select id from test_source where c19 like '%variant%' and c13 like 'Test%' + |""".stripMargin).collect() + + assert("List([3])".equals(likeFilter.toList.toString())) + session.stop() + } } diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala index 6e1bd08..7f1e393 100644 --- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala @@ -17,15 +17,17 @@ package org.apache.doris.spark.sql -import org.apache.doris.spark.DorisTestBase -import org.apache.spark.sql.types.{ArrayType, DataTypes, DecimalType, MapType, StructField, StructType} -import org.apache.spark.sql.{Row, SaveMode, SparkSession} +import org.apache.doris.spark.container.AbstractContainerTestBase.{assertEqualsInAnyOrder, getDorisQueryConnection} +import org.apache.doris.spark.container.{AbstractContainerTestBase, ContainerUtils} +import org.apache.spark.sql.{SaveMode, SparkSession} import org.junit.Test +import org.slf4j.LoggerFactory -import java.sql.{Date, ResultSet, Statement, Timestamp} -import scala.collection.mutable.ListBuffer +import java.util +import scala.collection.JavaConverters._ +class DorisWriterITCase extends AbstractContainerTestBase { -class DorisWriterITCase extends DorisTestBase { + private val LOG = LoggerFactory.getLogger(classOf[DorisReaderITCase]) val 
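
testReadPushDownProject pairs column pruning with a spread of predicate shapes (=, !=, ranges, LIKE, IS NULL / IS NOT NULL, IN). To see which of these actually reach Doris rather than being evaluated in Spark, inspecting the V2 physical plan is usually enough; a sketch assuming the suite's test_source temporary view is registered:

    import org.apache.spark.sql.SparkSession

    object PushdownInspect {
      def show(spark: SparkSession): Unit = {
        spark.sql("select id, c1, c2 from test_source where id = 2 and c1 = false")
          .explain(true)
        // With DataSource V2 the scan node lists the pushed predicates
        // (e.g. a "PushedFilters: ..." line); rendering varies by Spark version.
      }
    }
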
DATABASE: String = "test" val TABLE_CSV: String = "tbl_csv" @@ -43,10 +45,10 @@ class DorisWriterITCase extends DorisTestBase { )).toDF("name", "age") df.write .format("doris") - .option("doris.fenodes", DorisTestBase.getFenodes) + .option("doris.fenodes", getFenodes) .option("doris.table.identifier", DATABASE + "." + TABLE_CSV) - .option("user", DorisTestBase.USERNAME) - .option("password", DorisTestBase.PASSWORD) + .option("user", getDorisUsername) + .option("password", getDorisPassword) .option("sink.properties.column_separator", ",") .option("sink.properties.line_delimiter", "\n") .option("sink.properties.format", "csv") @@ -55,9 +57,13 @@ class DorisWriterITCase extends DorisTestBase { session.stop() Thread.sleep(10000) - val actual = queryResult(TABLE_CSV); - val expected = ListBuffer(List("doris_csv", 1), List("spark_csv", 2)) - assert(expected.equals(actual)) + val actual = ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("select * from %s.%s", DATABASE, TABLE_CSV), + 2) + val expected = util.Arrays.asList("doris_csv,1", "spark_csv,2") + checkResultInAnyOrder("testSinkCsvFormat", expected.toArray(), actual.toArray) } @Test @@ -71,10 +77,10 @@ class DorisWriterITCase extends DorisTestBase { )).toDF("name", "age") df.write .format("doris") - .option("doris.fenodes", DorisTestBase.getFenodes) + .option("doris.fenodes", getFenodes) .option("doris.table.identifier", DATABASE + "." + TABLE_JSON) - .option("user", DorisTestBase.USERNAME) - .option("password", DorisTestBase.PASSWORD) + .option("user", getDorisUsername) + .option("password", getDorisPassword) .option("sink.properties.read_json_by_line", "true") .option("sink.properties.format", "json") .option("doris.sink.auto-redirect", "false") @@ -83,9 +89,13 @@ class DorisWriterITCase extends DorisTestBase { session.stop() Thread.sleep(10000) - val actual = queryResult(TABLE_JSON); - val expected = ListBuffer(List("doris_json", 1), List("spark_json", 2)) - assert(expected.equals(actual)) + val actual = ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("select * from %s.%s", DATABASE, TABLE_JSON), + 2) + val expected = util.Arrays.asList("doris_json,1", "spark_json,2"); + checkResultInAnyOrder("testSinkJsonFormat", expected.toArray, actual.toArray) } @Test @@ -104,9 +114,9 @@ class DorisWriterITCase extends DorisTestBase { |USING doris |OPTIONS( | "table.identifier"="${DATABASE + "." 
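
For context on the writer changes below: the sink.properties.* options are forwarded to Doris stream load, so the CSV test pins the format, separator, and line delimiter explicitly. A self-contained sketch of that write path, with placeholder endpoint and credentials:

    import org.apache.spark.sql.{SaveMode, SparkSession}

    object CsvSinkExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._
        Seq(("doris_csv", 1), ("spark_csv", 2)).toDF("name", "age")
          .write.format("doris")
          .option("doris.fenodes", "127.0.0.1:8030") // placeholder
          .option("doris.table.identifier", "test.tbl_csv")
          .option("user", "root")                    // placeholder
          .option("password", "")                    // placeholder
          .option("sink.properties.format", "csv")
          .option("sink.properties.column_separator", ",")
          .option("sink.properties.line_delimiter", "\n")
          .mode(SaveMode.Append)
          .save()
        spark.stop()
      }
    }
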
+ TABLE_JSON_TBL}", - | "fenodes"="${DorisTestBase.getFenodes}", - | "user"="${DorisTestBase.USERNAME}", - | "password"="${DorisTestBase.PASSWORD}" + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}" |) |""".stripMargin) session.sql( @@ -116,40 +126,34 @@ class DorisWriterITCase extends DorisTestBase { session.stop() Thread.sleep(10000) - val actual = queryResult(TABLE_JSON_TBL); - val expected = ListBuffer(List("doris_tbl", 1), List("spark_tbl", 2)) - assert(expected.equals(actual)) + val actual = ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("select * from %s.%s", DATABASE, TABLE_JSON_TBL), + 2) + val expected = util.Arrays.asList("doris_tbl,1", "spark_tbl,2"); + checkResultInAnyOrder("testSQLSinkFormat", expected.toArray, actual.toArray) } - private def queryResult(table: String): ListBuffer[Any] = { - val actual = new ListBuffer[Any] - try { - val sinkStatement: Statement = DorisTestBase.connection.createStatement - try { - val sinkResultSet: ResultSet = sinkStatement.executeQuery(String.format("select name,age from %s.%s order by 1", DATABASE, table)) - while (sinkResultSet.next) { - val row = List(sinkResultSet.getString("name"), sinkResultSet.getInt("age")) - actual += row - } - } finally if (sinkStatement != null) sinkStatement.close() - } - actual - } @throws[Exception] private def initializeTable(table: String): Unit = { - try { - val statement: Statement = DorisTestBase.connection.createStatement - try { - statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE)) - statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table)) - statement.execute(String.format( - "CREATE TABLE %s.%s ( \n" + "`name` varchar(256),\n" + "`age` int\n" + ") " + + ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE), + String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table), + String.format( + "CREATE TABLE %s.%s ( \n" + "`name` varchar(256),\n" + "`age` int\n" + ") " + "DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" + "PROPERTIES (\n" + - "\"replication_num\" = \"1\"\n" + ")\n", DATABASE, table)) - } finally if (statement != null) statement.close() - } + "\"replication_num\" = \"1\"\n" + ")\n", DATABASE, table) + ) + } + + private def checkResultInAnyOrder(testName: String, expected: Array[AnyRef], actual: Array[AnyRef]): Unit = { + LOG.info("Checking DorisSourceITCase result. 
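
checkResultInAnyOrder defers to assertEqualsInAnyOrder, which removes flakiness from tablet and scan ordering: the query result is compared as a multiset, not a sequence. The core idea as a self-contained sketch (the real helper lives in AbstractContainerTestBase):

    object OrderInsensitive {
      // Compare sorted copies so the row order returned by Doris is irrelevant.
      def assertSameRows(expected: Seq[String], actual: Seq[String]): Unit =
        require(expected.sorted == actual.sorted,
          s"expected=${expected.sorted}, actual=${actual.sorted}")
    }
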
testName={}, actual={}, expected={}", testName, actual, expected) + assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava) } } diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/read_all_type.sql b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/read_all_type.sql new file mode 100644 index 0000000..7632e02 --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/read_all_type.sql @@ -0,0 +1,50 @@ +DROP TABLE IF EXISTS tbl_read_tbl_all_types; + +CREATE TABLE tbl_read_tbl_all_types ( +`id` int, +`c1` boolean, +`c2` tinyint, +`c3` smallint, +`c4` int, +`c5` bigint, +`c6` largeint, +`c7` float, +`c8` double, +`c9` decimal(12,4), +`c10` date, +`c11` datetime, +`c12` char(1), +`c13` varchar(16), +`c14` string, +`c15` Array<String>, +`c16` Map<String, String>, +`c17` Struct<name: String, age: int>, +`c18` JSON, +`c19` JSON -- doris2.1.0 can not read VARIANT +) +DUPLICATE KEY(`id`) +DISTRIBUTED BY HASH(`id`) BUCKETS 2 +PROPERTIES ( +"replication_num" = "1", +"light_schema_change" = "true" +); + +INSERT INTO tbl_read_tbl_all_types VALUES + (1, true, 127, 32767, 2147483647, 9223372036854775807, 170141183460469231731687303715884105727, + 3.14, 2.71828, 12345.6789, '2025-03-11', '2025-03-11 12:34:56', 'A', 'Hello, Doris!', 'This is a string', + ['Alice', 'Bob'], {'key1': 'value1', 'key2': 'value2'}, STRUCT('Tom', 30), '{"key": "value"}', '{"type": "variant", "data": 123}'); + +INSERT INTO tbl_read_tbl_all_types VALUES + (2, false, -128, -32768, -2147483648, -9223372036854775808, -170141183460469231731687303715884105728, + -1.23, 0.0001, -9999.9999, '2024-12-25', '2024-12-25 23:59:59', 'B', 'Doris Test', 'Another string!', + ['Charlie', 'David'], {'k1': 'v1', 'k2': 'v2'}, STRUCT('Jerry', 25), '{"status": "ok"}', '{"data": [1, 2, 3]}' ); + +INSERT INTO tbl_read_tbl_all_types VALUES + (3, true, 0, 0, 0, 0, 0, + 0.0, 0.0, 0.0000, '2023-06-15', '2023-06-15 08:00:00', 'C', 'Test Doris', 'Sample text', + ['Eve', 'Frank'], {'alpha': 'beta'}, STRUCT('Alice', 40), '{"nested": {"key": "value"}}', '{"variant": "test"}'); + +INSERT INTO tbl_read_tbl_all_types VALUES + (4, NULL, NULL, NULL, NULL, NULL, NULL, + NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, + NULL, NULL, NULL, NULL, NULL); \ No newline at end of file diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/read_bitmap.sql b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/read_bitmap.sql new file mode 100644 index 0000000..8e89178 --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/read_bitmap.sql @@ -0,0 +1,17 @@ +DROP TABLE IF EXISTS tbl_read_tbl_bitmap; + +create table tbl_read_tbl_bitmap ( +datekey int, +hour int, +device_id bitmap BITMAP_UNION +) +aggregate key (datekey, hour) +distributed by hash(datekey, hour) buckets 1 +properties( + "replication_num" = "1" +); + +insert into tbl_read_tbl_bitmap values +(20200622, 1, to_bitmap(243)), +(20200622, 2, bitmap_from_array([1,2,3,4,5,434543])), +(20200622, 3, to_bitmap(287667876573)); \ No newline at end of file diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/resources/docker/doris/be.conf b/spark-doris-connector/spark-doris-connector-it/src/test/resources/docker/doris/be.conf new file mode 100644 index 0000000..94b76e0 --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-it/src/test/resources/docker/doris/be.conf @@ -0,0 
+1,99 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +CUR_DATE=`date +%Y%m%d-%H%M%S` + +PPROF_TMPDIR="$DORIS_HOME/log/" + +JAVA_OPTS="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives" + +# For jdk 9+, this JAVA_OPTS will be used as default JVM options +JAVA_OPTS_FOR_JDK_9="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives" + +# For jdk 17+, this JAVA_OPTS will be used as default JVM options +JAVA_OPTS_FOR_JDK_17="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives --add-opens=java.base/java.net=ALL-UNNAMED" + +# since 1.2, the JAVA_HOME need to be set to run BE process. +# JAVA_HOME=/path/to/jdk/ + +# https://github.com/apache/doris/blob/master/docs/zh-CN/community/developer-guide/debug-tool.md#jemalloc-heap-profile +# https://jemalloc.net/jemalloc.3.html +JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:15000,dirty_decay_ms:15000,oversize_threshold:0,prof:false,lg_prof_interval:32,lg_prof_sample:19,prof_gdump:false,prof_accum:false,prof_leak:false,prof_final:false" +JEMALLOC_PROF_PRFIX="" + +# INFO, WARNING, ERROR, FATAL +sys_log_level = INFO + +# ports for admin, web, heartbeat service +be_port = 9060 +webserver_port = 8040 +heartbeat_service_port = 9050 +brpc_port = 8060 +arrow_flight_sql_port = 9610 +enable_debug_points = true + +# HTTPS configures +enable_https = false +# path of certificate in PEM format. +ssl_certificate_path = "$DORIS_HOME/conf/cert.pem" +# path of private key in PEM format. +ssl_private_key_path = "$DORIS_HOME/conf/key.pem" + + +# Choose one if there are more than one ip except loopback address. +# Note that there should at most one ip match this list. +# If no ip match this rule, will choose one randomly. +# use CIDR format, e.g. 10.10.10.0/24 or IP format, e.g. 10.10.10.1 +# Default value is empty. 
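
Worth noting while reading these configs: be.conf pins the BE ports (9060/8040/9050/8060) plus arrow_flight_sql_port 9610, and the fe.conf further down pins FE 8030/9030 plus arrow_flight_sql_port 9611 — the port the arrow read mode in the parameterized tests dials. A quick TCP reachability probe, purely illustrative, for checking those ports on a local container:

    import java.net.{InetSocketAddress, Socket}

    object PortProbe {
      // Returns true if a TCP connect to host:port succeeds within timeoutMs.
      def open(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
        val socket = new Socket()
        try { socket.connect(new InetSocketAddress(host, port), timeoutMs); true }
        catch { case _: java.io.IOException => false }
        finally socket.close()
      }
    }
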
+# priority_networks = 10.10.10.0/24;192.168.0.0/16 + +# data root path, separate by ';' +# You can specify the storage type for each root path, HDD (cold data) or SSD (hot data) +# eg: +# storage_root_path = /home/disk1/doris;/home/disk2/doris;/home/disk2/doris +# storage_root_path = /home/disk1/doris,medium:SSD;/home/disk2/doris,medium:SSD;/home/disk2/doris,medium:HDD +# /home/disk2/doris,medium:HDD(default) +# +# you also can specify the properties by setting '<property>:<value>', separate by ',' +# property 'medium' has a higher priority than the extension of path +# +# Default value is ${DORIS_HOME}/storage, you should create it by hand. +# storage_root_path = ${DORIS_HOME}/storage + +# Default dirs to put jdbc drivers,default value is ${DORIS_HOME}/jdbc_drivers +# jdbc_drivers_dir = ${DORIS_HOME}/jdbc_drivers + +# Advanced configurations +# sys_log_dir = ${DORIS_HOME}/log +# sys_log_roll_mode = SIZE-MB-1024 +# sys_log_roll_num = 10 +# sys_log_verbose_modules = * +# log_buffer_level = -1 +# palo_cgroups + +# aws sdk log level +# Off = 0, +# Fatal = 1, +# Error = 2, +# Warn = 3, +# Info = 4, +# Debug = 5, +# Trace = 6 +# Default to turn off aws sdk log, because aws sdk errors that need to be cared will be output through Doris logs +aws_log_level=0 +## If you are not running in aws cloud, you can disable EC2 metadata +AWS_EC2_METADATA_DISABLED=true \ No newline at end of file diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/resources/docker/doris/fe.conf b/spark-doris-connector/spark-doris-connector-it/src/test/resources/docker/doris/fe.conf new file mode 100644 index 0000000..a45fb53 --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-it/src/test/resources/docker/doris/fe.conf @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +##################################################################### +## The uppercase properties are read and exported by bin/start_fe.sh. +## To see all Frontend configurations, +## see fe/src/org/apache/doris/common/Config.java +##################################################################### + +CUR_DATE=`date +%Y%m%d-%H%M%S` + +# Log dir +LOG_DIR = ${DORIS_HOME}/log + +# For jdk 17, this JAVA_OPTS will be used as default JVM options +JAVA_OPTS_FOR_JDK_17="-Dfile.encoding=UTF-8 -Djavax.security.auth.useSubjectCredsOnly=false -Xmx8192m -Xms8192m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$LOG_DIR -Xlog:gc*,classhisto*=trace:$LOG_DIR/fe.gc.log.$CUR_DATE:time,uptime:filecount=10,filesize=50M --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens java.base/jdk.internal.ref=ALL-UNNAMED" + +# Set your own JAVA_HOME +# JAVA_HOME=/path/to/jdk/ + +## +## the lowercase properties are read by main program. +## + +# store metadata, must be created before start FE. 
+# Default value is ${DORIS_HOME}/doris-meta +# meta_dir = ${DORIS_HOME}/doris-meta + +# Default dirs to put jdbc drivers,default value is ${DORIS_HOME}/jdbc_drivers +# jdbc_drivers_dir = ${DORIS_HOME}/jdbc_drivers + +http_port = 8030 +rpc_port = 9020 +query_port = 9030 +edit_log_port = 9010 +arrow_flight_sql_port = 9611 +enable_debug_points = true +arrow_flight_token_cache_size = 50 +# Choose one if there are more than one ip except loopback address. +# Note that there should at most one ip match this list. +# If no ip match this rule, will choose one randomly. +# use CIDR format, e.g. 10.10.10.0/24 or IP format, e.g. 10.10.10.1 +# Default value is empty. +# priority_networks = 10.10.10.0/24;192.168.0.0/16 + +# Advanced configurations +# log_roll_size_mb = 1024 +# INFO, WARN, ERROR, FATAL +sys_log_level = INFO +# NORMAL, BRIEF, ASYNC +sys_log_mode = ASYNC +# sys_log_roll_num = 10 +# sys_log_verbose_modules = org.apache.doris +# audit_log_dir = $LOG_DIR +# audit_log_modules = slow_query, query +# audit_log_roll_num = 10 +# meta_delay_toleration_second = 10 +# qe_max_connection = 1024 +# qe_query_timeout_second = 300 +# qe_slow_log_ms = 5000 \ No newline at end of file diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j.properties b/spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j.properties new file mode 100644 index 0000000..ecb73d3 --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j.properties @@ -0,0 +1,23 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +log4j.rootLogger=INFO, console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c [%t] %x - %m%n \ No newline at end of file diff --git a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala index f13830c..f7e08b9 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala +++ b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala @@ -17,8 +17,10 @@ package org.apache.doris.spark.read.expression +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And, Not, Or} import org.apache.spark.sql.connector.expressions.{Expression, GeneralScalarExpression, Literal, NamedReference} +import org.apache.spark.sql.types.{DateType, TimestampType} class V2ExpressionBuilder(inValueLengthLimit: Int) { @@ -36,7 +38,7 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) { case _: AlwaysFalse => "1=0" case expr: Expression => expr match { - case literal: Literal[_] => literal.toString + case literal: Literal[_] => visitLiteral(literal) case namedRef: NamedReference => namedRef.toString case e: GeneralScalarExpression => e.name() match { case "IN" => @@ -61,6 +63,17 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) { } } + def visitLiteral(literal: Literal[_]): String = { + if (literal.value() == null) { + return "null" + } + literal.dataType() match { + case DateType => s"'${DateTimeUtils.toJavaDate(literal.value().asInstanceOf[Int]).toString}'" + case TimestampType => s"'${DateTimeUtils.toJavaTimestamp(literal.value().asInstanceOf[Long]).toString}'" + case _ => literal.toString + } + } + def visitStartWith(l: String, r: String): String = { val value = r.substring(1, r.length - 1) s"`$l` LIKE '$value%'" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
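
Postscript on the V2ExpressionBuilder change, the one substantive reader fix in this commit: Spark stores DateType literals as epoch days (Int) and TimestampType literals as epoch microseconds (Long), so the old literal.toString pushed raw integers into the predicate sent to Doris; visitLiteral now renders them as quoted date/timestamp strings, with null mapped to "null". Distilled into a sketch using the same catalyst utility the patch imports:

    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    object LiteralRender {
      // DateType arrives as epoch days, TimestampType as epoch micros;
      // both must be converted before being quoted into the pushed filter.
      def renderDate(epochDays: Int): String =
        s"'${DateTimeUtils.toJavaDate(epochDays)}'"
      def renderTimestamp(epochMicros: Long): String =
        s"'${DateTimeUtils.toJavaTimestamp(epochMicros)}'"
    }

For example, renderDate(20158) should yield '2025-03-11' — the same date the pushdown tests filter on — where toString alone would have produced the bare integer.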