This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new d4f1cfad79 Implement Improved arrow-avro Reader Zero-Byte Record Handling (#7966)
d4f1cfad79 is described below
commit d4f1cfad79ee38e65d8c92982616e5facd463c52
Author: Connor Sanders <[email protected]>
AuthorDate: Tue Jul 22 16:42:23 2025 -0500
Implement Improved arrow-avro Reader Zero-Byte Record Handling (#7966)
# Which issue does this PR close?
- Part of https://github.com/apache/arrow-rs/issues/4886
- Follow up to https://github.com/apache/arrow-rs/pull/7834
# Rationale for this change
The initial Avro reader implementation contained an under-developed and
temporary safeguard to prevent infinite loops when processing records
that consumed zero bytes from the input buffer.
When the `Decoder` reported that zero bytes were consumed, the `Reader`
would advance its cursor to the end of the current data block. While
this successfully prevented an infinite loop, it had the critical side
effect of silently discarding any remaining data in that block, leading
to potential data loss.
This change enhances the decoding logic to handle these zero-byte values
correctly, ensuring that the `Reader` makes proper progress without
dropping data and without risking an infinite loop.
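To make the termination argument concrete, here is a minimal standalone sketch (the `MockRecordDecoder` type and `decode_batch` function are hypothetical stand-ins for illustration, not the actual arrow-avro types) of why counting progress in rows rather than bytes avoids the infinite loop even when a row consumes zero bytes:

```rust
/// Hypothetical stand-in for a record decoder whose rows may be zero bytes wide.
struct MockRecordDecoder {
    /// Byte widths of the remaining rows; 0 models a zero-byte record such as `null`.
    row_widths: Vec<usize>,
}

impl MockRecordDecoder {
    /// Decodes one row from `data`, returning the number of bytes consumed (possibly 0).
    fn decode(&mut self, _data: &[u8]) -> usize {
        if self.row_widths.is_empty() {
            0
        } else {
            self.row_widths.remove(0)
        }
    }
}

/// Decodes up to `batch_size` rows, returning the total bytes consumed.
fn decode_batch(decoder: &mut MockRecordDecoder, data: &[u8], batch_size: usize) -> usize {
    let mut total_consumed = 0usize;
    let mut decoded_rows = 0usize;
    // `decoded_rows` increments on every iteration, so the loop runs at most
    // `batch_size` times even if `decode` returns 0 for every row.
    while total_consumed < data.len() && decoded_rows < batch_size {
        total_consumed += decoder.decode(&data[total_consumed..]);
        decoded_rows += 1;
    }
    total_consumed
}

fn main() {
    // Three rows: two zero-byte rows followed by a one-byte row.
    let mut decoder = MockRecordDecoder { row_widths: vec![0, 0, 1] };
    let data = [0u8; 1];
    // All three rows decode and the cursor advances exactly one byte in total.
    assert_eq!(decode_batch(&mut decoder, &data, 1024), 1);
}
```

This is the same design choice mirrored in the real `Decoder` below: each loop iteration decodes exactly one row, so `decoded_rows < batch_size` bounds the iteration count regardless of byte consumption.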
# What changes are included in this PR?
- **Refined Decoder Logic**: The `Decoder` has been updated to
accurately track and report the number of bytes consumed for all values,
including valid zero-length records like `null` or empty `bytes`. This
ensures the decoder always makes forward progress.
- **Removal of Data-Skipping Safeguard**: The logic in the `Reader` that
previously advanced to the end of a block on a zero-byte read has been
removed. The reader now relies on the decoder to report accurate
consumption and advances its cursor incrementally and safely.
- **New Integration Test**: Added a new integration test backed by a
temporary `zero_byte.avro` file created via this Python script:
https://gist.github.com/jecsand838/e57647d0d12853f3cf07c350a6a40395
(a rough Rust equivalent is sketched below).
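For reference, a rough Rust equivalent of that generator script, sketched against the separate `apache-avro` crate (an assumption for illustration; the checked-in file was actually produced by the Python gist above):

```rust
use apache_avro::{types::Value, Schema, Writer};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Single nullable bytes field, matching the schema asserted in
    // test_read_zero_byte_avro_file.
    let schema = Schema::parse_str(
        r#"{
            "type": "record",
            "name": "Row",
            "fields": [
                {"name": "data", "type": ["null", "bytes"], "default": null}
            ]
        }"#,
    )?;
    let mut writer = Writer::new(&schema, Vec::new());
    // Row 1: null (union branch 0).
    writer.append(Value::Record(vec![(
        "data".into(),
        Value::Union(0, Box::new(Value::Null)),
    )]))?;
    // Row 2: an empty byte string (union branch 1, zero-length payload).
    writer.append(Value::Record(vec![(
        "data".into(),
        Value::Union(1, Box::new(Value::Bytes(Vec::new()))),
    )]))?;
    // Row 3: a non-empty byte string.
    writer.append(Value::Record(vec![(
        "data".into(),
        Value::Union(1, Box::new(Value::Bytes(b"some bytes".to_vec()))),
    )]))?;
    std::fs::write("zero_byte.avro", writer.into_inner()?)?;
    Ok(())
}
```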
# Are these changes tested?
Yes, a new `test_read_zero_byte_avro_file` test was added that reads the
new `zero_byte.avro` file and confirms that all three rows (a null, an
empty byte string, and a non-empty byte string) are decoded correctly.
# Are there any user-facing changes?
N/A
# Follow-Up PRs
1. A follow-up PR to update `test_read_zero_byte_avro_file` once
https://github.com/apache/arrow-testing/pull/109 is merged.
---
arrow-avro/src/reader/mod.rs | 36 ++++++++++++++++++++++++++++--------
arrow-avro/test/data/zero_byte.avro | Bin 0 -> 210 bytes
2 files changed, 28 insertions(+), 8 deletions(-)
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index b98777d3d7..02d3f49aa1 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -157,9 +157,10 @@ impl Decoder {
let mut total_consumed = 0usize;
while total_consumed < data.len() && self.decoded_rows < self.batch_size {
let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?;
- if consumed == 0 {
- break;
- }
+ // A successful call to record_decoder.decode means one row was decoded.
+ // If `consumed` is 0 on a non-empty buffer, it implies a valid zero-byte record.
+ // We increment `decoded_rows` to mark progress and avoid an infinite loop.
+ // We add `consumed` (which can be 0) to `total_consumed`.
total_consumed += consumed;
self.decoded_rows += 1;
}
@@ -364,11 +365,7 @@ impl<R: BufRead> Reader<R> {
}
// Try to decode more rows from the current block.
let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
- if consumed == 0 && self.block_cursor < self.block_data.len() {
- self.block_cursor = self.block_data.len();
- } else {
- self.block_cursor += consumed;
- }
+ self.block_cursor += consumed;
}
self.decoder.flush()
}
@@ -499,6 +496,29 @@ mod test {
assert!(batch.column(0).as_any().is::<StringViewArray>());
}
+ #[test]
+ fn test_read_zero_byte_avro_file() {
+ let batch = read_file("test/data/zero_byte.avro", 3, false);
+ let schema = batch.schema();
+ assert_eq!(schema.fields().len(), 1);
+ let field = schema.field(0);
+ assert_eq!(field.name(), "data");
+ assert_eq!(field.data_type(), &DataType::Binary);
+ assert!(field.is_nullable());
+ assert_eq!(batch.num_rows(), 3);
+ assert_eq!(batch.num_columns(), 1);
+ let binary_array = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<BinaryArray>()
+ .unwrap();
+ assert!(binary_array.is_null(0));
+ assert!(binary_array.is_valid(1));
+ assert_eq!(binary_array.value(1), b"");
+ assert!(binary_array.is_valid(2));
+ assert_eq!(binary_array.value(2), b"some bytes");
+ }
+
#[test]
fn test_alltypes() {
let files = [
diff --git a/arrow-avro/test/data/zero_byte.avro b/arrow-avro/test/data/zero_byte.avro
new file mode 100644
index 0000000000..f7ffd29b68
Binary files /dev/null and b/arrow-avro/test/data/zero_byte.avro differ