This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 97c0e7ca47 Refactor arrow-avro `Decoder` to support partial decoding 
(#8100)
97c0e7ca47 is described below

commit 97c0e7ca47bb14ed16c4b66d0653d5da9b4de090
Author: Connor Sanders <[email protected]>
AuthorDate: Tue Aug 12 12:24:06 2025 -0500

    Refactor arrow-avro `Decoder` to support partial decoding (#8100)
    
    # Which issue does this PR close?
    
    - Part of https://github.com/apache/arrow-rs/issues/4886
    
    # Rationale for this change
    
    Decoding Avro **single-object encoded** streams was brittle when data
    arrived in partial chunks (e.g., from async or networked sources). The
    old implementation relied on ad‑hoc prefix handling and assumed a full
    record would be available, producing hard errors for otherwise normal
    “incomplete buffer” situations. Additionally, the Avro OCF (Object
    Container File) path iterated record‑by‑record through a shared row
    decoder, adding overhead.
    
    This PR introduces a small state machine for single‑object decoding and
    a block‑aware path for OCF, making streaming more robust and OCF
    decoding more efficient while preserving the public API surface.
    
    # What changes are included in this PR?
    
    **Single‑object decoding (streaming)**
    - Replace ad‑hoc prefix parsing (`expect_prefix`, `handle_prefix`,
    `handle_fingerprint`) with an explicit state machine:
    - New `enum DecoderState { Magic, Fingerprint, Record, SchemaChange,
    Finished }`.
    - `Decoder` now tracks `state`, `bytes_remaining`, and a
    `fingerprint_buf` to incrementally assemble the fingerprint.
    - New helper `is_incomplete_data(&ArrowError) -> bool` to treat
    “Unexpected EOF”, “bad varint”, and “offset overflow” as *incomplete
    input* instead of fatal errors.
    - Reworked `Decoder::decode(&[u8]) -> Result<usize, ArrowError>`:
      - Consumes data according to the state machine.
    - Cleanly returns when more bytes are needed (no spurious errors for
    partial chunks).
      - Defers schema switching until after flushing currently decoded rows.
    - Updated `Decoder::flush()` to emit a batch only when rows are ready
    and to transition the state correctly (including a staged
    `SchemaChange`).
    
    **OCF (Object Container File) decoding**
    - Add block‑aware decoding methods on `Decoder` used by `Reader`:
    - `decode_block(&[u8], count: usize) -> Result<(consumed,
    records_decoded), ArrowError>`
      - `flush_block() -> Result<Option<RecordBatch>, ArrowError>`
    - `Reader` now tracks `block_count` and decodes up to the number of
    records in the current block, reducing per‑row overhead and improving
    throughput.
    - `ReaderBuilder::build` initializes the new `block_count` path.
    
    **API / struct adjustments**
    - Remove internal `expect_prefix` flag from `Decoder`; behavior is
    driven by the state machine.
    - `ReaderBuilder::make_decoder_with_parts` updated accordingly (no
    behavior change to public builder methods).
    - No public API signature changes for `Reader`, `Decoder`, or
    `ReaderBuilder`.
    
    **Tests**
    - Add targeted streaming tests:
      - `test_two_messages_same_schema`
      - `test_two_messages_schema_switch`
      - `test_split_message_across_chunks`
    - Update prefix‑handling tests to validate state transitions (`Magic` →
    `Fingerprint`, etc.) and new error messages.
    - Retain and exercise existing suites (types, lists, nested structures,
    decimals, enums, strict mode) with minimal adjustments.
    
    # Are these changes tested?
    
    Yes.
    - New unit tests cover:
      - Multi‑message streams with/without schema switches
      - Messages split across chunk boundaries
      - Incremental prefix/fingerprint parsing
    - Existing tests continue to cover OCF reading, compression,
    complex/nested types, strict mode, etc.
    - The new OCF path is exercised by the unchanged OCF tests since
    `Reader` now uses `decode_block/flush_block`.
    
    # Are there any user-facing changes?
    
    N/A
    
    ---------
    
    Co-authored-by: Ryan Johnson <[email protected]>
---
 arrow-avro/src/reader/mod.rs | 496 +++++++++++++++++++++++++++++--------------
 1 file changed, 339 insertions(+), 157 deletions(-)

diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 1f741d6d53..7bbcaeb9f0 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -89,7 +89,6 @@
 //! }
 //! ```
 //!
-
 use crate::codec::{AvroField, AvroFieldBuilder};
 use crate::schema::{
     compare_schemas, generate_fingerprint, AvroSchema, Fingerprint, 
FingerprintAlgorithm, Schema,
@@ -130,6 +129,15 @@ fn read_header<R: BufRead>(mut reader: R) -> 
Result<Header, ArrowError> {
     })
 }
 
+// NOTE: The Current ` is_incomplete_data ` below is temporary and will be 
improved prior to public release
+fn is_incomplete_data(err: &ArrowError) -> bool {
+    matches!(
+        err,
+        ArrowError::ParseError(msg)
+            if msg.contains("Unexpected EOF")
+    )
+}
+
 /// A low-level interface for decoding Avro-encoded bytes into Arrow 
`RecordBatch`.
 #[derive(Debug)]
 pub struct Decoder {
@@ -139,10 +147,10 @@ pub struct Decoder {
     remaining_capacity: usize,
     cache: IndexMap<Fingerprint, RecordDecoder>,
     fingerprint_algorithm: FingerprintAlgorithm,
-    expect_prefix: bool,
     utf8_view: bool,
     strict_mode: bool,
     pending_schema: Option<(Fingerprint, RecordDecoder)>,
+    awaiting_body: bool,
 }
 
 impl Decoder {
@@ -162,29 +170,33 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
-        if self.expect_prefix
-            && data.len() >= SINGLE_OBJECT_MAGIC.len()
-            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
-        {
-            return Err(ArrowError::ParseError(
-                "Expected single‑object encoding fingerprint prefix for first 
message \
-                 (writer_schema_store is set but active_fingerprint is None)"
-                    .into(),
-            ));
-        }
         let mut total_consumed = 0usize;
-        // The loop stops when the batch is full, a schema change is staged,
-        // or handle_prefix indicates we need more bytes (Some(0)).
         while total_consumed < data.len() && self.remaining_capacity > 0 {
-            if let Some(n) = self.handle_prefix(&data[total_consumed..])? {
-                // We either consumed a prefix (n > 0) and need a schema 
switch, or we need
-                // more bytes to make a decision. Either way, this decoding 
attempt is finished.
-                total_consumed += n;
+            if self.awaiting_body {
+                match self.active_decoder.decode(&data[total_consumed..], 1) {
+                    Ok(n) => {
+                        self.remaining_capacity -= 1;
+                        total_consumed += n;
+                        self.awaiting_body = false;
+                        continue;
+                    }
+                    Err(ref e) if is_incomplete_data(e) => break,
+                    err => return err,
+                };
+            }
+            match self.handle_prefix(&data[total_consumed..])? {
+                Some(0) => break, // insufficient bytes
+                Some(n) => {
+                    total_consumed += n;
+                    self.apply_pending_schema_if_batch_empty();
+                    self.awaiting_body = true;
+                }
+                None => {
+                    return Err(ArrowError::ParseError(
+                        "Missing magic bytes and fingerprint".to_string(),
+                    ))
+                }
             }
-            // No prefix: decode one row and keep going.
-            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
-            self.remaining_capacity -= 1;
-            total_consumed += n;
         }
         Ok(total_consumed)
     }
@@ -195,10 +207,6 @@ impl Decoder {
     // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller 
should await more bytes.
     // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and 
fingerprint).
     fn handle_prefix(&mut self, buf: &[u8]) -> Result<Option<usize>, 
ArrowError> {
-        // If there is no schema store, prefixes are unrecognized.
-        if !self.expect_prefix {
-            return Ok(None);
-        }
         // Need at least the magic bytes to decide (2 bytes).
         let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
             return Ok(Some(0)); // Get more bytes
@@ -252,15 +260,7 @@ impl Decoder {
         Ok(Some(N))
     }
 
-    /// Produce a `RecordBatch` if at least one row is fully decoded, returning
-    /// `Ok(None)` if no new rows are available.
-    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.remaining_capacity == self.batch_size {
-            return Ok(None);
-        }
-        let batch = self.active_decoder.flush()?;
-        self.remaining_capacity = self.batch_size;
-        // Apply any staged schema switch.
+    fn apply_pending_schema(&mut self) {
         if let Some((new_fingerprint, new_decoder)) = 
self.pending_schema.take() {
             if let Some(old_fingerprint) = 
self.active_fingerprint.replace(new_fingerprint) {
                 let old_decoder = std::mem::replace(&mut self.active_decoder, 
new_decoder);
@@ -270,9 +270,32 @@ impl Decoder {
                 self.active_decoder = new_decoder;
             }
         }
+    }
+
+    fn apply_pending_schema_if_batch_empty(&mut self) {
+        if self.batch_is_empty() {
+            self.apply_pending_schema();
+        }
+    }
+
+    fn flush_and_reset(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.batch_is_empty() {
+            return Ok(None);
+        }
+        let batch = self.active_decoder.flush()?;
+        self.remaining_capacity = self.batch_size;
         Ok(Some(batch))
     }
 
+    /// Produce a `RecordBatch` if at least one row is fully decoded, returning
+    /// `Ok(None)` if no new rows are available.
+    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        // We must flush the active decoder before switching to the pending 
one.
+        let batch = self.flush_and_reset();
+        self.apply_pending_schema();
+        batch
+    }
+
     /// Returns the number of rows that can be added to this decoder before it 
is full.
     pub fn capacity(&self) -> usize {
         self.remaining_capacity
@@ -282,6 +305,31 @@ impl Decoder {
     pub fn batch_is_full(&self) -> bool {
         self.remaining_capacity == 0
     }
+
+    /// Returns true if the decoder has not decoded any batches yet.
+    pub fn batch_is_empty(&self) -> bool {
+        self.remaining_capacity == self.batch_size
+    }
+
+    // Decode either the block count or remaining capacity from `data` (an OCF 
block payload).
+    //
+    // Returns the number of bytes consumed from `data` along with the number 
of records decoded.
+    fn decode_block(&mut self, data: &[u8], count: usize) -> Result<(usize, 
usize), ArrowError> {
+        // OCF decoding never interleaves records across blocks, so no 
chunking.
+        let to_decode = std::cmp::min(count, self.remaining_capacity);
+        if to_decode == 0 {
+            return Ok((0, 0));
+        }
+        let consumed = self.active_decoder.decode(data, to_decode)?;
+        self.remaining_capacity -= to_decode;
+        Ok((consumed, to_decode))
+    }
+
+    // Produce a `RecordBatch` if at least one row is fully decoded, returning
+    // `Ok(None)` if no new rows are available.
+    fn flush_block(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        self.flush_and_reset()
+    }
 }
 
 /// A builder to create an [`Avro Reader`](Reader) that reads Avro data
@@ -342,7 +390,6 @@ impl ReaderBuilder {
         active_decoder: RecordDecoder,
         active_fingerprint: Option<Fingerprint>,
         cache: IndexMap<Fingerprint, RecordDecoder>,
-        expect_prefix: bool,
         fingerprint_algorithm: FingerprintAlgorithm,
     ) -> Decoder {
         Decoder {
@@ -351,11 +398,11 @@ impl ReaderBuilder {
             active_fingerprint,
             active_decoder,
             cache,
-            expect_prefix,
             utf8_view: self.utf8_view,
             fingerprint_algorithm,
             strict_mode: self.strict_mode,
             pending_schema: None,
+            awaiting_body: false,
         }
     }
 
@@ -376,7 +423,6 @@ impl ReaderBuilder {
                 record_decoder,
                 None,
                 IndexMap::new(),
-                false,
                 FingerprintAlgorithm::Rabin,
             ));
         }
@@ -423,7 +469,6 @@ impl ReaderBuilder {
             active_decoder,
             Some(start_fingerprint),
             cache,
-            true,
             store.fingerprint_algorithm(),
         ))
     }
@@ -496,6 +541,7 @@ impl ReaderBuilder {
             decoder,
             block_decoder: BlockDecoder::default(),
             block_data: Vec::new(),
+            block_count: 0,
             block_cursor: 0,
             finished: false,
         })
