This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b494700ab2 [improvement](status) Change the return type for 
block_compression (#47566)
0b494700ab2 is described below

commit 0b494700ab2f50d931a00c038ace996e93cad5f0
Author: lzyy2024 <2972013...@qq.com>
AuthorDate: Sat Feb 8 17:16:58 2025 +0800

    [improvement](status) Change the return type for block_compression (#47566)
    
    ### What problem does this PR solve?
    The Status types returned by block_compression.cpp did not reflect the
    actual cause of each failure, so this change corrects them.
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
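    Previously, each function mapped every failure code returned by the
    underlying compression library (zlib, bzip2, LZ4, ZSTD) to a single
    Status type, usually InvalidArgument. This change distinguishes the
    failure causes: memory-allocation errors (Z_MEM_ERROR, BZ_MEM_ERROR)
    now throw a MemoryLimitExceeded exception, invalid-input or parameter
    errors (Z_DATA_ERROR, BZ_PARAM_ERROR) return InvalidArgument, and other
    library errors return InternalError. In addition, function_compress.cpp
    now propagates the Status returned by compress() instead of ignoring it.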
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [x] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [x] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [x] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [x] No.
        - [ ] Yes. <!-- Add document PR link here. eg:
        https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/util/block_compression.cpp          | 130 ++++++++++++++++++++---------
 be/src/vec/functions/function_compress.cpp |   2 +-
 2 files changed, 93 insertions(+), 39 deletions(-)

diff --git a/be/src/util/block_compression.cpp 
b/be/src/util/block_compression.cpp
index 7a0aacd4252..fcb2f3fae08 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -187,7 +187,7 @@ public:
         auto decompressed_len =
                 LZ4_decompress_safe(input.data, output->data, input.size, 
output->size);
         if (decompressed_len < 0) {
-            return Status::InvalidArgument("fail to do LZ4 decompress, 
error={}", decompressed_len);
+            return Status::InternalError("fail to do LZ4 decompress, 
error={}", decompressed_len);
         }
         output->size = decompressed_len;
         return Status::OK();
@@ -458,8 +458,8 @@ private:
                                     &input_size, nullptr);
         if (LZ4F_isError(lres)) {
             decompress_failed = true;
-            return Status::InvalidArgument("Fail to do LZ4F decompress, 
res={}",
-                                           LZ4F_getErrorName(lres));
+            return Status::InternalError("Fail to do LZ4F decompress, res={}",
+                                         LZ4F_getErrorName(lres));
         } else if (input_size != input.size) {
             decompress_failed = true;
             return Status::InvalidArgument(
@@ -635,7 +635,10 @@ public:
         auto decompressed_len =
                 LZ4_decompress_safe(input.data, output->data, input.size, 
output->size);
         if (decompressed_len < 0) {
-            return Status::InvalidArgument("fail to do LZ4 decompress, 
error={}", decompressed_len);
+            return Status::InvalidArgument(
+                    "destination buffer is not large enough or the source 
stream is detected "
+                    "malformed, fail to do LZ4 decompress, error={}",
+                    decompressed_len);
         }
         output->size = decompressed_len;
         return Status::OK();
@@ -854,8 +857,12 @@ public:
         Slice s(*output);
 
         auto zres = ::compress((Bytef*)s.data, &s.size, (Bytef*)input.data, 
input.size);
-        if (zres != Z_OK) {
-            return Status::InvalidArgument("Fail to do ZLib compress, 
error={}", zError(zres));
+        if (zres == Z_MEM_ERROR) {
+            throw Exception(Status::MemoryLimitExceeded(fmt::format(
+                    "ZLib compression failed due to memory 
allocationerror.error = {}, res = {} ",
+                    zError(zres), zres)));
+        } else if (zres != Z_OK) {
+            return Status::InternalError("Fail to do Zlib compress, error={}", 
zError(zres));
         }
         output->resize(s.size);
         return Status::OK();
@@ -871,9 +878,12 @@ public:
         zstrm.zfree = Z_NULL;
         zstrm.opaque = Z_NULL;
         auto zres = deflateInit(&zstrm, Z_DEFAULT_COMPRESSION);
-        if (zres != Z_OK) {
-            return Status::InvalidArgument("Fail to do ZLib stream compress, 
error={}, res={}",
-                                           zError(zres), zres);
+        if (zres == Z_MEM_ERROR) {
+            throw Exception(Status::MemoryLimitExceeded(
+                    "Fail to do ZLib stream compress, error={}, res={}", 
zError(zres), zres));
+        } else if (zres != Z_OK) {
+            return Status::InternalError("Fail to do ZLib stream compress, 
error={}, res={}",
+                                         zError(zres), zres);
         }
         // we assume that output is e
         zstrm.next_out = (Bytef*)output->data();
@@ -888,16 +898,19 @@ public:
 
             zres = deflate(&zstrm, flush);
             if (zres != Z_OK && zres != Z_STREAM_END) {
-                return Status::InvalidArgument("Fail to do ZLib stream 
compress, error={}, res={}",
-                                               zError(zres), zres);
+                return Status::InternalError("Fail to do ZLib stream compress, 
error={}, res={}",
+                                             zError(zres), zres);
             }
         }
 
         output->resize(zstrm.total_out);
         zres = deflateEnd(&zstrm);
-        if (zres != Z_OK) {
-            return Status::InvalidArgument("Fail to do deflateEnd on ZLib 
stream, error={}, res={}",
-                                           zError(zres), zres);
+        if (zres == Z_DATA_ERROR) {
+            return Status::InvalidArgument("Fail to do deflateEnd, error={}, 
res={}", zError(zres),
+                                           zres);
+        } else if (zres != Z_OK) {
+            return Status::InternalError("Fail to do deflateEnd on ZLib 
stream, error={}, res={}",
+                                         zError(zres), zres);
         }
         return Status::OK();
     }
@@ -906,8 +919,13 @@ public:
         size_t input_size = input.size;
         auto zres =
                 ::uncompress2((Bytef*)output->data, &output->size, 
(Bytef*)input.data, &input_size);
-        if (zres != Z_OK) {
+        if (zres == Z_DATA_ERROR) {
             return Status::InvalidArgument("Fail to do ZLib decompress, 
error={}", zError(zres));
+        } else if (zres == Z_MEM_ERROR) {
+            throw Exception(Status::MemoryLimitExceeded("Fail to do ZLib 
decompress, error={}",
+                                                        zError(zres)));
+        } else if (zres != Z_OK) {
+            return Status::InternalError("Fail to do ZLib decompress, 
error={}", zError(zres));
         }
         return Status::OK();
     }
@@ -932,8 +950,14 @@ public:
         uint32_t size = output->size();
         auto bzres = BZ2_bzBuffToBuffCompress((char*)output->data(), &size, 
(char*)input.data,
                                               input.size, 9, 0, 0);
-        if (bzres != BZ_OK) {
-            return Status::InternalError("Fail to do Bzip2 compress, ret={}", 
bzres);
+        if (bzres == BZ_MEM_ERROR) {
+            throw Exception(
+                    Status::MemoryLimitExceeded("Fail to do Bzip2 compress, 
ret={}", bzres));
+        } else if (bzres == BZ_PARAM_ERROR) {
+            return Status::InvalidArgument("Fail to do Bzip2 compress, 
ret={}", bzres);
+        } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != 
BZ_FINISH_OK &&
+                   bzres != BZ_STREAM_END && bzres != BZ_OK) {
+            return Status::InternalError("Failed to init bz2. status code: 
{}", bzres);
         }
         output->resize(size);
         return Status::OK();
@@ -947,7 +971,12 @@ public:
         bz_stream bzstrm;
         bzero(&bzstrm, sizeof(bzstrm));
         int bzres = BZ2_bzCompressInit(&bzstrm, 9, 0, 0);
-        if (bzres != BZ_OK) {
+        if (bzres == BZ_PARAM_ERROR) {
+            return Status::InvalidArgument("Failed to init bz2. status code: 
{}", bzres);
+        } else if (bzres == BZ_MEM_ERROR) {
+            throw Exception(
+                    Status::MemoryLimitExceeded("Failed to init bz2. status 
code: {}", bzres));
+        } else if (bzres != BZ_OK) {
             return Status::InternalError("Failed to init bz2. status code: 
{}", bzres);
         }
         // we assume that output is e
@@ -962,15 +991,20 @@ public:
             int flush = (i == (inputs.size() - 1)) ? BZ_FINISH : BZ_RUN;
 
             bzres = BZ2_bzCompress(&bzstrm, flush);
-            if (bzres != BZ_OK && bzres != BZ_STREAM_END) {
-                return Status::InternalError("Fail to do bzip2 stream 
compress, res={}", bzres);
+            if (bzres == BZ_PARAM_ERROR) {
+                return Status::InvalidArgument("Failed to init bz2. status 
code: {}", bzres);
+            } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != 
BZ_FINISH_OK &&
+                       bzres != BZ_STREAM_END && bzres != BZ_OK) {
+                return Status::InternalError("Failed to init bz2. status code: 
{}", bzres);
             }
         }
 
         size_t total_out = (size_t)bzstrm.total_out_hi32 << 32 | 
(size_t)bzstrm.total_out_lo32;
         output->resize(total_out);
         bzres = BZ2_bzCompressEnd(&bzstrm);
-        if (bzres != BZ_OK) {
+        if (bzres == BZ_PARAM_ERROR) {
+            return Status::InvalidArgument("Fail to do deflateEnd on bzip2 
stream, res={}", bzres);
+        } else if (bzres != BZ_OK) {
             return Status::InternalError("Fail to do deflateEnd on bzip2 
stream, res={}", bzres);
         }
         return Status::OK();
@@ -1102,14 +1136,14 @@ public:
 
                     if (ZSTD_isError(ret)) {
                         compress_failed = true;
-                        return Status::InvalidArgument("ZSTD_compressStream2 
error: {}",
-                                                       
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
+                        return Status::InternalError("ZSTD_compressStream2 
error: {}",
+                                                     
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
                     }
 
                     // ret is ZSTD hint for needed output buffer size
                     if (ret > 0 && out_buf.pos == out_buf.size) {
                         compress_failed = true;
-                        return Status::InvalidArgument("ZSTD_compressStream2 
output buffer full");
+                        return Status::InternalError("ZSTD_compressStream2 
output buffer full");
                     }
 
                     finished = last_input ? (ret == 0) : (in_buf.pos == 
inputs[i].size);
@@ -1146,8 +1180,8 @@ public:
                                          input.size);
         if (ZSTD_isError(ret)) {
             decompress_failed = true;
-            return Status::InvalidArgument("ZSTD_decompressDCtx error: {}",
-                                           
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
+            return Status::InternalError("ZSTD_decompressDCtx error: {}",
+                                         
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
         }
 
         // set decompressed size for caller
@@ -1239,8 +1273,12 @@ public:
         int zres = deflateInit2(&z_strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 
MAX_WBITS + GZIP_CODEC,
                                 8, Z_DEFAULT_STRATEGY);
 
-        if (zres != Z_OK) {
-            return Status::InvalidArgument("Fail to init zlib compress");
+        if (zres == Z_MEM_ERROR) {
+            throw Exception(Status::MemoryLimitExceeded(
+                    "Fail to init ZLib compress, error={}, res={}", 
zError(zres), zres));
+        } else if (zres != Z_OK) {
+            return Status::InternalError("Fail to init ZLib compress, 
error={}, res={}",
+                                         zError(zres), zres);
         }
 
         z_strm.next_in = (Bytef*)input.get_data();
@@ -1250,14 +1288,16 @@ public:
 
         zres = deflate(&z_strm, Z_FINISH);
         if (zres != Z_OK && zres != Z_STREAM_END) {
-            return Status::InvalidArgument("Fail to do ZLib stream compress, 
error={}, res={}",
-                                           zError(zres), zres);
+            return Status::InternalError("Fail to do ZLib stream compress, 
error={}, res={}",
+                                         zError(zres), zres);
         }
 
         output->resize(z_strm.total_out);
         zres = deflateEnd(&z_strm);
-        if (zres != Z_OK) {
+        if (zres == Z_DATA_ERROR) {
             return Status::InvalidArgument("Fail to end zlib compress");
+        } else if (zres != Z_OK) {
+            return Status::InternalError("Fail to end zlib compress");
         }
         return Status::OK();
     }
@@ -1273,10 +1313,14 @@ public:
         zstrm.opaque = Z_NULL;
         auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 
MAX_WBITS + GZIP_CODEC,
                                  8, Z_DEFAULT_STRATEGY);
-        if (zres != Z_OK) {
-            return Status::InvalidArgument("Fail to do ZLib stream compress, 
error={}, res={}",
-                                           zError(zres), zres);
+        if (zres == Z_MEM_ERROR) {
+            throw Exception(Status::MemoryLimitExceeded(
+                    "Fail to init ZLib stream compress, error={}, res={}", 
zError(zres), zres));
+        } else if (zres != Z_OK) {
+            return Status::InternalError("Fail to init ZLib stream compress, 
error={}, res={}",
+                                         zError(zres), zres);
         }
+
         // we assume that output is e
         zstrm.next_out = (Bytef*)output->data();
         zstrm.avail_out = output->size();
@@ -1290,16 +1334,19 @@ public:
 
             zres = deflate(&zstrm, flush);
             if (zres != Z_OK && zres != Z_STREAM_END) {
-                return Status::InvalidArgument("Fail to do ZLib stream 
compress, error={}, res={}",
-                                               zError(zres), zres);
+                return Status::InternalError("Fail to do ZLib stream compress, 
error={}, res={}",
+                                             zError(zres), zres);
             }
         }
 
         output->resize(zstrm.total_out);
         zres = deflateEnd(&zstrm);
-        if (zres != Z_OK) {
+        if (zres == Z_DATA_ERROR) {
             return Status::InvalidArgument("Fail to do deflateEnd on ZLib 
stream, error={}, res={}",
                                            zError(zres), zres);
+        } else if (zres != Z_OK) {
+            return Status::InternalError("Fail to do deflateEnd on ZLib 
stream, error={}, res={}",
+                                         zError(zres), zres);
         }
         return Status::OK();
     }
@@ -1312,7 +1359,7 @@ public:
 
         int ret = inflateInit2(&z_strm, MAX_WBITS + GZIP_CODEC);
         if (ret != Z_OK) {
-            return Status::InternalError("Fail to do ZLib stream compress, 
error={}, res={}",
+            return Status::InternalError("Fail to init ZLib decompress, 
error={}, res={}",
                                          zError(ret), ret);
         }
 
@@ -1327,6 +1374,13 @@ public:
             ret = inflate(&z_strm, Z_FINISH);
             if (ret != Z_OK && ret != Z_STREAM_END) {
                 (void)inflateEnd(&z_strm);
+                if (ret == Z_MEM_ERROR) {
+                    throw Exception(Status::MemoryLimitExceeded(
+                            "Fail to do ZLib stream compress, error={}, 
res={}", zError(ret), ret));
+                } else if (ret == Z_DATA_ERROR) {
+                    return Status::InvalidArgument(
+                            "Fail to do ZLib stream compress, error={}, 
res={}", zError(ret), ret);
+                }
                 return Status::InternalError("Fail to do ZLib stream compress, 
error={}, res={}",
                                              zError(ret), ret);
             }
diff --git a/be/src/vec/functions/function_compress.cpp 
b/be/src/vec/functions/function_compress.cpp
index 4c175a5fd44..b645e944bfe 100644
--- a/be/src/vec/functions/function_compress.cpp
+++ b/be/src/vec/functions/function_compress.cpp
@@ -102,7 +102,7 @@ public:
             }
 
             // Z_MEM_ERROR and Z_BUF_ERROR are already handled in compress, 
making sure st is always Z_OK
-            auto st = compression_codec->compress(data, &compressed_str);
+            RETURN_IF_ERROR(compression_codec->compress(data, 
&compressed_str));
             col_data.resize(col_data.size() + 4 + compressed_str.size());
 
             std::memcpy(col_data.data() + idx, &length, sizeof(length));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to