This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b50bb1654 GH-49266: [C++][Parquet] Optimize delta bit-packed decoding 
when bit-width = 0 (#49296)
8b50bb1654 is described below

commit 8b50bb1654d8a0bdbc9b6ff7ceded1a836be5801
Author: Antoine Pitrou <[email protected]>
AuthorDate: Tue Feb 24 16:04:55 2026 +0100

    GH-49266: [C++][Parquet] Optimize delta bit-packed decoding when bit-width 
= 0 (#49296)
    
    ### Rationale for this change
    
    DELTA_BINARY_PACKED decoding has limited performance due to a back-to-back 
dependency between the computations of value N and value N+1.
    
    However, we can do better if we know that all deltas are 0 in a miniblock. 
This happens when a miniblock's delta bit width is 0.
    
    ### What changes are included in this PR?
    
    Avoid reading and accumulating deltas when the delta bit width is 0. 
Instead, use a condensed formula that allows computing a value without waiting 
for the previous one.
    
    Benchmark results on constant ranges of integers (on my local machine, AMD 
Zen 2 CPU):
    ```
                                                                     benchmark  
      baseline        contender  change %                                       
                                                                                
                                                                                
        counters
                                     BM_DeltaBitPackingDecode_Int32_Fixed/4096  
 3.821 GiB/sec   12.164 GiB/sec   218.323                              
{'family_index': 11, 'per_family_instance_index': 1, 'run_name': 
'BM_DeltaBitPackingDecode_Int32_Fixed/4096', 'repetitions': 1, 
'repetition_index': 0, 'threads': 1, 'iterations': 70160}
                                    BM_DeltaBitPackingDecode_Int32_Fixed/65536  
 3.897 GiB/sec   12.378 GiB/sec   217.678                              
{'family_index': 11, 'per_family_instance_index': 3, 'run_name': 
'BM_DeltaBitPackingDecode_Int32_Fixed/65536', 'repetitions': 1, 
'repetition_index': 0, 'threads': 1, 'iterations': 4487}
                                    BM_DeltaBitPackingDecode_Int32_Fixed/32768  
 3.909 GiB/sec   12.325 GiB/sec   215.309                              
{'family_index': 11, 'per_family_instance_index': 2, 'run_name': 
'BM_DeltaBitPackingDecode_Int32_Fixed/32768', 'repetitions': 1, 
'repetition_index': 0, 'threads': 1, 'iterations': 9004}
                                     BM_DeltaBitPackingDecode_Int32_Fixed/1024  
 3.542 GiB/sec   10.468 GiB/sec   195.538                             
{'family_index': 11, 'per_family_instance_index': 0, 'run_name': 
'BM_DeltaBitPackingDecode_Int32_Fixed/1024', 'repetitions': 1, 
'repetition_index': 0, 'threads': 1, 'iterations': 259535}
                                    BM_DeltaBitPackingDecode_Int64_Fixed/32768  
 9.761 GiB/sec   14.040 GiB/sec    43.847                             
{'family_index': 12, 'per_family_instance_index': 2, 'run_name': 
'BM_DeltaBitPackingDecode_Int64_Fixed/32768', 'repetitions': 1, 
'repetition_index': 0, 'threads': 1, 'iterations': 11192}
                                    BM_DeltaBitPackingDecode_Int64_Fixed/65536  
 9.814 GiB/sec   14.056 GiB/sec    43.222                              
{'family_index': 12, 'per_family_instance_index': 3, 'run_name': 
'BM_DeltaBitPackingDecode_Int64_Fixed/65536', 'repetitions': 1, 
'repetition_index': 0, 'threads': 1, 'iterations': 5617}
                                     BM_DeltaBitPackingDecode_Int64_Fixed/4096  
 9.672 GiB/sec   13.543 GiB/sec    40.014                              
{'family_index': 12, 'per_family_instance_index': 1, 'run_name': 
'BM_DeltaBitPackingDecode_Int64_Fixed/4096', 'repetitions': 1, 
'repetition_index': 0, 'threads': 1, 'iterations': 88891}
                                     BM_DeltaBitPackingDecode_Int64_Fixed/1024  
 8.850 GiB/sec   12.207 GiB/sec    37.923                             
{'family_index': 12, 'per_family_instance_index': 0, 'run_name': 
'BM_DeltaBitPackingDecode_Int64_Fixed/1024', 'repetitions': 1, 
'repetition_index': 0, 'threads': 1, 'iterations': 323720}
    ```
    
    ### Are these changes tested?
    
    Yes, by an additional test meant to stress this specific situation.
    
    ### Are there any user-facing changes?
    
    No.
    
    * GitHub Issue: #49266
    
    Authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/parquet/decoder.cc       | 31 ++++++++++-----
 cpp/src/parquet/encoder.cc       |  2 +
 cpp/src/parquet/encoding_test.cc | 81 ++++++++++++++++++++++++----------------
 3 files changed, 72 insertions(+), 42 deletions(-)

diff --git a/cpp/src/parquet/decoder.cc b/cpp/src/parquet/decoder.cc
index 5d32d39e5f..6c213f18f5 100644
--- a/cpp/src/parquet/decoder.cc
+++ b/cpp/src/parquet/decoder.cc
@@ -1619,16 +1619,27 @@ class DeltaBitPackDecoder : public 
TypedDecoderImpl<DType> {
 
       int values_decode = std::min(values_remaining_current_mini_block_,
                                    static_cast<uint32_t>(max_values - i));
-      if (decoder_->GetBatch(delta_bit_width_, buffer + i, values_decode) !=
-          values_decode) {
-        ParquetException::EofException();
-      }
-      for (int j = 0; j < values_decode; ++j) {
-        // Addition between min_delta, packed int and last_value should be 
treated as
-        // unsigned addition. Overflow is as expected.
-        buffer[i + j] = static_cast<UT>(min_delta_) + static_cast<UT>(buffer[i 
+ j]) +
-                        static_cast<UT>(last_value_);
-        last_value_ = buffer[i + j];
+      if (delta_bit_width_ == 0) {
+        // Fast path that avoids a back-to-back dependency between two 
consecutive
+        // computations: we know all deltas decode to zero. We actually don't
+        // even need to decode them.
+        for (int j = 0; j < values_decode; ++j) {
+          buffer[i + j] = static_cast<UT>(last_value_) +
+                          static_cast<UT>(j + 1) * static_cast<UT>(min_delta_);
+        }
+        last_value_ += static_cast<UT>(values_decode) * 
static_cast<UT>(min_delta_);
+      } else {
+        if (decoder_->GetBatch(delta_bit_width_, buffer + i, values_decode) !=
+            values_decode) {
+          ParquetException::EofException();
+        }
+        for (int j = 0; j < values_decode; ++j) {
+          // Addition between min_delta, packed int and last_value should be 
treated as
+          // unsigned addition. Overflow is as expected.
+          buffer[i + j] = static_cast<UT>(min_delta_) + 
static_cast<UT>(buffer[i + j]) +
+                          static_cast<UT>(last_value_);
+          last_value_ = buffer[i + j];
+        }
       }
       values_remaining_current_mini_block_ -= values_decode;
       i += values_decode;
diff --git a/cpp/src/parquet/encoder.cc b/cpp/src/parquet/encoder.cc
index 97a5d77d41..edfebad5ab 100644
--- a/cpp/src/parquet/encoder.cc
+++ b/cpp/src/parquet/encoder.cc
@@ -1036,6 +1036,8 @@ template <typename DType>
 class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
   // Maximum possible header size
   static constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+  // If these constants are changed, then the corresponding values in
+  // TestDeltaBitPackEncoding (in `encoding_test.cc`) should be updated too.
   static constexpr uint32_t kValuesPerBlock =
       std::is_same_v<int32_t, typename DType::c_type> ? 128 : 256;
   static constexpr uint32_t kMiniBlocksPerBlock = 4;
diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc
index 66a3f7647f..3eb1940a4d 100644
--- a/cpp/src/parquet/encoding_test.cc
+++ b/cpp/src/parquet/encoding_test.cc
@@ -21,6 +21,7 @@
 #include <cstdint>
 #include <cstring>
 #include <limits>
+#include <span>
 #include <utility>
 #include <vector>
 
@@ -119,14 +120,14 @@ TEST(VectorBooleanTest, TestEncodeIntDecode) {
 }
 
 template <typename T>
-void VerifyResults(T* result, T* expected, int num_values) {
+void VerifyResults(const T* result, const T* expected, int num_values) {
   for (int i = 0; i < num_values; ++i) {
     ASSERT_EQ(expected[i], result[i]) << i;
   }
 }
 
 template <typename T>
-void VerifyResultsSpaced(T* result, T* expected, int num_values,
+void VerifyResultsSpaced(const T* result, const T* expected, int num_values,
                          const uint8_t* valid_bits, int64_t valid_bits_offset) 
{
   for (auto i = 0; i < num_values; ++i) {
     if (bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
@@ -136,14 +137,14 @@ void VerifyResultsSpaced(T* result, T* expected, int 
num_values,
 }
 
 template <>
-void VerifyResults<FLBA>(FLBA* result, FLBA* expected, int num_values) {
+void VerifyResults<FLBA>(const FLBA* result, const FLBA* expected, int 
num_values) {
   for (int i = 0; i < num_values; ++i) {
     ASSERT_EQ(0, memcmp(expected[i].ptr, result[i].ptr, 
kGenerateDataFLBALength)) << i;
   }
 }
 
 template <>
-void VerifyResultsSpaced<FLBA>(FLBA* result, FLBA* expected, int num_values,
+void VerifyResultsSpaced<FLBA>(const FLBA* result, const FLBA* expected, int 
num_values,
                                const uint8_t* valid_bits, int64_t 
valid_bits_offset) {
   for (auto i = 0; i < num_values; ++i) {
     if (bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
@@ -1735,35 +1736,45 @@ class TestDeltaBitPackEncoding : public 
TestEncodingBase<Type> {
     CheckRoundtripSpaced(valid_bits, valid_bits_offset);
   }
 
-  void CheckDecoding() {
+  void CheckDecoding() { CheckDecoding(std::span(draws_, num_values_)); }
+
+  void CheckDecoding(std::span<const c_type> expected_values) {
+    const auto num_values = static_cast<int>(expected_values.size());
     auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, 
descr_.get());
     auto read_batch_sizes = kReadBatchSizes;
-    read_batch_sizes.push_back(num_values_);
+    read_batch_sizes.push_back(num_values);
     // Exercise different batch sizes
     for (const int read_batch_size : read_batch_sizes) {
-      decoder->SetData(num_values_, encode_buffer_->data(),
+      decoder->SetData(num_values, encode_buffer_->data(),
                        static_cast<int>(encode_buffer_->size()));
 
+      std::vector<c_type> decoded_values(num_values);
       int values_decoded = 0;
-      while (values_decoded < num_values_) {
-        values_decoded += decoder->Decode(decode_buf_ + values_decoded, 
read_batch_size);
+      while (values_decoded < num_values) {
+        values_decoded +=
+            decoder->Decode(decoded_values.data() + values_decoded, 
read_batch_size);
       }
-      ASSERT_EQ(num_values_, values_decoded);
-      ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, 
num_values_));
+      ASSERT_EQ(num_values, values_decoded);
+      ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decoded_values.data(),
+                                                    expected_values.data(), 
num_values));
     }
   }
 
-  void CheckRoundtrip() override {
+  void CheckRoundtripWithValues(std::span<const c_type> values) {
     auto encoder = MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED,
                                           /*use_dictionary=*/false, 
descr_.get());
     // Encode a number of times to exercise the flush logic
     for (size_t i = 0; i < kNumRoundTrips; ++i) {
-      encoder->Put(draws_, num_values_);
+      encoder->Put(values.data(), static_cast<int>(values.size()));
       encode_buffer_ = encoder->FlushValues();
-      CheckDecoding();
+      CheckDecoding(values);
     }
   }
 
+  void CheckRoundtrip() override {
+    CheckRoundtripWithValues(std::span(draws_, num_values_));
+  }
+
   void CheckRoundtripSpaced(const uint8_t* valid_bits,
                             int64_t valid_bits_offset) override {
     auto encoder = MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED,
@@ -1920,24 +1931,7 @@ TYPED_TEST(TestDeltaBitPackEncoding, 
DeltaBitPackedWrapping) {
                                1,
                                -1,
                                1};
-  const int num_values = static_cast<int>(int_values.size());
-
-  const auto encoder = MakeTypedEncoder<TypeParam>(
-      Encoding::DELTA_BINARY_PACKED, /*use_dictionary=*/false, 
this->descr_.get());
-  encoder->Put(int_values, num_values);
-  const auto encoded = encoder->FlushValues();
-
-  const auto decoder =
-      MakeTypedDecoder<TypeParam>(Encoding::DELTA_BINARY_PACKED, 
this->descr_.get());
-
-  std::vector<T> decoded(num_values);
-  decoder->SetData(num_values, encoded->data(), 
static_cast<int>(encoded->size()));
-
-  const int values_decoded = decoder->Decode(decoded.data(), num_values);
-
-  ASSERT_EQ(num_values, values_decoded);
-  ASSERT_NO_FATAL_FAILURE(
-      VerifyResults<T>(decoded.data(), int_values.data(), num_values));
+  this->CheckRoundtripWithValues(int_values);
 }
 
 // Test that the DELTA_BINARY_PACKED encoding does not use more bits to encode 
than
@@ -1967,6 +1961,29 @@ TYPED_TEST(TestDeltaBitPackEncoding, DeltaBitPackedSize) 
{
   ASSERT_EQ(encoded->size(), encoded_size);
 }
 
+TYPED_TEST(TestDeltaBitPackEncoding, ZeroDeltaBitWidth) {
+  // Exercise ranges of zero deltas interspersed between ranges of non-zero 
deltas.
+  // This checks that the zero bit-width optimization in GH-49266 doesn't mess
+  // decoder state.
+  using T = typename TypeParam::c_type;
+
+  // At least the size of a block
+  constexpr int kRangeSize = 256;
+
+  std::vector<T> int_values;
+  for (int i = 0; i < kRangeSize; ++i) {
+    int_values.push_back((i * 7) % 11);
+  }
+  // Range of equal values, should emit zero-width deltas
+  for (int i = 0; i < kRangeSize * 2; ++i) {
+    int_values.push_back(42);
+  }
+  for (int i = 0; i < kRangeSize; ++i) {
+    int_values.push_back((i * 5) % 7);
+  }
+  this->CheckRoundtripWithValues(int_values);
+}
+
 // ----------------------------------------------------------------------
 // Rle for Boolean encode/decode tests.
 

Reply via email to