@@ -521,6 +567,7 @@ pub struct Reader<R: BufRead> {
     decoder: Decoder,
     block_decoder: BlockDecoder,
     block_data: Vec<u8>,
+    block_count: usize,
     block_cursor: usize,
     finished: bool,
 }
@@ -550,12 +597,12 @@ impl<R: BufRead> Reader<R> {
                 self.reader.consume(consumed);
                 if let Some(block) = self.block_decoder.flush() {
                     // Successfully decoded a block.
-                    let block_data = if let Some(ref codec) = 
self.header.compression()? {
+                    self.block_data = if let Some(ref codec) = 
self.header.compression()? {
                         codec.decompress(&block.data)?
                     } else {
                         block.data
                     };
-                    self.block_data = block_data;
+                    self.block_count = block.count;
                     self.block_cursor = 0;
                 } else if consumed == 0 {
                     // The block decoder made no progress on a non-empty 
buffer.
@@ -564,11 +611,16 @@ impl<R: BufRead> Reader<R> {
                     ));
                 }
             }
-            // Try to decode more rows from the current block.
-            let consumed = 
self.decoder.decode(&self.block_data[self.block_cursor..])?;
-            self.block_cursor += consumed;
+            // Decode as many rows as will fit in the current batch
+            if self.block_cursor < self.block_data.len() {
+                let (consumed, records_decoded) = self
+                    .decoder
+                    .decode_block(&self.block_data[self.block_cursor..], 
self.block_count)?;
+                self.block_cursor += consumed;
+                self.block_count -= records_decoded;
+            }
         }
