This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 6e093f97093 [improvement](load) Enable lzo & Remove dependency on liblzo2 (#30898)
6e093f97093 is described below

commit 6e093f97093680cd07f7bc23a80332c43723d57d
Author: HowardQin <hao....@esgyn.cn>
AuthorDate: Tue Feb 6 18:08:49 2024 +0800

    [improvement](load) Enable lzo & Remove dependency on liblzo2 (#30898)
    
    bp #30573
---
 be/CMakeLists.txt                                  |   8 -
 be/cmake/thirdparty.cmake                          |   5 -
 be/src/exec/decompressor.cpp                       |   2 -
 be/src/exec/decompressor.h                         |   7 -
 be/src/exec/lzo_decompressor.cpp                   |  52 ++-
 be/src/olap/utils.cpp                              |  11 +-
 be/src/pch/pch.h                                   |   4 -
 be/src/vec/exec/format/csv/csv_reader.cpp          |   1 +
 build.sh                                           |   5 -
 .../load_p0/stream_load/test_compress_type.out     |   4 +
 .../suites/load_p0/stream_load/ddl/basic_data.sql  |  29 ++
 .../load_p0/stream_load/test_compress_type.groovy  | 349 +++++++++++++++++++++
 12 files changed, 421 insertions(+), 56 deletions(-)

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 92c747ddf67..129adeb3ed7 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -323,10 +323,6 @@ if (WITH_MYSQL)
     add_compile_options(-DDORIS_WITH_MYSQL)
 endif()
 
-if (WITH_LZO)
-    add_compile_options(-DDORIS_WITH_LZO)
-endif()
-
 # Enable memory tracker, which allows BE to limit the memory of tasks such as query, load,
 # and compaction,and observe the memory of BE through be_ip:http_port/MemTracker.
 # Adding the option `USE_MEM_TRACKER=OFF sh build.sh` when compiling can turn off the memory tracker,
@@ -638,10 +634,6 @@ set(DORIS_DEPENDENCIES
     ${KRB5_LIBS}
 )
 
-if(WITH_LZO)
-    set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} lzo)
-endif()
-
 if (WITH_MYSQL)
     set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} mysql)
 endif()
diff --git a/be/cmake/thirdparty.cmake b/be/cmake/thirdparty.cmake
index 4928b64b16b..4b6946c5a78 100644
--- a/be/cmake/thirdparty.cmake
+++ b/be/cmake/thirdparty.cmake
@@ -76,11 +76,6 @@ set_target_properties(thrift PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/
 add_library(thriftnb STATIC IMPORTED)
 set_target_properties(thriftnb PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libthriftnb.a)
 
-if(WITH_LZO)
-    add_library(lzo STATIC IMPORTED)
-    set_target_properties(lzo PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/liblzo2.a)
-endif()
-
 if (WITH_MYSQL)
     add_library(mysql STATIC IMPORTED)
     set_target_properties(mysql PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libmysqlclient.a)
diff --git a/be/src/exec/decompressor.cpp b/be/src/exec/decompressor.cpp
index 1e7e3482234..c0dd7554942 100644
--- a/be/src/exec/decompressor.cpp
+++ b/be/src/exec/decompressor.cpp
@@ -50,11 +50,9 @@ Status Decompressor::create_decompressor(CompressType type, Decompressor** decom
     case CompressType::SNAPPYBLOCK:
         *decompressor = new SnappyBlockDecompressor();
         break;
-#ifdef DORIS_WITH_LZO
     case CompressType::LZOP:
         *decompressor = new LzopDecompressor();
         break;
-#endif
     default:
         return Status::InternalError("Unknown compress type: {}", type);
     }
diff --git a/be/src/exec/decompressor.h b/be/src/exec/decompressor.h
index 2b07e71139f..a453a1746e4 100644
--- a/be/src/exec/decompressor.h
+++ b/be/src/exec/decompressor.h
@@ -28,11 +28,6 @@
 
 #include <string>
 
-#ifdef DORIS_WITH_LZO
-#include <lzo/lzo1x.h>
-#include <lzo/lzoconf.h>
-#endif
-
 #include "common/status.h"
 
 namespace doris {
@@ -177,7 +172,6 @@ private:
     Status init() override;
 };
 
-#ifdef DORIS_WITH_LZO
 class LzopDecompressor : public Decompressor {
 public:
     ~LzopDecompressor() override = default;
@@ -271,6 +265,5 @@ private:
     const static uint64_t F_CRC32_D;
     const static uint64_t F_ADLER32_D;
 };
-#endif // DORIS_WITH_LZO
 
 } // namespace doris
