This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 0d32aeeaf6b24ee4b219e3251ad38c4e8938acb4 Author: HowardQin <[email protected]> AuthorDate: Mon Feb 5 17:35:00 2024 +0800 [improvement](load) Enable lzo & Remove dependency on Markus F.X.J. Oberhumer's lzo library (#30573) Issue Number: close #29406 1. increase lzop version to 0x1040, I set to 0x1040 only for decompressing lzo files compressed by higher version of lzop, no change of decompressing logic, actually, 0x1040 should have "F_H_FILTER" feature, but it is mainly for audio and image data, so we do not support it. 2. use orc::lzoDecompress() instead of lzo1x_decompress_safe() to decompress lzo data 3. use crc32c::Extend() instead of lzo_crc32() 4. use olap_adler32() instead of lzo_adler32() 5. thus, remove dependency of Markus F.X.J. Oberhumer's lzo library 6. remove DORIS_WITH_LZO, so lzo files are supported by stream and broker load by default 7. add some regression tests --- be/CMakeLists.txt | 8 --- be/cmake/thirdparty.cmake | 4 -- be/src/exec/decompressor.cpp | 2 - be/src/exec/decompressor.h | 7 -- be/src/exec/lzo_decompressor.cpp | 53 +++++++++++----- be/src/olap/utils.cpp | 11 +--- be/src/pch/pch.h | 4 -- be/src/vec/exec/format/csv/csv_reader.cpp | 1 + build.sh | 5 -- .../load_p0/stream_load/test_compress_type.out | 2 +- .../load_p0/stream_load/test_compress_type.groovy | 74 +++++++++++++++++++++- 11 files changed, 112 insertions(+), 59 deletions(-) diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index bba09ebc2a6..9b72bc61777 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -323,10 +323,6 @@ if (WITH_MYSQL) add_compile_options(-DDORIS_WITH_MYSQL) endif() -if (WITH_LZO) - add_compile_options(-DDORIS_WITH_LZO) -endif() - # Enable memory tracker, which allows BE to limit the memory of tasks such as query, load, # and compaction,and observe the memory of BE through be_ip:http_port/MemTracker. 
# Adding the option `USE_MEM_TRACKER=OFF sh build.sh` when compiling can turn off the memory tracker, @@ -547,10 +543,6 @@ set(DORIS_DEPENDENCIES ${KRB5_LIBS} ) -if(WITH_LZO) - set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} lzo) -endif() - if (WITH_MYSQL) set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} mysql) endif() diff --git a/be/cmake/thirdparty.cmake b/be/cmake/thirdparty.cmake index 5c07f79d6e4..3dca9252faa 100644 --- a/be/cmake/thirdparty.cmake +++ b/be/cmake/thirdparty.cmake @@ -70,10 +70,6 @@ add_thirdparty(lz4) add_thirdparty(thrift) add_thirdparty(thriftnb) -if(WITH_LZO) - add_thirdparty(lzo LIBNAME "lib/liblzo2.a") -endif() - add_thirdparty(libevent LIBNAME "lib/libevent.a") add_thirdparty(libevent_pthreads LIBNAME "lib/libevent_pthreads.a") add_thirdparty(libbz2 LIBNAME "lib/libbz2.a") diff --git a/be/src/exec/decompressor.cpp b/be/src/exec/decompressor.cpp index 1e7e3482234..c0dd7554942 100644 --- a/be/src/exec/decompressor.cpp +++ b/be/src/exec/decompressor.cpp @@ -50,11 +50,9 @@ Status Decompressor::create_decompressor(CompressType type, Decompressor** decom case CompressType::SNAPPYBLOCK: *decompressor = new SnappyBlockDecompressor(); break; -#ifdef DORIS_WITH_LZO case CompressType::LZOP: *decompressor = new LzopDecompressor(); break; -#endif default: return Status::InternalError("Unknown compress type: {}", type); } diff --git a/be/src/exec/decompressor.h b/be/src/exec/decompressor.h index 713c5db2bf1..45653fd8b04 100644 --- a/be/src/exec/decompressor.h +++ b/be/src/exec/decompressor.h @@ -28,11 +28,6 @@ #include <string> -#ifdef DORIS_WITH_LZO -#include <lzo/lzo1x.h> -#include <lzo/lzoconf.h> -#endif - #include "common/status.h" namespace doris { @@ -177,7 +172,6 @@ private: Status init() override; }; -#ifdef DORIS_WITH_LZO class LzopDecompressor : public Decompressor { public: ~LzopDecompressor() override = default; @@ -271,6 +265,5 @@ private: const static uint64_t F_CRC32_D; const static uint64_t F_ADLER32_D; }; -#endif // DORIS_WITH_LZO } // 
namespace doris diff --git a/be/src/exec/lzo_decompressor.cpp b/be/src/exec/lzo_decompressor.cpp index a2af9e94fd0..c8cf0499508 100644 --- a/be/src/exec/lzo_decompressor.cpp +++ b/be/src/exec/lzo_decompressor.cpp @@ -16,15 +16,30 @@ // under the License. #include "exec/decompressor.h" +#include "olap/utils.h" +#include "orc/Exceptions.hh" +#include "util/crc32c.h" + +namespace orc { +/** + * Decompress the bytes in to the output buffer. + * @param inputAddress the start of the input + * @param inputLimit one past the last byte of the input + * @param outputAddress the start of the output buffer + * @param outputLimit one past the last byte of the output buffer + * @result the number of bytes decompressed + */ +uint64_t lzoDecompress(const char* inputAddress, const char* inputLimit, char* outputAddress, + char* outputLimit); +} // namespace orc namespace doris { -#ifdef DORIS_WITH_LZO // Lzop const uint8_t LzopDecompressor::LZOP_MAGIC[9] = {0x89, 0x4c, 0x5a, 0x4f, 0x00, 0x0d, 0x0a, 0x1a, 0x0a}; -const uint64_t LzopDecompressor::LZOP_VERSION = 0x1030; +const uint64_t LzopDecompressor::LZOP_VERSION = 0x1040; const uint64_t LzopDecompressor::MIN_LZO_VERSION = 0x0100; // magic(9) + ver(2) + lib_ver(2) + ver_needed(2) + method(1) // + lvl(1) + flags(4) + mode/mtime(12) + filename_len(1) @@ -153,19 +168,18 @@ Status LzopDecompressor::decompress(uint8_t* input, size_t input_len, size_t* in memmove(output, ptr, compressed_size); ptr += compressed_size; } else { - // decompress - *decompressed_len = uncompressed_size; - int ret = lzo1x_decompress_safe(ptr, compressed_size, output, - reinterpret_cast<lzo_uint*>(&uncompressed_size), nullptr); - if (ret != LZO_E_OK || uncompressed_size != *decompressed_len) { + try { + *decompressed_len = + orc::lzoDecompress((const char*)ptr, (const char*)(ptr + compressed_size), + (char*)output, (char*)(output + uncompressed_size)); + } catch (const orc::ParseError& err) { std::stringstream ss; - ss << "Lzo decompression failed with ret: " << 
ret - << " decompressed len: " << uncompressed_size << " expected: " << *decompressed_len; + ss << "Lzo decompression failed: " << err.what(); return Status::InternalError(ss.str()); } RETURN_IF_ERROR(checksum(_header_info.output_checksum_type, "decompressed", out_checksum, - output, uncompressed_size)); + output, *decompressed_len)); ptr += compressed_size; } @@ -260,8 +274,14 @@ Status LzopDecompressor::parse_header_info(uint8_t* input, size_t input_len, return Status::InternalError(ss.str()); } - // 6. skip level - ++ptr; + // 6. unsupported level: 7, 8, 9 + uint8_t level; + ptr = get_uint8(ptr, &level); + if (level > 6) { + std::stringstream ss; + ss << "unsupported lzo level: " << level; + return Status::InternalError(ss.str()); + } // 7. flags uint32_t flags; @@ -305,10 +325,10 @@ Status LzopDecompressor::parse_header_info(uint8_t* input, size_t input_len, uint32_t computed_checksum; if (_header_info.header_checksum_type == CHECK_CRC32) { computed_checksum = CRC32_INIT_VALUE; - computed_checksum = lzo_crc32(computed_checksum, header, cur - header); + computed_checksum = crc32c::Extend(computed_checksum, (const char*)header, cur - header); } else { computed_checksum = ADLER32_INIT_VALUE; - computed_checksum = lzo_adler32(computed_checksum, header, cur - header); + computed_checksum = olap_adler32(computed_checksum, (const char*)header, cur - header); } if (computed_checksum != expected_checksum) { @@ -354,10 +374,10 @@ Status LzopDecompressor::checksum(LzoChecksum type, const std::string& source, u case CHECK_NONE: return Status::OK(); case CHECK_CRC32: - computed_checksum = lzo_crc32(CRC32_INIT_VALUE, ptr, len); + computed_checksum = crc32c::Extend(CRC32_INIT_VALUE, (const char*)ptr, len); break; case CHECK_ADLER: - computed_checksum = lzo_adler32(ADLER32_INIT_VALUE, ptr, len); + computed_checksum = olap_adler32(ADLER32_INIT_VALUE, (const char*)ptr, len); break; default: std::stringstream ss; @@ -387,6 +407,5 @@ std::string LzopDecompressor::debug_info() { << 
" output checksum type: " << _header_info.output_checksum_type; return ss.str(); } -#endif // DORIS_WITH_LZO } // namespace doris diff --git a/be/src/olap/utils.cpp b/be/src/olap/utils.cpp index e6958343c17..49c1d53ae35 100644 --- a/be/src/olap/utils.cpp +++ b/be/src/olap/utils.cpp @@ -19,6 +19,7 @@ // IWYU pragma: no_include <bthread/errno.h> #include <errno.h> // IWYU pragma: keep +#include <stdarg.h> #include <time.h> #include <unistd.h> #include <zconf.h> @@ -33,21 +34,13 @@ #include <string> #include <vector> -#include "util/sse_util.hpp" - -#ifdef DORIS_WITH_LZO -#include <lzo/lzo1c.h> -#include <lzo/lzo1x.h> -#endif - -#include <stdarg.h> - #include "common/logging.h" #include "common/status.h" #include "io/fs/file_reader.h" #include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" #include "olap/olap_common.h" +#include "util/sse_util.hpp" #include "util/string_parser.hpp" #include "vec/runtime/ipv4_value.h" #include "vec/runtime/ipv6_value.h" diff --git a/be/src/pch/pch.h b/be/src/pch/pch.h index 34428d739d0..8c7ef3ea3a9 100644 --- a/be/src/pch/pch.h +++ b/be/src/pch/pch.h @@ -255,10 +255,6 @@ #include <lz4/lz4.h> #include <lz4/lz4frame.h> -// lzo headers -#include <lzo/lzo1x.h> -#include <lzo/lzoconf.h> - // mysql headers #include <mysql/mysql.h> diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 9f8c01b10db..84c8d911f94 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -559,6 +559,7 @@ Status CsvReader::_create_decompressor() { compress_type = CompressType::GZIP; break; case TFileCompressType::LZO: + case TFileCompressType::LZOP: compress_type = CompressType::LZOP; break; case TFileCompressType::BZ2: diff --git a/build.sh b/build.sh index 3b819929d00..8363705b555 100755 --- a/build.sh +++ b/build.sh @@ -327,9 +327,6 @@ fi if [[ -z "${USE_AVX2}" ]]; then USE_AVX2='ON' fi -if [[ -z "${WITH_LZO}" ]]; then - WITH_LZO='OFF' -fi if [[ -z 
"${USE_LIBCPP}" ]]; then if [[ "$(uname -s)" != 'Darwin' ]]; then USE_LIBCPP='OFF' @@ -425,7 +422,6 @@ echo "Get params: PARALLEL -- ${PARALLEL} CLEAN -- ${CLEAN} WITH_MYSQL -- ${WITH_MYSQL} - WITH_LZO -- ${WITH_LZO} GLIBC_COMPATIBILITY -- ${GLIBC_COMPATIBILITY} USE_AVX2 -- ${USE_AVX2} USE_LIBCPP -- ${USE_LIBCPP} @@ -514,7 +510,6 @@ if [[ "${BUILD_BE}" -eq 1 ]]; then -DBUILD_FS_BENCHMARK="${BUILD_FS_BENCHMARK}" \ ${CMAKE_USE_CCACHE:+${CMAKE_USE_CCACHE}} \ -DWITH_MYSQL="${WITH_MYSQL}" \ - -DWITH_LZO="${WITH_LZO}" \ -DUSE_LIBCPP="${USE_LIBCPP}" \ -DBUILD_META_TOOL="${BUILD_META_TOOL}" \ -DBUILD_INDEX_TOOL="${BUILD_INDEX_TOOL}" \ diff --git a/regression-test/data/load_p0/stream_load/test_compress_type.out b/regression-test/data/load_p0/stream_load/test_compress_type.out index f76aa4d7415..56d195c569e 100644 --- a/regression-test/data/load_p0/stream_load/test_compress_type.out +++ b/regression-test/data/load_p0/stream_load/test_compress_type.out @@ -1,4 +1,4 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !sql -- -120 +160 diff --git a/regression-test/suites/load_p0/stream_load/test_compress_type.groovy b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy index d1123027bac..eeab7e80975 100644 --- a/regression-test/suites/load_p0/stream_load/test_compress_type.groovy +++ b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-suite("test_compress_type", "load_p0") { +suite("test_stream_load_compress_type", "load_p0") { def tableName = "basic_data" sql """ DROP TABLE IF EXISTS ${tableName} """ @@ -153,6 +153,50 @@ suite("test_compress_type", "load_p0") { } } + // LZO = LZOP + streamLoad { + table "${tableName}" + set 'column_separator', '|' + set 'trim_double_quotes', 'true' + set 'compress_type', 'LZO' + + file "basic_data.csv.lzo" + check { + result, exception, startTime, endTime -> + assertTrue(exception == null) + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("Success", json.Status) + assertEquals(20, json.NumberTotalRows) + assertEquals(20, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.LoadBytes > 0) + } + } + + streamLoad { + table "${tableName}" + set 'column_separator', '|' + set 'trim_double_quotes', 'true' + set 'compress_type', 'LZOP' + + file "basic_data.csv.lzo" + check { + result, exception, startTime, endTime -> + assertTrue(exception == null) + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("Success", json.Status) + assertEquals(20, json.NumberTotalRows) + assertEquals(20, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.LoadBytes > 0) + } + } + + // no compress_type streamLoad { table "${tableName}" set 'column_separator', '|' @@ -174,6 +218,7 @@ suite("test_compress_type", "load_p0") { } } + // no compress_type streamLoad { table "${tableName}" set 'column_separator', '|' @@ -195,6 +240,7 @@ suite("test_compress_type", "load_p0") { } } + // no compress_type streamLoad { table "${tableName}" set 'column_separator', '|' @@ -216,6 +262,7 @@ suite("test_compress_type", "load_p0") { } } + // no compress_type streamLoad { table "${tableName}" set 'column_separator', '|' @@ -236,6 +283,7 @@ 
suite("test_compress_type", "load_p0") { } } + // no compress_type streamLoad { table "${tableName}" set 'column_separator', '|' @@ -256,6 +304,7 @@ suite("test_compress_type", "load_p0") { } } + // no compress_type streamLoad { table "${tableName}" set 'column_separator', '|' @@ -276,5 +325,26 @@ suite("test_compress_type", "load_p0") { } } + // no compress_type + streamLoad { + table "${tableName}" + set 'column_separator', '|' + set 'trim_double_quotes', 'true' + file "basic_data.csv.lzo" + + check { + result, exception, startTime, endTime -> + assertTrue(exception == null) + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("Fail", json.Status) + assertTrue(json.Message.contains("too many filtered rows")) + assertEquals(23, json.NumberTotalRows) + assertEquals(0, json.NumberLoadedRows) + assertEquals(23, json.NumberFilteredRows) + assertTrue(json.LoadBytes > 0) + } + } + qt_sql """ select count(*) from ${tableName} """ -} \ No newline at end of file +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