-        self.decoder.flush()
+        self.decoder.flush_block()
     }
 }
 
@@ -709,6 +761,35 @@ mod test {
             .expect("decoder")
     }
 
+    fn make_value_schema(pt: PrimitiveType) -> AvroSchema {
+        let json_schema = format!(
+            
r#"{{"type":"record","name":"S","fields":[{{"name":"v","type":"{}"}}]}}"#,
+            pt.as_ref()
+        );
+        AvroSchema::new(json_schema)
+    }
+
+    fn encode_zigzag(value: i64) -> Vec<u8> {
+        let mut n = ((value << 1) ^ (value >> 63)) as u64;
+        let mut out = Vec::new();
+        loop {
+            if (n & !0x7F) == 0 {
+                out.push(n as u8);
+                break;
+            } else {
+                out.push(((n & 0x7F) | 0x80) as u8);
+                n >>= 7;
+            }
+        }
+        out
+    }
+
+    fn make_message(fp: Fingerprint, value: i64) -> Vec<u8> {
+        let mut msg = make_prefix(fp);
+        msg.extend_from_slice(&encode_zigzag(value));
+        msg
+    }
+
     #[test]
     fn test_schema_store_register_lookup() {
         let schema_int = make_record_schema(PrimitiveType::Int);
@@ -735,35 +816,6 @@ mod test {
         );
     }
 