diff --git a/be/src/exec/lzo_decompressor.cpp b/be/src/exec/lzo_decompressor.cpp
index a2af9e94fd0..a39bebe8755 100644
--- a/be/src/exec/lzo_decompressor.cpp
+++ b/be/src/exec/lzo_decompressor.cpp
@@ -16,15 +16,30 @@
 // under the License.
 
 #include "exec/decompressor.h"
+#include "olap/utils.h"
+#include "orc/Exceptions.hh"
+#include "util/crc32c.h"
+
+namespace orc {
+/**
+ * Decompress the bytes in to the output buffer.
+ * @param inputAddress the start of the input
+ * @param inputLimit one past the last byte of the input
+ * @param outputAddress the start of the output buffer
+ * @param outputLimit one past the last byte of the output buffer
+ * @result the number of bytes decompressed
+ */
+uint64_t lzoDecompress(const char* inputAddress, const char* inputLimit, char* outputAddress,
+                       char* outputLimit);
+} // namespace orc
 
 namespace doris {
 
-#ifdef DORIS_WITH_LZO
 // Lzop
 const uint8_t LzopDecompressor::LZOP_MAGIC[9] = {0x89, 0x4c, 0x5a, 0x4f, 0x00,
                                                  0x0d, 0x0a, 0x1a, 0x0a};
 
-const uint64_t LzopDecompressor::LZOP_VERSION = 0x1030;
+const uint64_t LzopDecompressor::LZOP_VERSION = 0x1040;
 const uint64_t LzopDecompressor::MIN_LZO_VERSION = 0x0100;
 // magic(9) + ver(2) + lib_ver(2) + ver_needed(2) + method(1)
 // + lvl(1) + flags(4) + mode/mtime(12) + filename_len(1)
@@ -154,18 +169,18 @@ Status LzopDecompressor::decompress(uint8_t* input, size_t input_len, size_t* in
         ptr += compressed_size;
     } else {
         // decompress
-        *decompressed_len = uncompressed_size;
-        int ret = lzo1x_decompress_safe(ptr, compressed_size, output,
-                                        reinterpret_cast<lzo_uint*>(&uncompressed_size), nullptr);
-        if (ret != LZO_E_OK || uncompressed_size != *decompressed_len) {
+        try {
+            *decompressed_len =
+                    orc::lzoDecompress((const char*)ptr, (const char*)(ptr + compressed_size),
+                                       (char*)output, (char*)(output + uncompressed_size));
+        } catch (const orc::ParseError& err) {
             std::stringstream ss;
-            ss << "Lzo decompression failed with ret: " << ret
-               << " decompressed len: " << uncompressed_size << " expected: " << *decompressed_len;
+            ss << "Lzo decompression failed: " << err.what();
             return Status::InternalError(ss.str());
         }
 
         RETURN_IF_ERROR(checksum(_header_info.output_checksum_type, "decompressed", out_checksum,
-                                 output, uncompressed_size));
+                                 output, *decompressed_len));
         ptr += compressed_size;
     }
 
@@ -260,8 +275,14 @@ Status LzopDecompressor::parse_header_info(uint8_t* input, size_t input_len,
         return Status::InternalError(ss.str());
     }
 
-    // 6. skip level
-    ++ptr;
+    // 6. unsupported level: 7, 8, 9
+    uint8_t level;
+    ptr = get_uint8(ptr, &level);
+    if (level > 6) {
+        std::stringstream ss;
+        ss << "unsupported lzo level: " << (int)level;
+        return Status::InternalError(ss.str());
+    }
 
     // 7. flags
     uint32_t flags;
