This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 6e093f97093  [improvement](load) Enable lzo & Remove dependency on liblzo2 (#30898)
6e093f97093 is described below

commit 6e093f97093680cd07f7bc23a80332c43723d57d
Author: HowardQin <hao....@esgyn.cn>
AuthorDate: Tue Feb 6 18:08:49 2024 +0800

    [improvement](load) Enable lzo & Remove dependency on liblzo2 (#30898)

    bp #30573
---
 be/CMakeLists.txt                                  |   8 -
 be/cmake/thirdparty.cmake                          |   5 -
 be/src/exec/decompressor.cpp                       |   2 -
 be/src/exec/decompressor.h                         |   7 -
 be/src/exec/lzo_decompressor.cpp                   |  52 ++-
 be/src/olap/utils.cpp                              |  11 +-
 be/src/pch/pch.h                                   |   4 -
 be/src/vec/exec/format/csv/csv_reader.cpp          |   1 +
 build.sh                                           |   5 -
 .../load_p0/stream_load/test_compress_type.out     |   4 +
 .../suites/load_p0/stream_load/ddl/basic_data.sql  |  29 ++
 .../load_p0/stream_load/test_compress_type.groovy  | 349 +++++++++++++++++++++
 12 files changed, 421 insertions(+), 56 deletions(-)

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 92c747ddf67..129adeb3ed7 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -323,10 +323,6 @@ if (WITH_MYSQL)
     add_compile_options(-DDORIS_WITH_MYSQL)
 endif()
 
-if (WITH_LZO)
-    add_compile_options(-DDORIS_WITH_LZO)
-endif()
-
 # Enable memory tracker, which allows BE to limit the memory of tasks such as query, load,
 # and compaction,and observe the memory of BE through be_ip:http_port/MemTracker.
 # Adding the option `USE_MEM_TRACKER=OFF sh build.sh` when compiling can turn off the memory tracker,
@@ -638,10 +634,6 @@ set(DORIS_DEPENDENCIES
     ${KRB5_LIBS}
 )
 
-if(WITH_LZO)
-    set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} lzo)
-endif()
-
 if (WITH_MYSQL)
     set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} mysql)
 endif()
diff --git a/be/cmake/thirdparty.cmake b/be/cmake/thirdparty.cmake
index 4928b64b16b..4b6946c5a78 100644
--- a/be/cmake/thirdparty.cmake
+++ b/be/cmake/thirdparty.cmake
@@ -76,11 +76,6 @@ set_target_properties(thrift PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/
 add_library(thriftnb STATIC IMPORTED)
 set_target_properties(thriftnb PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libthriftnb.a)
 
-if(WITH_LZO)
-    add_library(lzo STATIC IMPORTED)
-    set_target_properties(lzo PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/liblzo2.a)
-endif()
-
 if (WITH_MYSQL)
     add_library(mysql STATIC IMPORTED)
     set_target_properties(mysql PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libmysqlclient.a)
diff --git a/be/src/exec/decompressor.cpp b/be/src/exec/decompressor.cpp
index 1e7e3482234..c0dd7554942 100644
--- a/be/src/exec/decompressor.cpp
+++ b/be/src/exec/decompressor.cpp
@@ -50,11 +50,9 @@ Status Decompressor::create_decompressor(CompressType type, Decompressor** decom
     case CompressType::SNAPPYBLOCK:
         *decompressor = new SnappyBlockDecompressor();
         break;
-#ifdef DORIS_WITH_LZO
     case CompressType::LZOP:
         *decompressor = new LzopDecompressor();
         break;
-#endif
     default:
         return Status::InternalError("Unknown compress type: {}", type);
     }
diff --git a/be/src/exec/decompressor.h b/be/src/exec/decompressor.h
index 2b07e71139f..a453a1746e4 100644
--- a/be/src/exec/decompressor.h
+++ b/be/src/exec/decompressor.h
@@ -28,11 +28,6 @@
 
 #include <string>
 
-#ifdef DORIS_WITH_LZO
-#include <lzo/lzo1x.h>
-#include <lzo/lzoconf.h>
-#endif
-
 #include "common/status.h"
 
 namespace doris {
@@ -177,7 +172,6 @@ private:
     Status init() override;
 };
 
-#ifdef DORIS_WITH_LZO
 class LzopDecompressor : public Decompressor {
 public:
     ~LzopDecompressor() override = default;
@@ -271,6 +265,5 @@ private:
     const static uint64_t F_CRC32_D;
     const static uint64_t F_ADLER32_D;
 };
-#endif // DORIS_WITH_LZO
 
 } // namespace doris
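With those two hunks, CompressType::LZOP becomes a first-class case in the decompressor factory rather than an #ifdef'd one. A minimal sketch of what callers can now rely on, assuming only the create_decompressor signature visible in this diff (the helper name and error handling are illustrative, not part of the patch):

    #include "exec/decompressor.h"

    // Hypothetical caller: after this patch the LZOP case is compiled into
    // every build, with no DORIS_WITH_LZO flag involved.
    doris::Status make_lzop_decompressor(doris::Decompressor** decompressor) {
        // create_decompressor returns InternalError for unknown compress types.
        return doris::Decompressor::create_decompressor(doris::CompressType::LZOP,
                                                        decompressor);
    }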
diff --git a/be/src/exec/lzo_decompressor.cpp b/be/src/exec/lzo_decompressor.cpp
index a2af9e94fd0..a39bebe8755 100644
--- a/be/src/exec/lzo_decompressor.cpp
+++ b/be/src/exec/lzo_decompressor.cpp
@@ -16,15 +16,30 @@
 // under the License.
 
 #include "exec/decompressor.h"
+#include "olap/utils.h"
+#include "orc/Exceptions.hh"
+#include "util/crc32c.h"
+
+namespace orc {
+/**
+ * Decompress the bytes in to the output buffer.
+ * @param inputAddress the start of the input
+ * @param inputLimit one past the last byte of the input
+ * @param outputAddress the start of the output buffer
+ * @param outputLimit one past the last byte of the output buffer
+ * @result the number of bytes decompressed
+ */
+uint64_t lzoDecompress(const char* inputAddress, const char* inputLimit, char* outputAddress,
+                       char* outputLimit);
+} // namespace orc
 
 namespace doris {
 
-#ifdef DORIS_WITH_LZO
 // Lzop
 const uint8_t LzopDecompressor::LZOP_MAGIC[9] = {0x89, 0x4c, 0x5a, 0x4f, 0x00, 0x0d,
                                                  0x0a, 0x1a, 0x0a};
-const uint64_t LzopDecompressor::LZOP_VERSION = 0x1030;
+const uint64_t LzopDecompressor::LZOP_VERSION = 0x1040;
 const uint64_t LzopDecompressor::MIN_LZO_VERSION = 0x0100;
 // magic(9) + ver(2) + lib_ver(2) + ver_needed(2) + method(1)
 // + lvl(1) + flags(4) + mode/mtime(12) + filename_len(1)
@@ -154,18 +169,18 @@ Status LzopDecompressor::decompress(uint8_t* input, size_t input_len, size_t* in
         ptr += compressed_size;
     } else {
         // decompress
-        *decompressed_len = uncompressed_size;
-        int ret = lzo1x_decompress_safe(ptr, compressed_size, output,
-                                        reinterpret_cast<lzo_uint*>(&uncompressed_size), nullptr);
-        if (ret != LZO_E_OK || uncompressed_size != *decompressed_len) {
+        try {
+            *decompressed_len =
+                    orc::lzoDecompress((const char*)ptr, (const char*)(ptr + compressed_size),
+                                       (char*)output, (char*)(output + uncompressed_size));
+        } catch (const orc::ParseError& err) {
             std::stringstream ss;
-            ss << "Lzo decompression failed with ret: " << ret
-               << " decompressed len: " << uncompressed_size << " expected: " << *decompressed_len;
+            ss << "Lzo decompression failed: " << err.what();
             return Status::InternalError(ss.str());
         }
 
         RETURN_IF_ERROR(checksum(_header_info.output_checksum_type, "decompressed", out_checksum,
-                                 output, uncompressed_size));
+                                 output, *decompressed_len));
 
         ptr += compressed_size;
     }
@@ -260,8 +275,14 @@ Status LzopDecompressor::parse_header_info(uint8_t* input, size_t input_len,
         return Status::InternalError(ss.str());
     }
 
-    // 6. skip level
-    ++ptr;
+    // 6. unsupported level: 7, 8, 9
+    uint8_t level;
+    ptr = get_uint8(ptr, &level);
+    if (level > 6) {
+        std::stringstream ss;
+        ss << "unsupported lzo level: " << (int)level;
+        return Status::InternalError(ss.str());
+    }
 
     // 7. flags
     uint32_t flags;
@@ -305,10 +326,10 @@ Status LzopDecompressor::parse_header_info(uint8_t* input, size_t input_len,
     uint32_t computed_checksum;
     if (_header_info.header_checksum_type == CHECK_CRC32) {
         computed_checksum = CRC32_INIT_VALUE;
-        computed_checksum = lzo_crc32(computed_checksum, header, cur - header);
+        computed_checksum = crc32c::Extend(computed_checksum, (const char*)header, cur - header);
     } else {
         computed_checksum = ADLER32_INIT_VALUE;
-        computed_checksum = lzo_adler32(computed_checksum, header, cur - header);
+        computed_checksum = olap_adler32(computed_checksum, (const char*)header, cur - header);
     }
 
     if (computed_checksum != expected_checksum) {
@@ -354,10 +375,10 @@ Status LzopDecompressor::checksum(LzoChecksum type, const std::string& source, u
     case CHECK_NONE:
         return Status::OK();
     case CHECK_CRC32:
-        computed_checksum = lzo_crc32(CRC32_INIT_VALUE, ptr, len);
+        computed_checksum = crc32c::Extend(CRC32_INIT_VALUE, (const char*)ptr, len);
         break;
     case CHECK_ADLER:
-        computed_checksum = lzo_adler32(ADLER32_INIT_VALUE, ptr, len);
+        computed_checksum = olap_adler32(ADLER32_INIT_VALUE, (const char*)ptr, len);
        break;
    default:
        std::stringstream ss;
@@ -387,6 +408,5 @@ std::string LzopDecompressor::debug_info() {
        << " output checksum type: " << _header_info.output_checksum_type;
    return ss.str();
 }
-#endif // DORIS_WITH_LZO
 
 } // namespace doris
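The pattern above — call orc::lzoDecompress over [start, limit) ranges, catch orc::ParseError, then verify the block checksum — is the entire replacement for liblzo2. A standalone sketch of that flow under the same declarations the patch adds; the helper and buffer names are hypothetical, and the checksum is fixed to Adler32 here whereas the real code dispatches on the header's checksum type:

    #include <cstdint>
    #include <vector>

    #include "olap/utils.h"       // olap_adler32, as used by the patch
    #include "orc/Exceptions.hh"  // orc::ParseError

    // Same forward declaration the patch adds to lzo_decompressor.cpp.
    namespace orc {
    uint64_t lzoDecompress(const char* inputAddress, const char* inputLimit,
                           char* outputAddress, char* outputLimit);
    } // namespace orc

    // Hypothetical helper: decompress one LZOP block and verify its Adler32
    // checksum, mirroring the decompress()/checksum() interplay above.
    // Returns false on malformed input or checksum mismatch.
    bool decompress_and_verify(const uint8_t* in, size_t in_len,
                               uint32_t expected_adler, std::vector<char>* out) {
        uint64_t produced = 0;
        try {
            // The ORC decoder throws orc::ParseError instead of returning an
            // lzo error code, hence the try/catch.
            produced = orc::lzoDecompress(reinterpret_cast<const char*>(in),
                                          reinterpret_cast<const char*>(in) + in_len,
                                          out->data(), out->data() + out->size());
        } catch (const orc::ParseError&) {
            return false;
        }
        // ADLER32_INIT_VALUE is the same seed the decompressor itself uses
        // (assumed visible via olap/utils.h).
        return olap_adler32(ADLER32_INIT_VALUE, out->data(),
                            (size_t)produced) == expected_adler;
    }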
diff --git a/be/src/olap/utils.cpp b/be/src/olap/utils.cpp
index cdd7ad4c834..59d13d5afcb 100644
--- a/be/src/olap/utils.cpp
+++ b/be/src/olap/utils.cpp
@@ -19,6 +19,7 @@
 // IWYU pragma: no_include <bthread/errno.h>
 #include <errno.h> // IWYU pragma: keep
+#include <stdarg.h>
 #include <time.h>
 #include <unistd.h>
 #include <zconf.h>
@@ -33,15 +34,6 @@
 #include <string>
 #include <vector>
 
-#include "util/sse_util.hpp"
-
-#ifdef DORIS_WITH_LZO
-#include <lzo/lzo1c.h>
-#include <lzo/lzo1x.h>
-#endif
-
-#include <stdarg.h>
-
 #include "common/logging.h"
 #include "common/status.h"
 #include "io/fs/file_reader.h"
@@ -49,6 +41,7 @@
 #include "io/fs/file_writer.h"
 #include "io/fs/local_file_system.h"
 #include "olap/olap_common.h"
+#include "util/sse_util.hpp"
 #include "util/string_parser.hpp"
 
 namespace doris {
diff --git a/be/src/pch/pch.h b/be/src/pch/pch.h
index fffef7b8d57..9ec2a4a8531 100644
--- a/be/src/pch/pch.h
+++ b/be/src/pch/pch.h
@@ -256,10 +256,6 @@
 #include <lz4/lz4.h>
 #include <lz4/lz4frame.h>
 
-// lzo headers
-#include <lzo/lzo1x.h>
-#include <lzo/lzoconf.h>
-
 // mysql headers
 #include <mysql/mysql.h>
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index ced615ec66f..fb3ee5c5be3 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -561,6 +561,7 @@ Status CsvReader::_create_decompressor() {
         compress_type = CompressType::GZIP;
         break;
     case TFileCompressType::LZO:
+    case TFileCompressType::LZOP:
         compress_type = CompressType::LZOP;
         break;
     case TFileCompressType::BZ2:
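The csv_reader.cpp hunk is the user-visible half of the change: both the LZO and the new LZOP values of the thrift TFileCompressType enum now select the same LZOP decompressor. Condensed into one hypothetical helper (the surrounding cases and error handling of _create_decompressor are omitted, and the thrift-generated header for TFileCompressType is assumed to be available):

    #include "exec/decompressor.h"

    // Condensed restatement of the mapping inside CsvReader::_create_decompressor:
    // LZO and LZOP collapse onto the one CompressType::LZOP implementation.
    doris::Status create_lzo_decompressor_for(doris::TFileCompressType::type type,
                                              doris::Decompressor** decompressor) {
        switch (type) {
        case doris::TFileCompressType::LZO:
        case doris::TFileCompressType::LZOP:
            return doris::Decompressor::create_decompressor(doris::CompressType::LZOP,
                                                            decompressor);
        default:
            return doris::Status::InternalError("not an lzo compress type");
        }
    }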
"${BUILD_BE}" -eq 1 ]]; then -DBUILD_FS_BENCHMARK="${BUILD_FS_BENCHMARK}" \ ${CMAKE_USE_CCACHE:+${CMAKE_USE_CCACHE}} \ -DWITH_MYSQL="${WITH_MYSQL}" \ - -DWITH_LZO="${WITH_LZO}" \ -DUSE_LIBCPP="${USE_LIBCPP}" \ -DBUILD_META_TOOL="${BUILD_META_TOOL}" \ -DBUILD_INDEX_TOOL="${BUILD_INDEX_TOOL}" \ diff --git a/regression-test/data/load_p0/stream_load/test_compress_type.out b/regression-test/data/load_p0/stream_load/test_compress_type.out new file mode 100644 index 00000000000..56d195c569e --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_compress_type.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +160 + diff --git a/regression-test/suites/load_p0/stream_load/ddl/basic_data.sql b/regression-test/suites/load_p0/stream_load/ddl/basic_data.sql new file mode 100644 index 00000000000..41c3660e11c --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/ddl/basic_data.sql @@ -0,0 +1,29 @@ +CREATE TABLE basic_data +( + k00 INT NOT NULL, + k01 DATE NOT NULL, + k02 BOOLEAN NULL, + k03 TINYINT NULL, + k04 SMALLINT NULL, + k05 INT NULL, + k06 BIGINT NULL, + k07 LARGEINT NULL, + k08 FLOAT NULL, + k09 DOUBLE NULL, + k10 DECIMAL(9,1) NULL, + k11 DECIMALV3(9,1) NULL, + k12 DATETIME NULL, + k13 DATEV2 NULL, + k14 DATETIMEV2 NULL, + k15 CHAR NULL, + k16 VARCHAR NULL, + k17 STRING NULL, + k18 JSON NULL + +) +DUPLICATE KEY(k00) +DISTRIBUTED BY HASH(k00) BUCKETS 32 +PROPERTIES ( + "bloom_filter_columns"="k05", + "replication_num" = "1" +); \ No newline at end of file diff --git a/regression-test/suites/load_p0/stream_load/test_compress_type.groovy b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy new file mode 100644 index 00000000000..950f1fdeb27 --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy @@ -0,0 +1,349 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_stream_load_compress_type", "load_p0") { + def tableName = "basic_data" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + // GZ/LZO/BZ2/LZ4FRAME/LZOP + sql new File("""${context.file.parent}/ddl/${tableName}.sql""").text + + streamLoad { + table "${tableName}" + set 'column_separator', '|' + set 'trim_double_quotes', 'true' + set 'format', "CSV" + set 'compress_type', 'GZ' + + file "basic_data.csv.gz" + + check { + result, exception, startTime, endTime -> + assertTrue(exception == null) + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("Success", json.Status) + assertEquals(20, json.NumberTotalRows) + assertEquals(20, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.LoadBytes > 0) + } + } + + streamLoad { + table "${tableName}" + set 'column_separator', '|' + set 'trim_double_quotes', 'true' + set 'format', "CSV" + set 'compress_type', 'BZ2' + + file "basic_data.csv.bz2" + check { + result, exception, startTime, endTime -> + assertTrue(exception == null) + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("Success", json.Status) + assertEquals(20, json.NumberTotalRows) + assertEquals(20, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.LoadBytes > 0) + } + } + + streamLoad { + table "${tableName}" + set 'column_separator', '|' + set 'trim_double_quotes', 'true' + set 'format', 'csv' + set 'compress_type', 'LZ4' + + file "basic_data.csv.lz4" + check { + result, exception, startTime, endTime -> + assertTrue(exception == null) + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("Success", json.Status) + assertEquals(20, json.NumberTotalRows) + assertEquals(20, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.LoadBytes > 0) + } + } + + streamLoad { + table "${tableName}" + set 'column_separator', '|' + set 'trim_double_quotes', 'true' + set 'compress_type', 'GZ' + + file "basic_data.csv.gz" + check { + result, exception, startTime, endTime -> + assertTrue(exception == null) + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("Success", json.Status) + assertEquals(20, json.NumberTotalRows) + assertEquals(20, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.LoadBytes > 0) + } + } + + streamLoad { + table "${tableName}" + set 'column_separator', '|' + set 'trim_double_quotes', 'true' + set 'compress_type', 'BZ2' + + file "basic_data.csv.bz2" + check { + result, exception, startTime, endTime -> + assertTrue(exception == null) + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("Success", json.Status) + assertEquals(20, json.NumberTotalRows) + assertEquals(20, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.LoadBytes > 0) + } + } + + streamLoad { + table "${tableName}" + set 'column_separator', '|' + set 'trim_double_quotes', 'true' + set 'compress_type', 'LZ4' + + file "basic_data.csv.lz4" + check { + result, exception, startTime, endTime -> + assertTrue(exception == null) + log.info("Stream load result: ${result}".toString()) + def json = 
+    // LZO = LZOP
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'compress_type', 'LZO'
+
+        file "basic_data.csv.lzo"
+        check { result, exception, startTime, endTime ->
+            assertTrue(exception == null)
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("Success", json.Status)
+            assertEquals(20, json.NumberTotalRows)
+            assertEquals(20, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+            assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'compress_type', 'LZOP'
+
+        file "basic_data.csv.lzo"
+        check { result, exception, startTime, endTime ->
+            assertTrue(exception == null)
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("Success", json.Status)
+            assertEquals(20, json.NumberTotalRows)
+            assertEquals(20, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+            assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    // no compress_type
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'format', "CSV"
+        file "basic_data.csv.gz"
+
+        check { result, exception, startTime, endTime ->
+            assertTrue(exception == null)
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("Fail", json.Status)
+            assertTrue(json.Message.contains("too many filtered rows"))
+            assertEquals(13, json.NumberTotalRows)
+            assertEquals(0, json.NumberLoadedRows)
+            assertEquals(13, json.NumberFilteredRows)
+            assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    // no compress_type
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'format', "CSV"
+        file "basic_data.csv.bz2"
+
+        check { result, exception, startTime, endTime ->
+            assertTrue(exception == null)
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("Fail", json.Status)
+            assertTrue(json.Message.contains("too many filtered rows"))
+            assertEquals(9, json.NumberTotalRows)
+            assertEquals(0, json.NumberLoadedRows)
+            assertEquals(9, json.NumberFilteredRows)
+            assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    // no compress_type
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'format', "CSV"
+        file "basic_data.csv.lz4"
+
+        check { result, exception, startTime, endTime ->
+            assertTrue(exception == null)
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("Fail", json.Status)
+            assertTrue(json.Message.contains("too many filtered rows"))
+            assertEquals(31, json.NumberTotalRows)
+            assertEquals(0, json.NumberLoadedRows)
+            assertEquals(31, json.NumberFilteredRows)
+            assertTrue(json.LoadBytes > 0)
+        }
+    }
+
log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("Fail", json.Status) + assertTrue(json.Message.contains("too many filtered rows")) + assertEquals(13, json.NumberTotalRows) + assertEquals(0, json.NumberLoadedRows) + assertEquals(13, json.NumberFilteredRows) + assertTrue(json.LoadBytes > 0) + } + } + + // no compress_type + streamLoad { + table "${tableName}" + set 'column_separator', '|' + set 'trim_double_quotes', 'true' + file "basic_data.csv.bz2" + + check { + result, exception, startTime, endTime -> + assertTrue(exception == null) + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("Fail", json.Status) + assertTrue(json.Message.contains("too many filtered rows")) + assertEquals(9, json.NumberTotalRows) + assertEquals(0, json.NumberLoadedRows) + assertEquals(9, json.NumberFilteredRows) + assertTrue(json.LoadBytes > 0) + } + } + + // no compress_type + streamLoad { + table "${tableName}" + set 'column_separator', '|' + set 'trim_double_quotes', 'true' + file "basic_data.csv.lz4" + + check { + result, exception, startTime, endTime -> + assertTrue(exception == null) + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("Fail", json.Status) + assertTrue(json.Message.contains("too many filtered rows")) + assertEquals(31, json.NumberTotalRows) + assertEquals(0, json.NumberLoadedRows) + assertEquals(31, json.NumberFilteredRows) + assertTrue(json.LoadBytes > 0) + } + } + + // no compress_type + streamLoad { + table "${tableName}" + set 'column_separator', '|' + set 'trim_double_quotes', 'true' + file "basic_data.csv.lzo" + + check { + result, exception, startTime, endTime -> + assertTrue(exception == null) + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("Fail", json.Status) + assertTrue(json.Message.contains("too many filtered rows")) + assertEquals(23, json.NumberTotalRows) + assertEquals(0, json.NumberLoadedRows) + assertEquals(23, json.NumberFilteredRows) + assertTrue(json.LoadBytes > 0) + } + } + + qt_sql """ select count(*) from ${tableName} """ +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org