-    #[test]
-    fn test_missing_initial_fingerprint_error() {
-        let (store, _fp_int, _fp_long, schema_int, _schema_long) = 
make_two_schema_store();
-        let mut decoder = ReaderBuilder::new()
-            .with_batch_size(8)
-            .with_reader_schema(schema_int.clone())
-            .with_writer_schema_store(store)
-            .build_decoder()
-            .unwrap();
-        let buf = [0x02u8, 0x00u8];
-        let err = decoder.decode(&buf).expect_err("decode should error");
-        let msg = err.to_string();
-        assert!(
-            msg.contains("Expected single‑object encoding fingerprint"),
-            "unexpected message: {msg}"
-        );
-    }
-
-    #[test]
-    fn test_handle_prefix_no_schema_store() {
-        let (store, fp_int, _fp_long, schema_int, _schema_long) = 
make_two_schema_store();
-        let mut decoder = make_decoder(&store, fp_int, &schema_int);
-        decoder.expect_prefix = false;
-        let res = decoder
-            .handle_prefix(&SINGLE_OBJECT_MAGIC[..])
-            .expect("handle_prefix");
-        assert!(res.is_none(), "Expected None when expect_prefix is false");
-    }
-
     #[test]
     fn test_handle_prefix_incomplete_magic() {
         let (store, fp_int, _fp_long, schema_int, _schema_long) = 
make_two_schema_store();
@@ -815,6 +867,219 @@ mod test {
         assert_eq!(decoder.pending_schema.as_ref().unwrap().0, fp_long);
     }
 
+    #[test]
+    fn test_two_messages_same_schema() {
+        let writer_schema = make_value_schema(PrimitiveType::Int);
+        let reader_schema = writer_schema.clone();
+        let mut store = SchemaStore::new();
+        let fp = store.register(writer_schema).unwrap();
+        let msg1 = make_message(fp, 42);
+        let msg2 = make_message(fp, 11);
+        let input = [msg1.clone(), msg2.clone()].concat();
+        let mut decoder = ReaderBuilder::new()
+            .with_batch_size(8)
+            .with_reader_schema(reader_schema.clone())
+            .with_writer_schema_store(store)
+            .with_active_fingerprint(fp)
+            .build_decoder()
+            .unwrap();
+        let _ = decoder.decode(&input).unwrap();
+        let batch = decoder.flush().unwrap().expect("batch");
+        assert_eq!(batch.num_rows(), 2);
+        let col = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(col.value(0), 42);
+        assert_eq!(col.value(1), 11);
+    }
+
+    #[test]
+    fn test_two_messages_schema_switch() {
+        let w_int = make_value_schema(PrimitiveType::Int);
+        let w_long = make_value_schema(PrimitiveType::Long);
+        let r_long = w_long.clone();
+        let mut store = SchemaStore::new();
+        let fp_int = store.register(w_int).unwrap();
+        let fp_long = store.register(w_long).unwrap();
+        let msg_int = make_message(fp_int, 1);
+        let msg_long = make_message(fp_long, 123456789_i64);
+        let mut decoder = ReaderBuilder::new()
+            .with_batch_size(8)
+            .with_writer_schema_store(store)
+            .with_active_fingerprint(fp_int)
+            .build_decoder()
+            .unwrap();
+        let _ = decoder.decode(&msg_int).unwrap();
+        let batch1 = decoder.flush().unwrap().expect("batch1");
+        assert_eq!(batch1.num_rows(), 1);
+        assert_eq!(
+            batch1
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap()
+                .value(0),
+            1
+        );
+        let _ = decoder.decode(&msg_long).unwrap();
+        let batch2 = decoder.flush().unwrap().expect("batch2");
+        assert_eq!(batch2.num_rows(), 1);
+        assert_eq!(
+            batch2
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int64Array>()
+                .unwrap()
+                .value(0),
+            123456789_i64
+        );
+    }
+
+    #[test]
+    fn test_split_message_across_chunks() {
+        let writer_schema = make_value_schema(PrimitiveType::Int);
+        let reader_schema = writer_schema.clone();
+        let mut store = SchemaStore::new();
+        let fp = store.register(writer_schema).unwrap();
+        let msg1 = make_message(fp, 7);
+        let msg2 = make_message(fp, 8);
+        let msg3 = make_message(fp, 9);
+        let (pref2, body2) = msg2.split_at(10);
+        let (pref3, body3) = msg3.split_at(10);
+        let mut decoder = ReaderBuilder::new()
+            .with_batch_size(8)
+            .with_reader_schema(reader_schema)
+            .with_writer_schema_store(store)
+            .with_active_fingerprint(fp)
+            .build_decoder()
+            .unwrap();
+        let _ = decoder.decode(&msg1).unwrap();
+        let batch1 = decoder.flush().unwrap().expect("batch1");
+        assert_eq!(batch1.num_rows(), 1);
+        assert_eq!(
+            batch1
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap()
+                .value(0),
+            7
+        );
+        let _ = decoder.decode(pref2).unwrap();
+        assert!(decoder.flush().unwrap().is_none());
+        let mut chunk3 = Vec::from(body2);
+        chunk3.extend_from_slice(pref3);
+        let _ = decoder.decode(&chunk3).unwrap();
+        let batch2 = decoder.flush().unwrap().expect("batch2");
+        assert_eq!(batch2.num_rows(), 1);
+        assert_eq!(
+            batch2
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap()
+                .value(0),
+            8
+        );
+        let _ = decoder.decode(body3).unwrap();
+        let batch3 = decoder.flush().unwrap().expect("batch3");
+        assert_eq!(batch3.num_rows(), 1);
+        assert_eq!(
+            batch3
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap()
+                .value(0),
+            9
+        );
+    }
+
+    #[test]
+    fn test_decode_stream_with_schema() {
+        struct TestCase<'a> {
+            name: &'a str,
+            schema: &'a str,
+            expected_error: Option<&'a str>,
+        }
+        let tests = vec![
+            TestCase {
+                name: "success",
+                schema: 
r#"{"type":"record","name":"test","fields":[{"name":"f2","type":"string"}]}"#,
+                expected_error: None,
+            },
+            TestCase {
+                name: "valid schema invalid data",
+                schema: 
r#"{"type":"record","name":"test","fields":[{"name":"f2","type":"long"}]}"#,
+                expected_error: Some("did not consume all bytes"),
+            },
+        ];
+        for test in tests {
+            let avro_schema = AvroSchema::new(test.schema.to_string());
+            let mut store = SchemaStore::new();
+            let fp = store.register(avro_schema.clone()).unwrap();
+            let prefix = make_prefix(fp);
+            let record_val = "some_string";
+            let mut body = prefix;
+            body.push((record_val.len() as u8) << 1);
+            body.extend_from_slice(record_val.as_bytes());
+            let decoder_res = ReaderBuilder::new()
+                .with_batch_size(1)
+                .with_writer_schema_store(store)
+                .with_active_fingerprint(fp)
+                .build_decoder();
+            let decoder = match decoder_res {
+                Ok(d) => d,
+                Err(e) => {
+                    if let Some(expected) = test.expected_error {
+                        assert!(
+                            e.to_string().contains(expected),
+                            "Test '{}' failed at build – expected 
'{expected}', got '{e}'",
+                            test.name
+                        );
+                        continue;
+                    } else {
+                        panic!("Test '{}' failed during build: {e}", 
test.name);
+                    }
+                }
+            };
+            let stream = Box::pin(stream::once(async { Bytes::from(body) }));
+            let decoded_stream = decode_stream(decoder, stream);
+            let batches_result: Result<Vec<RecordBatch>, ArrowError> =
+                block_on(decoded_stream.try_collect());
+            match (batches_result, test.expected_error) {
+                (Ok(batches), None) => {
+                    let batch =
+                        arrow::compute::concat_batches(&batches[0].schema(), 
&batches).unwrap();
+                    let expected_field = Field::new("f2", DataType::Utf8, 
false);
+                    let expected_schema = 
Arc::new(Schema::new(vec![expected_field]));
+                    let expected_array = 
Arc::new(StringArray::from(vec![record_val]));
+                    let expected_batch =
+                        RecordBatch::try_new(expected_schema, 
vec![expected_array]).unwrap();
+                    assert_eq!(batch, expected_batch, "Test '{}'", test.name);
+                }
+                (Err(e), Some(expected)) => {
+                    assert!(
+                        e.to_string().contains(expected),
+                        "Test '{}' – expected error containing '{expected}', 
got '{e}'",
+                        test.name
+                    );
+                }
+                (Ok(_), Some(expected)) => {
+                    panic!(
+                        "Test '{}' expected failure ('{expected}') but 
succeeded",
+                        test.name
+                    );
+                }
+                (Err(e), None) => {
+                    panic!("Test '{}' unexpectedly failed with '{e}'", 
test.name);
+                }
+            }
+        }
+    }
+
     #[test]
     fn test_utf8view_support() {
         let schema_json = r#"{
@@ -1128,89 +1393,6 @@ mod test {
         assert_eq!(batch, expected);
     }
 
-    #[test]
-    fn test_decode_stream_with_schema() {
-        struct TestCase<'a> {
-            name: &'a str,
-            schema: &'a str,
-            expected_error: Option<&'a str>,
-        }
-        let tests = vec![
-            TestCase {
-                name: "success",
-                schema: 
r#"{"type":"record","name":"test","fields":[{"name":"f2","type":"string"}]}"#,
-                expected_error: None,
-            },
-            TestCase {
-                name: "valid schema invalid data",
-                schema: 
r#"{"type":"record","name":"test","fields":[{"name":"f2","type":"long"}]}"#,
-                expected_error: Some("did not consume all bytes"),
-            },
-        ];
-        for test in tests {
-            let avro_schema = AvroSchema::new(test.schema.to_string());
-            let mut store = SchemaStore::new();
-            let fp = store.register(avro_schema.clone()).unwrap();
-            let prefix = make_prefix(fp);
-            let record_val = "some_string";
-            let mut body = prefix;
-            body.push((record_val.len() as u8) << 1);
-            body.extend_from_slice(record_val.as_bytes());
-            let decoder_res = ReaderBuilder::new()
-                .with_batch_size(1)
-                .with_writer_schema_store(store)
-                .with_active_fingerprint(fp)
-                .build_decoder();
-            let decoder = match decoder_res {
-                Ok(d) => d,
-                Err(e) => {
-                    if let Some(expected) = test.expected_error {
-                        assert!(
-                            e.to_string().contains(expected),
-                            "Test '{}' failed at build – expected 
'{expected}', got '{e}'",
-                            test.name
-                        );
-                        continue;
-                    } else {
-                        panic!("Test '{}' failed during build: {e}", 
test.name);
-                    }
-                }
-            };
-            let stream = Box::pin(stream::once(async { Bytes::from(body) }));
-            let decoded_stream = decode_stream(decoder, stream);
-            let batches_result: Result<Vec<RecordBatch>, ArrowError> =
-                block_on(decoded_stream.try_collect());
-            match (batches_result, test.expected_error) {
-                (Ok(batches), None) => {
-                    let batch =
-                        arrow::compute::concat_batches(&batches[0].schema(), 
&batches).unwrap();
-                    let expected_field = Field::new("f2", DataType::Utf8, 
false);
-                    let expected_schema = 
Arc::new(Schema::new(vec![expected_field]));
-                    let expected_array = 
Arc::new(StringArray::from(vec![record_val]));
-                    let expected_batch =
-                        RecordBatch::try_new(expected_schema, 
vec![expected_array]).unwrap();
-                    assert_eq!(batch, expected_batch, "Test '{}'", test.name);
-                }
-                (Err(e), Some(expected)) => {
-                    assert!(
-                        e.to_string().contains(expected),
-                        "Test '{}' – expected error containing '{expected}', 
got '{e}'",
-                        test.name
-                    );
-                }
-                (Ok(_), Some(expected)) => {
-                    panic!(
-                        "Test '{}' expected failure ('{expected}') but 
succeeded",
-                        test.name
-                    );
-                }
-                (Err(e), None) => {
-                    panic!("Test '{}' unexpectedly failed with '{e}'", 
test.name);
-                }
-            }
-        }
-    }
-
     #[test]
     fn test_decimal() {
         let files = [

Reply via email to