@@ -305,10 +326,10 @@ Status LzopDecompressor::parse_header_info(uint8_t* input, size_t input_len,
     uint32_t computed_checksum;
     if (_header_info.header_checksum_type == CHECK_CRC32) {
         computed_checksum = CRC32_INIT_VALUE;
-        computed_checksum = lzo_crc32(computed_checksum, header, cur - header);
+        computed_checksum = crc32c::Extend(computed_checksum, (const char*)header, cur - header);
     } else {
         computed_checksum = ADLER32_INIT_VALUE;
-        computed_checksum = lzo_adler32(computed_checksum, header, cur - header);
+        computed_checksum = olap_adler32(computed_checksum, (const char*)header, cur - header);
     }
 
     if (computed_checksum != expected_checksum) {
@@ -354,10 +375,10 @@ Status LzopDecompressor::checksum(LzoChecksum type, const std::string& source, u
     case CHECK_NONE:
         return Status::OK();
     case CHECK_CRC32:
-        computed_checksum = lzo_crc32(CRC32_INIT_VALUE, ptr, len);
+        computed_checksum = crc32c::Extend(CRC32_INIT_VALUE, (const char*)ptr, len);
         break;
     case CHECK_ADLER:
-        computed_checksum = lzo_adler32(ADLER32_INIT_VALUE, ptr, len);
+        computed_checksum = olap_adler32(ADLER32_INIT_VALUE, (const char*)ptr, len);
         break;
     default:
         std::stringstream ss;
@@ -387,6 +408,5 @@ std::string LzopDecompressor::debug_info() {
        << " output checksum type: " << _header_info.output_checksum_type;
     return ss.str();
 }
-#endif // DORIS_WITH_LZO
 
 } // namespace doris
diff --git a/be/src/olap/utils.cpp b/be/src/olap/utils.cpp
index cdd7ad4c834..59d13d5afcb 100644
--- a/be/src/olap/utils.cpp
+++ b/be/src/olap/utils.cpp
@@ -19,6 +19,7 @@
 
 // IWYU pragma: no_include <bthread/errno.h>
 #include <errno.h> // IWYU pragma: keep
+#include <stdarg.h>
 #include <time.h>
 #include <unistd.h>
 #include <zconf.h>
@@ -33,15 +34,6 @@
 #include <string>
 #include <vector>
 
-#include "util/sse_util.hpp"
-
-#ifdef DORIS_WITH_LZO
-#include <lzo/lzo1c.h>
-#include <lzo/lzo1x.h>
-#endif
-
-#include <stdarg.h>
-
 #include "common/logging.h"
 #include "common/status.h"
 #include "io/fs/file_reader.h"
@@ -49,6 +41,7 @@
 #include "io/fs/file_writer.h"
 #include "io/fs/local_file_system.h"
 #include "olap/olap_common.h"
+#include "util/sse_util.hpp"
 #include "util/string_parser.hpp"
 
 namespace doris {
diff --git a/be/src/pch/pch.h b/be/src/pch/pch.h
index fffef7b8d57..9ec2a4a8531 100644
--- a/be/src/pch/pch.h
+++ b/be/src/pch/pch.h
@@ -256,10 +256,6 @@
 #include <lz4/lz4.h>
 #include <lz4/lz4frame.h>
 
-// lzo headers
-#include <lzo/lzo1x.h>
-#include <lzo/lzoconf.h>
-
 // mysql headers
 #include <mysql/mysql.h>
 
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index ced615ec66f..fb3ee5c5be3 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -561,6 +561,7 @@ Status CsvReader::_create_decompressor() {
             compress_type = CompressType::GZIP;
             break;
         case TFileCompressType::LZO:
+        case TFileCompressType::LZOP:
             compress_type = CompressType::LZOP;
             break;
         case TFileCompressType::BZ2:
diff --git a/build.sh b/build.sh
index ca7f21e1c4d..43ae8d8d2e8 100755
--- a/build.sh
+++ b/build.sh
@@ -324,9 +324,6 @@ fi
 if [[ -z "${USE_AVX2}" ]]; then
     USE_AVX2='ON'
 fi
-if [[ -z "${WITH_LZO}" ]]; then
-    WITH_LZO='OFF'
-fi
 if [[ -z "${USE_LIBCPP}" ]]; then
     if [[ "$(uname -s)" != 'Darwin' ]]; then
         USE_LIBCPP='OFF'
@@ -422,7 +419,6 @@ echo "Get params:
     PARALLEL                    -- ${PARALLEL}
     CLEAN                       -- ${CLEAN}
     WITH_MYSQL                  -- ${WITH_MYSQL}
-    WITH_LZO                    -- ${WITH_LZO}
     GLIBC_COMPATIBILITY         -- ${GLIBC_COMPATIBILITY}
     USE_AVX2                    -- ${USE_AVX2}
     USE_LIBCPP                  -- ${USE_LIBCPP}
@@ -509,7 +505,6 @@ if [[ "${BUILD_BE}" -eq 1 ]]; then
         -DBUILD_FS_BENCHMARK="${BUILD_FS_BENCHMARK}" \
         ${CMAKE_USE_CCACHE:+${CMAKE_USE_CCACHE}} \
         -DWITH_MYSQL="${WITH_MYSQL}" \
-        -DWITH_LZO="${WITH_LZO}" \
         -DUSE_LIBCPP="${USE_LIBCPP}" \
         -DBUILD_META_TOOL="${BUILD_META_TOOL}" \
         -DBUILD_INDEX_TOOL="${BUILD_INDEX_TOOL}" \
diff --git a/regression-test/data/load_p0/stream_load/test_compress_type.out b/regression-test/data/load_p0/stream_load/test_compress_type.out
new file mode 100644
index 00000000000..56d195c569e
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_compress_type.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+160
+
diff --git a/regression-test/suites/load_p0/stream_load/ddl/basic_data.sql b/regression-test/suites/load_p0/stream_load/ddl/basic_data.sql
new file mode 100644
index 00000000000..41c3660e11c
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/ddl/basic_data.sql
@@ -0,0 +1,29 @@
+CREATE TABLE basic_data
+(
+    k00 INT             NOT NULL,
+    k01 DATE            NOT NULL,
+    k02 BOOLEAN         NULL,
+    k03 TINYINT         NULL,
+    k04 SMALLINT        NULL,
+    k05 INT             NULL,
+    k06 BIGINT          NULL,
+    k07 LARGEINT        NULL,
+    k08 FLOAT           NULL,
+    k09 DOUBLE          NULL,
+    k10 DECIMAL(9,1)    NULL,
+    k11 DECIMALV3(9,1)  NULL,
+    k12 DATETIME        NULL,
+    k13 DATEV2          NULL,
+    k14 DATETIMEV2      NULL,
+    k15 CHAR            NULL,
+    k16 VARCHAR         NULL,
+    k17 STRING          NULL,
+    k18 JSON            NULL
+
+)
+DUPLICATE KEY(k00)
+DISTRIBUTED BY HASH(k00) BUCKETS 32
+PROPERTIES (
+    "bloom_filter_columns"="k05",
+    "replication_num" = "1"
+);
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/stream_load/test_compress_type.groovy b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy
new file mode 100644
index 00000000000..950f1fdeb27
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy
@@ -0,0 +1,349 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_compress_type", "load_p0") {
+    def tableName = "basic_data"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    // GZ/LZO/BZ2/LZ4FRAME/LZOP
+    sql new File("""${context.file.parent}/ddl/${tableName}.sql""").text
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'format', "CSV"
+        set 'compress_type', 'GZ'
+
+        file "basic_data.csv.gz"
+
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Success", json.Status)
+                assertEquals(20, json.NumberTotalRows)
+                assertEquals(20, json.NumberLoadedRows)
+                assertEquals(0, json.NumberFilteredRows)
+                assertEquals(0, json.NumberUnselectedRows)
+                assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'format', "CSV"
+        set 'compress_type', 'BZ2'
+
+        file "basic_data.csv.bz2"
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Success", json.Status)
+                assertEquals(20, json.NumberTotalRows)
+                assertEquals(20, json.NumberLoadedRows)
+                assertEquals(0, json.NumberFilteredRows)
+                assertEquals(0, json.NumberUnselectedRows)
+                assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'format', 'csv'
+        set 'compress_type', 'LZ4'
+
+        file "basic_data.csv.lz4"
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Success", json.Status)
+                assertEquals(20, json.NumberTotalRows)
+                assertEquals(20, json.NumberLoadedRows)
+                assertEquals(0, json.NumberFilteredRows)
+                assertEquals(0, json.NumberUnselectedRows)
+                assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'compress_type', 'GZ'
+
+        file "basic_data.csv.gz"
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Success", json.Status)
+                assertEquals(20, json.NumberTotalRows)
+                assertEquals(20, json.NumberLoadedRows)
+                assertEquals(0, json.NumberFilteredRows)
+                assertEquals(0, json.NumberUnselectedRows)
+                assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'compress_type', 'BZ2'
+
+        file "basic_data.csv.bz2"
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Success", json.Status)
+                assertEquals(20, json.NumberTotalRows)
+                assertEquals(20, json.NumberLoadedRows)
+                assertEquals(0, json.NumberFilteredRows)
+                assertEquals(0, json.NumberUnselectedRows)
+                assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'compress_type', 'LZ4'
+
+        file "basic_data.csv.lz4"
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Success", json.Status)
+                assertEquals(20, json.NumberTotalRows)
+                assertEquals(20, json.NumberLoadedRows)
+                assertEquals(0, json.NumberFilteredRows)
+                assertEquals(0, json.NumberUnselectedRows)
+                assertTrue(json.LoadBytes > 0)
+        }       
+    }
+
+    // LZO = LZOP
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'compress_type', 'LZO'
+
+        file "basic_data.csv.lzo"
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Success", json.Status)
+                assertEquals(20, json.NumberTotalRows)
+                assertEquals(20, json.NumberLoadedRows)
+                assertEquals(0, json.NumberFilteredRows)
+                assertEquals(0, json.NumberUnselectedRows)
+                assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'compress_type', 'LZOP'
+
+        file "basic_data.csv.lzo"
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Success", json.Status)
+                assertEquals(20, json.NumberTotalRows)
+                assertEquals(20, json.NumberLoadedRows)
+                assertEquals(0, json.NumberFilteredRows)
+                assertEquals(0, json.NumberUnselectedRows)
+                assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    // no compress_type
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'format', "CSV"
+        file "basic_data.csv.gz"
+
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Fail", json.Status)
+                assertTrue(json.Message.contains("too many filtered rows"))
+                assertEquals(13, json.NumberTotalRows)
+                assertEquals(0, json.NumberLoadedRows)
+                assertEquals(13, json.NumberFilteredRows)
+                assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    // no compress_type
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'format', "CSV"
+        file "basic_data.csv.bz2"
+
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Fail", json.Status)
+                assertTrue(json.Message.contains("too many filtered rows"))
+                assertEquals(9, json.NumberTotalRows)
+                assertEquals(0, json.NumberLoadedRows)
+                assertEquals(9, json.NumberFilteredRows)
+                assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    // no compress_type
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'format', "CSV"
+        file "basic_data.csv.lz4"
+
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Fail", json.Status)
+                assertTrue(json.Message.contains("too many filtered rows"))
+                assertEquals(31, json.NumberTotalRows)
+                assertEquals(0, json.NumberLoadedRows)
+                assertEquals(31, json.NumberFilteredRows)
+                assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    // no compress_type
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        file "basic_data.csv.gz"
+
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Fail", json.Status)
+                assertTrue(json.Message.contains("too many filtered rows"))
+                assertEquals(13, json.NumberTotalRows)
+                assertEquals(0, json.NumberLoadedRows)
+                assertEquals(13, json.NumberFilteredRows)
+                assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    // no compress_type
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        file "basic_data.csv.bz2"
+
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Fail", json.Status)
+                assertTrue(json.Message.contains("too many filtered rows"))
+                assertEquals(9, json.NumberTotalRows)
+                assertEquals(0, json.NumberLoadedRows)
+                assertEquals(9, json.NumberFilteredRows)
+                assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    // no compress_type
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        file "basic_data.csv.lz4"
+
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Fail", json.Status)
+                assertTrue(json.Message.contains("too many filtered rows"))
+                assertEquals(31, json.NumberTotalRows)
+                assertEquals(0, json.NumberLoadedRows)
+                assertEquals(31, json.NumberFilteredRows)
+                assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    // no compress_type
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        file "basic_data.csv.lzo"
+
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Fail", json.Status)
+                assertTrue(json.Message.contains("too many filtered rows"))
+                assertEquals(23, json.NumberTotalRows)
+                assertEquals(0, json.NumberLoadedRows)
+                assertEquals(23, json.NumberFilteredRows)
+                assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    qt_sql """ select count(*) from ${tableName} """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
