This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 2aa9cb22768 branch-2.1: [fix](lzo) fix lzo decompression failed #49538 (#49634) 2aa9cb22768 is described below commit 2aa9cb22768fd529f2ad482a8a630fe02be000ef Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Thu Apr 17 09:19:35 2025 +0800 branch-2.1: [fix](lzo) fix lzo decompression failed #49538 (#49634) Cherry-picked from #49538 --------- Co-authored-by: Mingyu Chen (Rayner) <morning...@163.com> Co-authored-by: morningman <yun...@selectdb.com> --- be/src/exec/lzo_decompressor.cpp | 18 +- .../file_reader/new_plain_text_line_reader.cpp | 13 +- .../external_table_p0/tvf/lzo/test_compress.lzo | Bin 0 -> 256 bytes .../test_no_compress_with_empty_block_begin.lzo | Bin 0 -> 200 bytes .../lzo/test_no_compress_with_empty_block_end.lzo | Bin 0 -> 196 bytes .../test_no_compress_with_empty_block_middle.lzo | Bin 0 -> 188 bytes .../external_table_p0/tvf/test_local_tvf_lzo.out | Bin 0 -> 899 bytes .../tvf/test_local_tvf_lzo.groovy | 58 +++++ tools/lzo/README.md | 37 +++ tools/lzo/build.sh | 20 ++ tools/lzo/lzo_writer.cpp | 282 +++++++++++++++++++++ 11 files changed, 407 insertions(+), 21 deletions(-) diff --git a/be/src/exec/lzo_decompressor.cpp b/be/src/exec/lzo_decompressor.cpp index b240e2995a0..a8262ea6e3d 100644 --- a/be/src/exec/lzo_decompressor.cpp +++ b/be/src/exec/lzo_decompressor.cpp @@ -80,8 +80,6 @@ Status LzopDecompressor::decompress(uint8_t* input, size_t input_len, size_t* in } } - // LOG(INFO) << "after load header: " << *input_bytes_read; - // read compressed block // compressed-block ::= // <uncompressed-size> @@ -144,7 +142,7 @@ Status LzopDecompressor::decompress(uint8_t* input, size_t input_len, size_t* in return Status::OK(); } - ptr = get_uint32(ptr, &out_checksum); + ptr = get_uint32(ptr, &in_checksum); left_input_len -= sizeof(uint32_t); } else { // If the compressed data size is equal to the uncompressed data size, then @@ 
-185,16 +183,7 @@ Status LzopDecompressor::decompress(uint8_t* input, size_t input_len, size_t* in ptr += compressed_size; } - // 7. peek next block's uncompressed size - uint32_t next_uncompressed_size; - get_uint32(ptr, &next_uncompressed_size); - if (next_uncompressed_size == 0) { - // 0 means current block is the last block. - // consume this uncompressed_size to finish reading. - ptr += sizeof(uint32_t); - } - - // 8. done + // 7. done *stream_end = true; *decompressed_len = uncompressed_size; *input_bytes_read += ptr - block_start; @@ -202,8 +191,7 @@ Status LzopDecompressor::decompress(uint8_t* input, size_t input_len, size_t* in VLOG_DEBUG << "finished decompress lzo block." << " compressed_size: " << compressed_size << " decompressed_len: " << *decompressed_len - << " input_bytes_read: " << *input_bytes_read - << " next_uncompressed_size: " << next_uncompressed_size; + << " input_bytes_read: " << *input_bytes_read; return Status::OK(); } diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp index ad86cca212b..463164d7349 100644 --- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp +++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp @@ -30,6 +30,7 @@ #include <memory> #include <ostream> +#include "common/logging.h" #include "exec/decompressor.h" #include "io/fs/file_reader.h" #include "util/slice.h" @@ -426,12 +427,12 @@ Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool _output_buf_size - _output_buf_limit, /* output_max_len */ &decompressed_len, &stream_end, &_more_input_bytes, &_more_output_bytes)); - // LOG(INFO) << "after decompress:" - // << " stream_end: " << stream_end - // << " input_read_bytes: " << input_read_bytes - // << " decompressed_len: " << decompressed_len - // << " more_input_bytes: " << _more_input_bytes - // << " more_output_bytes: " << _more_output_bytes; + VLOG_DEBUG << 
"after decompress:" + << " stream_end: " << stream_end + << " input_read_bytes: " << input_read_bytes + << " decompressed_len: " << decompressed_len + << " more_input_bytes: " << _more_input_bytes + << " more_output_bytes: " << _more_output_bytes; // update pos and limit _input_buf_pos += input_read_bytes; diff --git a/regression-test/data/external_table_p0/tvf/lzo/test_compress.lzo b/regression-test/data/external_table_p0/tvf/lzo/test_compress.lzo new file mode 100644 index 00000000000..c762903cffc Binary files /dev/null and b/regression-test/data/external_table_p0/tvf/lzo/test_compress.lzo differ diff --git a/regression-test/data/external_table_p0/tvf/lzo/test_no_compress_with_empty_block_begin.lzo b/regression-test/data/external_table_p0/tvf/lzo/test_no_compress_with_empty_block_begin.lzo new file mode 100644 index 00000000000..16b570fb440 Binary files /dev/null and b/regression-test/data/external_table_p0/tvf/lzo/test_no_compress_with_empty_block_begin.lzo differ diff --git a/regression-test/data/external_table_p0/tvf/lzo/test_no_compress_with_empty_block_end.lzo b/regression-test/data/external_table_p0/tvf/lzo/test_no_compress_with_empty_block_end.lzo new file mode 100644 index 00000000000..1c4bb0bbacd Binary files /dev/null and b/regression-test/data/external_table_p0/tvf/lzo/test_no_compress_with_empty_block_end.lzo differ diff --git a/regression-test/data/external_table_p0/tvf/lzo/test_no_compress_with_empty_block_middle.lzo b/regression-test/data/external_table_p0/tvf/lzo/test_no_compress_with_empty_block_middle.lzo new file mode 100644 index 00000000000..bf9e914e41c Binary files /dev/null and b/regression-test/data/external_table_p0/tvf/lzo/test_no_compress_with_empty_block_middle.lzo differ diff --git a/regression-test/data/external_table_p0/tvf/test_local_tvf_lzo.out b/regression-test/data/external_table_p0/tvf/test_local_tvf_lzo.out new file mode 100644 index 00000000000..bab8e156267 Binary files /dev/null and 
b/regression-test/data/external_table_p0/tvf/test_local_tvf_lzo.out differ diff --git a/regression-test/suites/external_table_p0/tvf/test_local_tvf_lzo.groovy b/regression-test/suites/external_table_p0/tvf/test_local_tvf_lzo.groovy new file mode 100644 index 00000000000..a7eba83a970 --- /dev/null +++ b/regression-test/suites/external_table_p0/tvf/test_local_tvf_lzo.groovy @@ -0,0 +1,58 @@ +import org.junit.Assert + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// This suite tests the `local` tvf reading LZO files +suite("test_local_tvf_lzo", "p0,external,external_docker") { + List<List<Object>> backends = sql """ show backends """ + def dataFilePath = context.config.dataPath + "/external_table_p0/tvf/lzo" + + assertTrue(backends.size() > 0) + + def be_id = backends[0][0] + // in cluster mode, make sure every BE has this data + def outFilePath="/" + def transFile01="${dataFilePath}/test_compress.lzo" + def transFile02="${dataFilePath}/test_no_compress_with_empty_block_begin.lzo" + def transFile03="${dataFilePath}/test_no_compress_with_empty_block_end.lzo" + def transFile04="${dataFilePath}/test_no_compress_with_empty_block_middle.lzo" + + for (List<Object> backend : backends) { + def be_host = backend[1] + scpFiles ("root", be_host, transFile01, outFilePath, false); + scpFiles ("root", be_host, transFile02, outFilePath, false); + scpFiles ("root", be_host, transFile03, outFilePath, false); + scpFiles ("root", be_host, transFile04, outFilePath, false); + } + + def file1 = outFilePath + "test_compress.lzo"; + def file2 = outFilePath + "test_no_compress_with_empty_block_begin.lzo"; + def file3 = outFilePath + "test_no_compress_with_empty_block_end.lzo"; + def file4 = outFilePath + "test_no_compress_with_empty_block_middle.lzo"; + + order_qt_test_1 """ select * from local( "file_path" = "${file1}", "backend_id" = "${be_id}", "format" = "csv");""" + order_qt_test_2 """ select * from local( "file_path" = "${file2}", "backend_id" = "${be_id}", "format" = "csv");""" + order_qt_test_3 """ select * from local( "file_path" = "${file3}", "backend_id" = "${be_id}", "format" = "csv");""" + order_qt_test_4 """ select * from local( "file_path" = "${file4}", "backend_id" = "${be_id}", "format" = "csv");""" + + qt_test_5 """ select count(*) from local( "file_path" = "${file1}", "backend_id" = "${be_id}", "format" = "csv");""" + qt_test_6 """ select count(*) from local( "file_path" = "${file2}", "backend_id" = "${be_id}", "format" = "csv");""" + 
qt_test_7 """ select count(*) from local( "file_path" = "${file3}", "backend_id" = "${be_id}", "format" = "csv");""" + qt_test_8 """ select count(*) from local( "file_path" = "${file4}", "backend_id" = "${be_id}", "format" = "csv");""" + +} diff --git a/tools/lzo/README.md b/tools/lzo/README.md new file mode 100644 index 00000000000..faca8825410 --- /dev/null +++ b/tools/lzo/README.md @@ -0,0 +1,37 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# LZO Writer + +This is a simple LZO writer that you can use to generate LZO-compressed files. + +See `main()` in `lzo_writer.cpp` for details. + +## Compilation + +``` +bash build.sh +``` + +## Generate file + +``` +./lzo_writer file_name.lzo +``` + diff --git a/tools/lzo/build.sh b/tools/lzo/build.sh new file mode 100644 index 00000000000..82c3206b13e --- /dev/null +++ b/tools/lzo/build.sh @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" +export DORIS_THIRDPARTY=${ROOT}/../../thirdparty/ +g++ -o lzo_writer lzo_writer.cpp -I. -Isrc -I${DORIS_THIRDPARTY}/installed/include -L${DORIS_THIRDPARTY}/installed/lib -llzo2 -std=c++17 diff --git a/tools/lzo/lzo_writer.cpp b/tools/lzo/lzo_writer.cpp new file mode 100644 index 00000000000..28dd77dd8de --- /dev/null +++ b/tools/lzo/lzo_writer.cpp @@ -0,0 +1,282 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <fstream> +#include <iostream> +#include <string> +#include <vector> +#include <cstdint> +#include <lzo/lzo1x.h> +#include <lzo/lzoconf.h> + +// LZO file format constants +const uint8_t LZOP_MAGIC[9] = {0x89, 0x4c, 0x5a, 0x4f, 0x00, 0x0d, 0x0a, 0x1a, 0x0a}; +const uint16_t LZOP_VERSION = 0x1040; +const uint16_t MY_LZO_VERSION = 0x2080; // LZO library version +const uint16_t LZOP_VERSION_NEEDED = 0x0940; +const uint8_t COMPRESSION_METHOD = 1; // LZO1X +const uint8_t COMPRESSION_LEVEL = 5; +const uint32_t LZOP_FLAGS = 0x00003; // Use ADLER32 for all checksums +const uint32_t HEADER_SIZE = 34; // Minimum header size without filename +const uint32_t ADLER32_INIT_VALUE = 1; // Initial value for Adler32 + +// Compute Adler-32 checksum (same implementation as in Doris) +uint32_t olap_adler32(uint32_t adler, const char* buf, size_t len) { + uint32_t s1 = adler & 0xffff; + uint32_t s2 = (adler >> 16) & 0xffff; + + for (size_t i = 0; i < len; i++) { + s1 = (s1 + (unsigned char)buf[i]) % 65521; + s2 = (s2 + s1) % 65521; + } + + return (s2 << 16) + s1; +} + +class LzoWriter { +public: + LzoWriter(const std::string& filename) : _filename(filename) {} + + bool init() { + // Initialize LZO library + if (lzo_init() != LZO_E_OK) { + std::cerr << "Failed to initialize LZO library" << std::endl; + return false; + } + + _out_file.open(_filename, std::ios::binary); + if (!_out_file.is_open()) { + std::cerr << "Failed to open output file: " << _filename << std::endl; + return false; + } + + // Allocate work memory for compression + _wrkmem.resize(LZO1X_1_MEM_COMPRESS); + + // Write file header + write_header(); + return true; + } + + void write_header() { + // Prepare header data first + std::vector<uint8_t> header_data; + + // Write magic number (not included in checksum) + _out_file.write(reinterpret_cast<const char*>(LZOP_MAGIC), sizeof(LZOP_MAGIC)); + + // Add version info to header data + uint16_t version = __builtin_bswap16(LZOP_VERSION); + 
header_data.insert(header_data.end(), reinterpret_cast<uint8_t*>(&version), + reinterpret_cast<uint8_t*>(&version) + sizeof(version)); + + uint16_t lib_version = __builtin_bswap16(MY_LZO_VERSION); + header_data.insert(header_data.end(), reinterpret_cast<uint8_t*>(&lib_version), + reinterpret_cast<uint8_t*>(&lib_version) + sizeof(lib_version)); + + uint16_t version_needed = __builtin_bswap16(LZOP_VERSION_NEEDED); + header_data.insert(header_data.end(), reinterpret_cast<uint8_t*>(&version_needed), + reinterpret_cast<uint8_t*>(&version_needed) + sizeof(version_needed)); + + // Add method and level + header_data.push_back(COMPRESSION_METHOD); + header_data.push_back(COMPRESSION_LEVEL); + + // Add flags + uint32_t flags = __builtin_bswap32(LZOP_FLAGS); + header_data.insert(header_data.end(), reinterpret_cast<uint8_t*>(&flags), + reinterpret_cast<uint8_t*>(&flags) + sizeof(flags)); + + // Add mode + uint32_t mode = 0; + header_data.insert(header_data.end(), reinterpret_cast<uint8_t*>(&mode), + reinterpret_cast<uint8_t*>(&mode) + sizeof(mode)); + + // Add mtime + header_data.insert(header_data.end(), reinterpret_cast<uint8_t*>(&mode), + reinterpret_cast<uint8_t*>(&mode) + sizeof(mode)); + header_data.insert(header_data.end(), reinterpret_cast<uint8_t*>(&mode), + reinterpret_cast<uint8_t*>(&mode) + sizeof(mode)); + + // Add filename length + header_data.push_back(0); + + // Write all header data + _out_file.write(reinterpret_cast<const char*>(header_data.data()), header_data.size()); + + // Calculate and write header checksum + uint32_t header_checksum = compute_adler32(header_data.data(), header_data.size()); + write_uint32(header_checksum); + } + + void write_normal_block(const std::string& data) { + std::vector<uint8_t> compressed_data(data.size() + data.size() / 16 + 64 + 3); + lzo_uint compressed_len = 0; + + // Compress the data + int r = lzo1x_1_compress( + reinterpret_cast<const uint8_t*>(data.data()), + data.size(), + compressed_data.data(), + &compressed_len, + 
_wrkmem.data()); + + if (r != LZO_E_OK) { + std::cerr << "Compression failed" << std::endl; + return; + } + + std::cout << "Block info:" << std::endl; + std::cout << " Original data size: " << data.size() << std::endl; + std::cout << " Compressed size: " << compressed_len << std::endl; + std::cout << " Original data: '" << data << "'" << std::endl; + + // Write uncompressed size + write_uint32(data.size()); + + // If compressed size is not smaller than original size, + // we will store the original data without compression + bool is_compressed = compressed_len < data.size(); + + // Write compressed size (or original size if not compressed) + write_uint32(is_compressed ? compressed_len : data.size()); + + // Write uncompressed checksum + uint32_t uncompressed_checksum = compute_adler32( + reinterpret_cast<const uint8_t*>(data.data()), data.size()); + write_uint32(uncompressed_checksum); + + std::cout << " Uncompressed checksum calculation:" << std::endl; + std::cout << " Data length: " << data.size() << std::endl; + std::cout << " First few bytes:"; + for (size_t i = 0; i < std::min(data.size(), size_t(16)); ++i) { + printf(" %02x", (unsigned char)data[i]); + } + std::cout << std::endl; + std::cout << " Computed checksum: " << std::hex << uncompressed_checksum << std::dec << std::endl; + + if (is_compressed) { + // Detailed logging of compressed data + std::cout << " Complete compressed data:" << std::endl; + std::cout << " All bytes:"; + for (size_t i = 0; i < compressed_len; ++i) { + if (i % 16 == 0) std::cout << std::endl << " "; + printf(" %02x", compressed_data[i]); + } + std::cout << std::endl; + + // Write compressed checksum + uint32_t compressed_checksum = compute_adler32(compressed_data.data(), compressed_len); + write_uint32(compressed_checksum); + + std::cout << " Compressed checksum calculation:" << std::endl; + std::cout << " Data length: " << compressed_len << std::endl; + std::cout << " Bytes used for checksum:"; + for (size_t i = 0; i < 
compressed_len; ++i) { + if (i % 16 == 0) std::cout << std::endl << " "; + printf(" %02x", compressed_data[i]); + } + std::cout << std::endl; + std::cout << " Computed checksum: " << std::hex << compressed_checksum << std::dec << std::endl; + + // Write compressed data + _out_file.write(reinterpret_cast<const char*>(compressed_data.data()), compressed_len); + } else { + std::cout << " Data not compressed (compressed size >= original size)" << std::endl; + // Write original data directly + _out_file.write(data.data(), data.size()); + } + std::cout << "----------------------------------------" << std::endl; + } + + void write_zero_block() { + // Write a block with uncompressed size = 0 to mark end of file + write_uint32(0); + } + + void close() { + if (_out_file.is_open()) { + _out_file.close(); + } + } + +private: + void write_uint8(uint8_t value) { + _out_file.write(reinterpret_cast<const char*>(&value), sizeof(value)); + } + + void write_uint16(uint16_t value) { + value = __builtin_bswap16(value); // Convert to big-endian + _out_file.write(reinterpret_cast<const char*>(&value), sizeof(value)); + } + + void write_uint32(uint32_t value) { + value = __builtin_bswap32(value); // Convert to big-endian + _out_file.write(reinterpret_cast<const char*>(&value), sizeof(value)); + } + + // Compute Adler-32 checksum using the same implementation as Doris + uint32_t compute_adler32(const uint8_t* data, size_t len) { + uint32_t checksum = olap_adler32(ADLER32_INIT_VALUE, reinterpret_cast<const char*>(data), len); + std::cout << " Adler32 details:" << std::endl; + std::cout << " Input length: " << len << std::endl; + std::cout << " Initial value: " << ADLER32_INIT_VALUE << std::endl; + std::cout << " Final checksum: " << std::hex << checksum << std::dec << std::endl; + return checksum; + } + + std::string _filename; + std::ofstream _out_file; + std::vector<uint8_t> _wrkmem; +}; + +int main(int argc, char** argv) { + if (argc != 2) { + std::cerr << "Usage: " << argv[0] << " 
<output_file>" << std::endl; + return 1; + } + + LzoWriter writer(argv[1]); + if (!writer.init()) { + return 1; + } + + // Write a zero-sized block at the begin + writer.write_zero_block(); + + // Write first normal block with test data + std::string test_data1 = "This is the first block of test data for LZO compression!\n"; + writer.write_normal_block(test_data1); + + // Write a zero-sized block in the middle + writer.write_zero_block(); + + // Write third normal block with different test data + std::string test_data2 = "This is the third block with more test data for LZO compression!"; + writer.write_normal_block(test_data2); + + // Write a zero-sized block in the end + writer.write_zero_block(); + + // Write a zero-sized block in the end + writer.write_zero_block(); + + writer.close(); + std::cout << "Successfully created LZO file with three blocks (middle block size = 0)" << std::endl; + + return 0; +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org