This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0b494700ab2 [improvement](status) Change the return type for block_compression (#47566) 0b494700ab2 is described below commit 0b494700ab2f50d931a00c038ace996e93cad5f0 Author: lzyy2024 <2972013...@qq.com> AuthorDate: Sat Feb 8 17:16:58 2025 +0800 [improvement](status) Change the return type for block_compression (#47566) ### What problem does this PR solve? The Status codes previously returned by block_compression.cpp did not reflect the actual cause of each failure, so this change corrects them. Issue Number: close #xxx Related PR: #xxx Problem Summary: Previously, each function reported every failure from the underlying compression library with a single Status type, regardless of the cause. Failures are now mapped by cause: memory-allocation errors throw an Exception carrying MemoryLimitExceeded, malformed input or invalid parameters return InvalidArgument, and all other library errors return InternalError. ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [x] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [x] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [x] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [x] No. - [ ] Yes. <!-- Add document PR link here.
eg: https://github.com/apache/doris-website/pull/1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into --> --- be/src/util/block_compression.cpp | 130 ++++++++++++++++++++--------- be/src/vec/functions/function_compress.cpp | 2 +- 2 files changed, 93 insertions(+), 39 deletions(-) diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index 7a0aacd4252..fcb2f3fae08 100644 --- a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -187,7 +187,7 @@ public: auto decompressed_len = LZ4_decompress_safe(input.data, output->data, input.size, output->size); if (decompressed_len < 0) { - return Status::InvalidArgument("fail to do LZ4 decompress, error={}", decompressed_len); + return Status::InternalError("fail to do LZ4 decompress, error={}", decompressed_len); } output->size = decompressed_len; return Status::OK(); @@ -458,8 +458,8 @@ private: &input_size, nullptr); if (LZ4F_isError(lres)) { decompress_failed = true; - return Status::InvalidArgument("Fail to do LZ4F decompress, res={}", - LZ4F_getErrorName(lres)); + return Status::InternalError("Fail to do LZ4F decompress, res={}", + LZ4F_getErrorName(lres)); } else if (input_size != input.size) { decompress_failed = true; return Status::InvalidArgument( @@ -635,7 +635,10 @@ public: auto decompressed_len = LZ4_decompress_safe(input.data, output->data, input.size, output->size); if (decompressed_len < 0) { - return Status::InvalidArgument("fail to do LZ4 decompress, error={}", decompressed_len); + return Status::InvalidArgument( + "destination buffer is not large enough or the source stream is detected " + "malformed, fail to do LZ4 decompress, error={}", + decompressed_len); } output->size = decompressed_len; return Status::OK(); @@ -854,8 +857,12 @@ public: Slice s(*output); auto zres = 
::compress((Bytef*)s.data, &s.size, (Bytef*)input.data, input.size); - if (zres != Z_OK) { - return Status::InvalidArgument("Fail to do ZLib compress, error={}", zError(zres)); + if (zres == Z_MEM_ERROR) { + throw Exception(Status::MemoryLimitExceeded(fmt::format( + "ZLib compression failed due to memory allocationerror.error = {}, res = {} ", + zError(zres), zres))); + } else if (zres != Z_OK) { + return Status::InternalError("Fail to do Zlib compress, error={}", zError(zres)); } output->resize(s.size); return Status::OK(); @@ -871,9 +878,12 @@ public: zstrm.zfree = Z_NULL; zstrm.opaque = Z_NULL; auto zres = deflateInit(&zstrm, Z_DEFAULT_COMPRESSION); - if (zres != Z_OK) { - return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}", - zError(zres), zres); + if (zres == Z_MEM_ERROR) { + throw Exception(Status::MemoryLimitExceeded( + "Fail to do ZLib stream compress, error={}, res={}", zError(zres), zres)); + } else if (zres != Z_OK) { + return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", + zError(zres), zres); } // we assume that output is e zstrm.next_out = (Bytef*)output->data(); @@ -888,16 +898,19 @@ public: zres = deflate(&zstrm, flush); if (zres != Z_OK && zres != Z_STREAM_END) { - return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}", - zError(zres), zres); + return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", + zError(zres), zres); } } output->resize(zstrm.total_out); zres = deflateEnd(&zstrm); - if (zres != Z_OK) { - return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}", - zError(zres), zres); + if (zres == Z_DATA_ERROR) { + return Status::InvalidArgument("Fail to do deflateEnd, error={}, res={}", zError(zres), + zres); + } else if (zres != Z_OK) { + return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}", + zError(zres), zres); } return Status::OK(); } @@ -906,8 +919,13 @@ public: 
size_t input_size = input.size; auto zres = ::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size); - if (zres != Z_OK) { + if (zres == Z_DATA_ERROR) { return Status::InvalidArgument("Fail to do ZLib decompress, error={}", zError(zres)); + } else if (zres == Z_MEM_ERROR) { + throw Exception(Status::MemoryLimitExceeded("Fail to do ZLib decompress, error={}", + zError(zres))); + } else if (zres != Z_OK) { + return Status::InternalError("Fail to do ZLib decompress, error={}", zError(zres)); } return Status::OK(); } @@ -932,8 +950,14 @@ public: uint32_t size = output->size(); auto bzres = BZ2_bzBuffToBuffCompress((char*)output->data(), &size, (char*)input.data, input.size, 9, 0, 0); - if (bzres != BZ_OK) { - return Status::InternalError("Fail to do Bzip2 compress, ret={}", bzres); + if (bzres == BZ_MEM_ERROR) { + throw Exception( + Status::MemoryLimitExceeded("Fail to do Bzip2 compress, ret={}", bzres)); + } else if (bzres == BZ_PARAM_ERROR) { + return Status::InvalidArgument("Fail to do Bzip2 compress, ret={}", bzres); + } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK && + bzres != BZ_STREAM_END && bzres != BZ_OK) { + return Status::InternalError("Failed to init bz2. status code: {}", bzres); } output->resize(size); return Status::OK(); @@ -947,7 +971,12 @@ public: bz_stream bzstrm; bzero(&bzstrm, sizeof(bzstrm)); int bzres = BZ2_bzCompressInit(&bzstrm, 9, 0, 0); - if (bzres != BZ_OK) { + if (bzres == BZ_PARAM_ERROR) { + return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres); + } else if (bzres == BZ_MEM_ERROR) { + throw Exception( + Status::MemoryLimitExceeded("Failed to init bz2. status code: {}", bzres)); + } else if (bzres != BZ_OK) { return Status::InternalError("Failed to init bz2. status code: {}", bzres); } // we assume that output is e @@ -962,15 +991,20 @@ public: int flush = (i == (inputs.size() - 1)) ? 
BZ_FINISH : BZ_RUN; bzres = BZ2_bzCompress(&bzstrm, flush); - if (bzres != BZ_OK && bzres != BZ_STREAM_END) { - return Status::InternalError("Fail to do bzip2 stream compress, res={}", bzres); + if (bzres == BZ_PARAM_ERROR) { + return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres); + } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK && + bzres != BZ_STREAM_END && bzres != BZ_OK) { + return Status::InternalError("Failed to init bz2. status code: {}", bzres); } } size_t total_out = (size_t)bzstrm.total_out_hi32 << 32 | (size_t)bzstrm.total_out_lo32; output->resize(total_out); bzres = BZ2_bzCompressEnd(&bzstrm); - if (bzres != BZ_OK) { + if (bzres == BZ_PARAM_ERROR) { + return Status::InvalidArgument("Fail to do deflateEnd on bzip2 stream, res={}", bzres); + } else if (bzres != BZ_OK) { return Status::InternalError("Fail to do deflateEnd on bzip2 stream, res={}", bzres); } return Status::OK(); @@ -1102,14 +1136,14 @@ public: if (ZSTD_isError(ret)) { compress_failed = true; - return Status::InvalidArgument("ZSTD_compressStream2 error: {}", - ZSTD_getErrorString(ZSTD_getErrorCode(ret))); + return Status::InternalError("ZSTD_compressStream2 error: {}", + ZSTD_getErrorString(ZSTD_getErrorCode(ret))); } // ret is ZSTD hint for needed output buffer size if (ret > 0 && out_buf.pos == out_buf.size) { compress_failed = true; - return Status::InvalidArgument("ZSTD_compressStream2 output buffer full"); + return Status::InternalError("ZSTD_compressStream2 output buffer full"); } finished = last_input ? 
(ret == 0) : (in_buf.pos == inputs[i].size); @@ -1146,8 +1180,8 @@ public: input.size); if (ZSTD_isError(ret)) { decompress_failed = true; - return Status::InvalidArgument("ZSTD_decompressDCtx error: {}", - ZSTD_getErrorString(ZSTD_getErrorCode(ret))); + return Status::InternalError("ZSTD_decompressDCtx error: {}", + ZSTD_getErrorString(ZSTD_getErrorCode(ret))); } // set decompressed size for caller @@ -1239,8 +1273,12 @@ public: int zres = deflateInit2(&z_strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC, 8, Z_DEFAULT_STRATEGY); - if (zres != Z_OK) { - return Status::InvalidArgument("Fail to init zlib compress"); + if (zres == Z_MEM_ERROR) { + throw Exception(Status::MemoryLimitExceeded( + "Fail to init ZLib compress, error={}, res={}", zError(zres), zres)); + } else if (zres != Z_OK) { + return Status::InternalError("Fail to init ZLib compress, error={}, res={}", + zError(zres), zres); } z_strm.next_in = (Bytef*)input.get_data(); @@ -1250,14 +1288,16 @@ public: zres = deflate(&z_strm, Z_FINISH); if (zres != Z_OK && zres != Z_STREAM_END) { - return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}", - zError(zres), zres); + return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", + zError(zres), zres); } output->resize(z_strm.total_out); zres = deflateEnd(&z_strm); - if (zres != Z_OK) { + if (zres == Z_DATA_ERROR) { return Status::InvalidArgument("Fail to end zlib compress"); + } else if (zres != Z_OK) { + return Status::InternalError("Fail to end zlib compress"); } return Status::OK(); } @@ -1273,10 +1313,14 @@ public: zstrm.opaque = Z_NULL; auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC, 8, Z_DEFAULT_STRATEGY); - if (zres != Z_OK) { - return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}", - zError(zres), zres); + if (zres == Z_MEM_ERROR) { + throw Exception(Status::MemoryLimitExceeded( + "Fail to init ZLib stream compress, 
error={}, res={}", zError(zres), zres)); + } else if (zres != Z_OK) { + return Status::InternalError("Fail to init ZLib stream compress, error={}, res={}", + zError(zres), zres); } + // we assume that output is e zstrm.next_out = (Bytef*)output->data(); zstrm.avail_out = output->size(); @@ -1290,16 +1334,19 @@ public: zres = deflate(&zstrm, flush); if (zres != Z_OK && zres != Z_STREAM_END) { - return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}", - zError(zres), zres); + return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", + zError(zres), zres); } } output->resize(zstrm.total_out); zres = deflateEnd(&zstrm); - if (zres != Z_OK) { + if (zres == Z_DATA_ERROR) { return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}", zError(zres), zres); + } else if (zres != Z_OK) { + return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}", + zError(zres), zres); } return Status::OK(); } @@ -1312,7 +1359,7 @@ public: int ret = inflateInit2(&z_strm, MAX_WBITS + GZIP_CODEC); if (ret != Z_OK) { - return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", + return Status::InternalError("Fail to init ZLib decompress, error={}, res={}", zError(ret), ret); } @@ -1327,6 +1374,13 @@ public: ret = inflate(&z_strm, Z_FINISH); if (ret != Z_OK && ret != Z_STREAM_END) { (void)inflateEnd(&z_strm); + if (ret == Z_MEM_ERROR) { + throw Exception(Status::MemoryLimitExceeded( + "Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret)); + } else if (ret == Z_DATA_ERROR) { + return Status::InvalidArgument( + "Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret); + } return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret); } diff --git a/be/src/vec/functions/function_compress.cpp b/be/src/vec/functions/function_compress.cpp index 4c175a5fd44..b645e944bfe 100644 --- 
a/be/src/vec/functions/function_compress.cpp +++ b/be/src/vec/functions/function_compress.cpp @@ -102,7 +102,7 @@ public: } // Z_MEM_ERROR and Z_BUF_ERROR are already handled in compress, making sure st is always Z_OK - auto st = compression_codec->compress(data, &compressed_str); + RETURN_IF_ERROR(compression_codec->compress(data, &compressed_str)); col_data.resize(col_data.size() + 4 + compressed_str.size()); std::memcpy(col_data.data() + idx, &length, sizeof(length)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org