This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new c1033d1a68 feat: Implement an AsyncReader for avro using ObjectStore (#8930)
c1033d1a68 is described below

commit c1033d1a68c55de74429d0f67120dad35be53e2a
Author: Emily Matheys <[email protected]>
AuthorDate: Mon Feb 2 21:07:56 2026 +0200

    feat: Implement an AsyncReader for avro using ObjectStore (#8930)
    
    # Which issue does this PR close?
    
    - Closes #8929.
    
    # Rationale for this change
    
    Allows for proper file splitting within an asynchronous context.
    
    # What changes are included in this PR?
    
    The raw implementation, which supports file splitting: a read may start
    mid-block (scanning forward until a sync marker is found) and continue
    reading until the end of the block is reached.
    This reader currently requires that a reader_schema be provided if
    type promotion, schema evolution, or projection is desired.
    This is because https://github.com/apache/arrow-rs/issues/8928
    currently blocks proper parsing from an ArrowSchema.
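
    A minimal usage sketch (the store/path/schema values are hypothetical;
    the builder methods are the ones added in this PR):

    ```rust,ignore
    let reader = AvroObjectReader::new(store, path);
    let stream = AsyncAvroFileReader::builder(reader, meta.size, 1024)
        // Hypothetical byte slice of the file: decoding starts at the first
        // sync marker found after `start` and completes the block the range ends in.
        .with_range(1_024..65_536)
        // Currently needed if type promotion, schema evolution, or projection is desired.
        .with_reader_schema(reader_schema)
        .try_build()
        .await?;
    ```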
    
    # Are these changes tested?
    
    Yes
    
    # Are there any user-facing changes?
    
    Only the addition of the new async reader.
    Other changes are internal to the crate (namely the way Decoder is
    created from parts).
---
 arrow-avro/Cargo.toml                              |   15 +-
 arrow-avro/README.md                               |   60 +-
 arrow-avro/src/lib.rs                              |   47 +
 .../src/reader/async_reader/async_file_reader.rs   |   95 ++
 arrow-avro/src/reader/async_reader/builder.rs      |  256 +++
 arrow-avro/src/reader/async_reader/mod.rs          | 1764 ++++++++++++++++++++
 arrow-avro/src/reader/async_reader/store.rs        |  107 ++
 arrow-avro/src/reader/block.rs                     |   13 +-
 arrow-avro/src/reader/mod.rs                       |   52 +-
 9 files changed, 2377 insertions(+), 32 deletions(-)

diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml
index a4699a2efa..5e5efd78d0 100644
--- a/arrow-avro/Cargo.toml
+++ b/arrow-avro/Cargo.toml
@@ -45,16 +45,26 @@ sha256 = ["dep:sha2"]
 small_decimals = []
 avro_custom_types = ["dep:arrow-select"]
 
+# Enable async APIs
+async = ["futures", "tokio"]
+# Enable object_store integration
+object_store = ["dep:object_store", "async"]
+
 [dependencies]
 arrow-schema = { workspace = true }
 arrow-buffer = { workspace = true }
 arrow-array = { workspace = true }
 arrow-select = { workspace = true, optional = true }
+
+object_store = { version = "0.13", default-features = false, optional = true }
+
+bytes = { version = "1.11.0", default-features = false, features = ["std"] }
 serde_json = { version = "1.0", default-features = false, features = ["std"] }
 serde = { version = "1.0.188", features = ["derive"] }
 flate2 = { version = "1.0", default-features = false, features = [
     "rust_backend",
 ], optional = true }
+futures = { version = "0.3", default-features = false, features = ["std"], optional = true }
 snap = { version = "1.0", default-features = false, optional = true }
 zstd = { version = "0.13", default-features = false, optional = true }
 bzip2 = { version = "0.6.0", optional = true }
@@ -66,10 +76,11 @@ indexmap = "2.10"
 rand = "0.9"
 md5 = { version = "0.8", optional = true }
 sha2 = { version = "0.10", optional = true }
-bytes = "1.11"
+tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] }
 
 [dev-dependencies]
 arrow-data = { workspace = true }
+bytes = { version = "1.11.0", default-features = false, features = ["std"] }
 rand = { version = "0.9.1", default-features = false, features = [
     "std",
     "std_rng",
@@ -82,7 +93,9 @@ futures = "0.3.31"
 async-stream = "0.3.6"
 apache-avro = "0.21.0"
 num-bigint = "0.4"
+object_store = { version = "0.13", default-features = false, features = ["fs"] }
 once_cell = "1.21.3"
+tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread", "io-util", "fs"] }
 
 [[bench]]
 name = "avro_reader"
diff --git a/arrow-avro/README.md b/arrow-avro/README.md
index 85fd760947..c5776c125b 100644
--- a/arrow-avro/README.md
+++ b/arrow-avro/README.md
@@ -51,7 +51,7 @@ Disable defaults and pick only what you need (see **Feature Flags**):
 
 ```toml
 [dependencies]
-arrow-avro = { version = "57.0.0", default-features = false, features = ["deflate", "snappy"] }
+arrow-avro = { version = "58.0.0", default-features = false, features = ["deflate", "snappy"] }
 ```
 
 ---
@@ -105,6 +105,36 @@ fn main() -> anyhow::Result<()> {
 
 See the crate docs for runnable SOE and Confluent round‑trip examples.
 
+### Async reading from object stores (`object_store` feature)
+
+```rust,ignore
+use std::sync::Arc;
+use arrow_avro::reader::{AsyncAvroFileReader, AvroObjectReader};
+use futures::TryStreamExt;
+use object_store::ObjectStore;
+use object_store::local::LocalFileSystem;
+use object_store::path::Path;
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+    let path = Path::from("data/example.avro");
+
+    let meta = store.head(&path).await?;
+    let reader = AvroObjectReader::new(store, path);
+
+    let stream = AsyncAvroFileReader::builder(reader, meta.size, 1024)
+        .try_build()
+        .await?;
+
+    let batches: Vec<_> = stream.try_collect().await?;
+    for batch in batches {
+        println!("rows: {}", batch.num_rows());
+    }
+    Ok(())
+}
+```
+
 ---
 
 ## Feature Flags (what they do and when to use them)
@@ -128,15 +158,22 @@ See the crate docs for runnable SOE and Confluent round‑trip examples.
 * Only **OCF** uses these codecs (they compress per‑block). They do **not** apply to raw Avro frames used by Confluent wire format or SOE. The crate’s `compression` module is specifically for **OCF blocks**.
 * `deflate` uses `flate2` with the `rust_backend` (no system zlib required).
 
+### Async & Object Store
+
+| Feature        | Default | What it enables                                                              | When to use                                                                     |
+|----------------|--------:|------------------------------------------------------------------------------|---------------------------------------------------------------------------------|
+| `async`        |       ⬜ | Async APIs for reading Avro via `futures` and `tokio`                        | Enable for non-blocking async Avro reading with `AsyncAvroFileReader`.          |
+| `object_store` |       ⬜ | Integration with `object_store` crate (implies `async`)                      | Enable for reading Avro from cloud storage (S3, GCS, Azure Blob, etc.).         |
+
 ### Schema fingerprints & custom logical type helpers
 
-| Feature                     | Default | What it enables                                                                   | When to use                                                                                                           |    
+| Feature                     | Default | What it enables                                                                   | When to use                                                                                                           |
 |-----------------------------|--------:|----------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------|
-| `md5`                       |       ⬜ | `md5` dep for optional **MD5** schema fingerprints                                | If you want to compute MD5 fingerprints of writer schemas (i.e. for custom prefixing/validation).                    |   
-| `sha256`                    |       ⬜ | `sha2` dep for optional **SHA‑256** schema fingerprints                           | If you prefer longer fingerprints; affects max prefix length (i.e. when framing).                                    |  
+| `md5`                       |       ⬜ | `md5` dep for optional **MD5** schema fingerprints                                | If you want to compute MD5 fingerprints of writer schemas (i.e. for custom prefixing/validation).                    |
+| `sha256`                    |       ⬜ | `sha2` dep for optional **SHA‑256** schema fingerprints                           | If you prefer longer fingerprints; affects max prefix length (i.e. when framing).                                    |
 | `small_decimals`            |       ⬜ | Extra handling for **small decimal** logical types (`Decimal32` and `Decimal64`)  | If your Avro `decimal` values are small and you want more compact Arrow representations.                             |
-| `avro_custom_types`         |       ⬜ | Annotates Avro values using Arrow specific custom logical types                   | Enable when you need arrow-avro to reinterpret certain Avro fields as Arrow types that Avro doesn’t natively model. | 
-| `canonical_extension_types` |       ⬜ | Re‑exports Arrow’s canonical extension types support from `arrow-schema`          | Enable if your workflow uses Arrow [canonical extension types] and you want `arrow-avro` to respect them.           | 
+| `avro_custom_types`         |       ⬜ | Annotates Avro values using Arrow specific custom logical types                   | Enable when you need arrow-avro to reinterpret certain Avro fields as Arrow types that Avro doesn't natively model. |
+| `canonical_extension_types` |       ⬜ | Re‑exports Arrow's canonical extension types support from `arrow-schema`          | Enable if your workflow uses Arrow [canonical extension types] and you want `arrow-avro` to respect them.           | 
 
 [canonical extension types]: https://arrow.apache.org/docs/format/CanonicalExtensions.html
 
@@ -149,17 +186,22 @@ See the crate docs for runnable SOE and Confluent round‑trip examples.
 * Minimal, fast build (common pipelines):
 
   ```toml
-  arrow-avro = { version = "56", default-features = false, features = ["deflate", "snappy"] }
+  arrow-avro = { version = "58", default-features = false, features = ["deflate", "snappy"] }
   ```
 * Include Zstandard too (modern data lakes):
 
   ```toml
-  arrow-avro = { version = "56", default-features = false, features = ["deflate", "snappy", "zstd"] }
+  arrow-avro = { version = "58", default-features = false, features = ["deflate", "snappy", "zstd"] }
+  ```
+* Async reading from object stores (S3, GCS, etc.):
+
+  ```toml
+  arrow-avro = { version = "58", features = ["object_store"] }
   ```
 * Fingerprint helpers:
 
   ```toml
-  arrow-avro = { version = "56", features = ["md5", "sha256"] }
+  arrow-avro = { version = "58", features = ["md5", "sha256"] }
   ```
   
 ---
diff --git a/arrow-avro/src/lib.rs b/arrow-avro/src/lib.rs
index eb04ee8fa6..da451ea146 100644
--- a/arrow-avro/src/lib.rs
+++ b/arrow-avro/src/lib.rs
@@ -123,16 +123,56 @@
 //! # Ok(()) }
 //! ```
 //!
+//! ## `async` Reading (`async` feature)
+//!
+//! The [`reader`] module provides async APIs for reading Avro files when the `async`
+//! feature is enabled.
+//!
+//! [`AsyncAvroFileReader`] implements `Stream<Item = Result<RecordBatch, ArrowError>>`,
+//! allowing efficient async streaming of record batches. When the `object_store` feature
+//! is enabled, [`AvroObjectReader`] provides integration with object storage services
+//! such as S3 via the [object_store] crate.
+//!
+//! ```ignore
+//! use std::sync::Arc;
+//! use arrow_avro::reader::{AsyncAvroFileReader, AvroObjectReader};
+//! use futures::TryStreamExt;
+//! use object_store::ObjectStore;
+//! use object_store::local::LocalFileSystem;
+//! use object_store::path::Path;
+//!
+//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
+//! let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+//! let path = Path::from("data/example.avro");
+//! let meta = store.head(&path).await?;
+//!
+//! let reader = AvroObjectReader::new(store, path);
+//! let stream = AsyncAvroFileReader::builder(reader, meta.size, 1024)
+//!     .try_build()
+//!     .await?;
+//!
+//! let batches: Vec<_> = stream.try_collect().await?;
+//! # Ok(())
+//! # }
+//! ```
+//!
+//! [object_store]: https://docs.rs/object_store/latest/object_store/
+//!
 //! ---
 //!
 //! ### Modules
 //!
 //! - [`reader`]: read Avro (OCF, SOE, Confluent) into Arrow `RecordBatch`es.
+//!   - With the `async` feature: [`AsyncAvroFileReader`] for async streaming reads.
+//!   - With the `object_store` feature: [`AvroObjectReader`] for reading from cloud storage.
 //! - [`writer`]: write Arrow `RecordBatch`es as Avro (OCF, SOE, Confluent, Apicurio).
 //! - [`schema`]: Avro schema parsing / fingerprints / registries.
 //! - [`compression`]: codecs used for **OCF block compression** (i.e., Deflate, Snappy, Zstandard, BZip2, and XZ).
 //! - [`codec`]: internal Avro-Arrow type conversion and row decode/encode plans.
 //!
+//! [`AsyncAvroFileReader`]: reader::AsyncAvroFileReader
+//! [`AvroObjectReader`]: reader::AvroObjectReader
+//!
 //! ### Features
 //!
 //! **OCF compression (enabled by default)**
@@ -142,6 +182,11 @@
 //! - `bzip2` — enable BZip2 block compression.
 //! - `xz` — enable XZ/LZMA block compression.
 //!
+//! **Async & Object Store (opt‑in)**
+//! - `async` — enable async APIs for reading Avro (`AsyncAvroFileReader`, `AsyncFileReader` trait).
+//! - `object_store` — enable integration with the [`object_store`] crate for reading Avro
+//!   from cloud storage (S3, GCS, Azure Blob, etc.) via `AvroObjectReader`. Implies `async`.
+//!
 //! **Schema fingerprints & helpers (opt‑in)**
 //! - `md5` — enable MD5 writer‑schema fingerprints.
 //! - `sha256` — enable SHA‑256 writer‑schema fingerprints.
@@ -156,6 +201,8 @@
 //! - OCF compression codecs apply only to **Object Container Files**; they do not affect Avro
 //!   single object encodings.
 //!
+//! [`object_store`]: https://docs.rs/object_store/latest/object_store/
+//!
 //! [canonical extension types]: https://arrow.apache.org/docs/format/CanonicalExtensions.html
 //!
 //! [Apache Arrow]: https://arrow.apache.org/
diff --git a/arrow-avro/src/reader/async_reader/async_file_reader.rs b/arrow-avro/src/reader/async_reader/async_file_reader.rs
new file mode 100644
index 0000000000..1257a2f3dd
--- /dev/null
+++ b/arrow-avro/src/reader/async_reader/async_file_reader.rs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::AvroError;
+use bytes::Bytes;
+use futures::FutureExt;
+use futures::future::BoxFuture;
+use std::io::SeekFrom;
+use std::ops::Range;
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
+
+/// The asynchronous interface used by [`super::AsyncAvroFileReader`] to read Avro files
+///
+/// Notes:
+///
+/// 1. There is a default implementation for types that implement [`AsyncRead`]
+///    and [`AsyncSeek`], for example [`tokio::fs::File`].
+///
+/// 2. [`super::AvroObjectReader`], available when the `object_store` crate feature
+///    is enabled, implements this interface for [`ObjectStore`].
+///
+/// [`ObjectStore`]: object_store::ObjectStore
+///
+/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
+pub trait AsyncFileReader: Send {
+    /// Retrieve the bytes in `range`
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, AvroError>>;
+
+    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<u64>>,
+    ) -> BoxFuture<'_, Result<Vec<Bytes>, AvroError>> {
+        async move {
+            let mut result = Vec::with_capacity(ranges.len());
+
+            for range in ranges.into_iter() {
+                let data = self.get_bytes(range).await?;
+                result.push(data);
+            }
+
+            Ok(result)
+        }
+        .boxed()
+    }
+}
+
+/// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader.
+impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, AvroError>> {
+        self.as_mut().get_bytes(range)
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<u64>>,
+    ) -> BoxFuture<'_, Result<Vec<Bytes>, AvroError>> {
+        self.as_mut().get_byte_ranges(ranges)
+    }
+}
+
+impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, AvroError>> {
+        async move {
+            self.seek(SeekFrom::Start(range.start)).await?;
+
+            let to_read = range.end - range.start;
+            let mut buffer = Vec::with_capacity(to_read as usize);
+            let read = self.take(to_read).read_to_end(&mut buffer).await?;
+            if read as u64 != to_read {
+                return Err(AvroError::EOF(format!(
+                    "expected to read {} bytes, got {}",
+                    to_read, read
+                )));
+            }
+
+            Ok(buffer.into())
+        }
+        .boxed()
+    }
+}
diff --git a/arrow-avro/src/reader/async_reader/builder.rs b/arrow-avro/src/reader/async_reader/builder.rs
new file mode 100644
index 0000000000..0f9a7abf1c
--- /dev/null
+++ b/arrow-avro/src/reader/async_reader/builder.rs
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::codec::AvroFieldBuilder;
+use crate::errors::AvroError;
+use crate::reader::async_reader::ReaderState;
+use crate::reader::header::{Header, HeaderDecoder};
+use crate::reader::record::RecordDecoder;
+use crate::reader::{AsyncAvroFileReader, AsyncFileReader, Decoder};
+use crate::schema::{AvroSchema, FingerprintAlgorithm, SCHEMA_METADATA_KEY};
+use indexmap::IndexMap;
+use std::ops::Range;
+
+const DEFAULT_HEADER_SIZE_HINT: u64 = 16 * 1024; // 16 KB
+
+/// Builder for an asynchronous Avro file reader.
+pub struct ReaderBuilder<R> {
+    reader: R,
+    file_size: u64,
+    batch_size: usize,
+    range: Option<Range<u64>>,
+    reader_schema: Option<AvroSchema>,
+    projection: Option<Vec<usize>>,
+    header_size_hint: Option<u64>,
+    utf8_view: bool,
+    strict_mode: bool,
+}
+
+impl<R> ReaderBuilder<R> {
+    pub(super) fn new(reader: R, file_size: u64, batch_size: usize) -> Self {
+        Self {
+            reader,
+            file_size,
+            batch_size,
+            range: None,
+            reader_schema: None,
+            projection: None,
+            header_size_hint: None,
+            utf8_view: false,
+            strict_mode: false,
+        }
+    }
+
+    /// Specify a byte range to read from the Avro file.
+    /// If this is provided, the reader will read all the blocks within the specified range.
+    /// If the range ends mid-block, it will attempt to fetch the remaining bytes to complete that block,
+    /// but no further blocks will be read.
+    /// If this is omitted, the full file will be read.
+    pub fn with_range(self, range: Range<u64>) -> Self {
+        Self {
+            range: Some(range),
+            ..self
+        }
+    }
+
+    /// Specify a reader schema to use when reading the Avro file.
+    /// This can be useful to project specific columns or handle schema evolution.
+    /// If this is not provided, the writer schema from the file header will be used.
+    pub fn with_reader_schema(self, reader_schema: AvroSchema) -> Self {
+        Self {
+            reader_schema: Some(reader_schema),
+            ..self
+        }
+    }
+
+    /// Specify a projection of column indices to read from the Avro file.
+    /// This can help optimize reading by only fetching the necessary columns.
+    pub fn with_projection(self, projection: Vec<usize>) -> Self {
+        Self {
+            projection: Some(projection),
+            ..self
+        }
+    }
+
+    /// Provide a hint for the expected size of the Avro header in bytes.
+    /// This can help optimize the initial read operation when fetching the header.
+    pub fn with_header_size_hint(self, hint: u64) -> Self {
+        Self {
+            header_size_hint: Some(hint),
+            ..self
+        }
+    }
+
+    /// Enable usage of Utf8View types when reading string data.
+    pub fn with_utf8_view(self, utf8_view: bool) -> Self {
+        Self { utf8_view, ..self }
+    }
+
+    /// Enable strict mode for schema validation and data reading.
+    pub fn with_strict_mode(self, strict_mode: bool) -> Self {
+        Self {
+            strict_mode,
+            ..self
+        }
+    }
+}
+
+impl<R: AsyncFileReader> ReaderBuilder<R> {
+    async fn read_header(&mut self) -> Result<(Header, u64), AvroError> {
+        let mut decoder = HeaderDecoder::default();
+        let mut position = 0;
+        loop {
+            let range_to_fetch = position
+                ..(position + self.header_size_hint.unwrap_or(DEFAULT_HEADER_SIZE_HINT))
+                    .min(self.file_size);
+
+            // Maybe EOF after the header, no actual data
+            if range_to_fetch.is_empty() {
+                break;
+            }
+
+            let current_data = self
+                .reader
+                .get_bytes(range_to_fetch.clone())
+                .await
+                .map_err(|err| {
+                    AvroError::General(format!(
+                        "Error fetching Avro header from file reader: {err}"
+                    ))
+                })?;
+            if current_data.is_empty() {
+                return Err(AvroError::EOF(
+                    "Unexpected EOF while fetching header data".into(),
+                ));
+            }
+
+            let read = current_data.len();
+            let decoded = decoder.decode(&current_data)?;
+            if decoded != read {
+                position += decoded as u64;
+                break;
+            }
+            position += read as u64;
+        }
+
+        decoder
+            .flush()
+            .map(|header| (header, position))
+            .ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro header".into()))
+    }
+
+    /// Build the asynchronous Avro reader with the provided parameters.
+    /// This reads the header first to initialize the reader state.
+    pub async fn try_build(mut self) -> Result<AsyncAvroFileReader<R>, AvroError> {
+        if self.file_size == 0 {
+            return Err(AvroError::InvalidArgument("File size cannot be 0".into()));
+        }
+
+        // Start by reading the header from the beginning of the Avro file
+        // and take the writer schema from the header.
+        let (header, header_len) = self.read_header().await?;
+        let writer_schema = {
+            let raw = header.get(SCHEMA_METADATA_KEY).ok_or_else(|| {
+                AvroError::ParseError("No Avro schema present in file header".to_string())
+            })?;
+            let json_string = std::str::from_utf8(raw)
+                .map_err(|e| {
+                    AvroError::ParseError(format!("Invalid UTF-8 in Avro schema header: {e}"))
+                })?
+                .to_string();
+            AvroSchema::new(json_string)
+        };
+
+        // If a projection exists, project the reader schema.
+        // If no reader schema is provided, use the raw writer schema parsed from the header and project that.
+        // This projected schema will be the schema used for reading.
+        let projected_reader_schema = self
+            .projection
+            .as_deref()
+            .map(|projection| {
+                let base_schema = if let Some(reader_schema) = &self.reader_schema {
+                    reader_schema
+                } else {
+                    &writer_schema
+                };
+                base_schema.project(projection)
+            })
+            .transpose()?;
+
+        // Use either the projected reader schema or the original reader schema (if no projection).
+        // Both are optional; at worst no reader schema is provided, in which case we read with the writer schema.
+        let effective_reader_schema = projected_reader_schema
+            .as_ref()
+            .or(self.reader_schema.as_ref())
+            .map(|s| s.schema())
+            .transpose()?;
+
+        let root = {
+            let writer_schema = writer_schema.schema()?;
+            let mut builder = AvroFieldBuilder::new(&writer_schema);
+            if let Some(reader_schema) = &effective_reader_schema {
+                builder = builder.with_reader_schema(reader_schema);
+            }
+            builder
+                .with_utf8view(self.utf8_view)
+                .with_strict_mode(self.strict_mode)
+                .build()
+        }?;
+
+        let record_decoder = RecordDecoder::try_new_with_options(root.data_type())?;
+        let decoder = Decoder::from_parts(
+            self.batch_size,
+            record_decoder,
+            None,
+            IndexMap::new(),
+            FingerprintAlgorithm::Rabin,
+        );
+        let range = match self.range {
+            Some(r) => {
+                // If this PartitionedFile's range starts at 0, we need to skip the header bytes.
+                // But then we need to seek back 16 bytes to include the sync marker for the first block,
+                // as the logic in this reader searches the data for the first sync marker (after which a block starts),
+                // then reads blocks from the count, size etc.
+                let start = r.start.max(header_len.checked_sub(16).ok_or(AvroError::ParseError(
+                    "Avro header length overflow, header was not long enough to contain avro bytes".to_string(),
+                ))?);
+                let end = r.end.max(start).min(self.file_size); // Ensure end is not less than start, worst case range is empty
+                start..end
+            }
+            None => 0..self.file_size,
+        };
+
+        // Determine if there is actually data to fetch, note that we subtract the header len from range.start,
+        // so we need to check if range.end == header_len to see if there's no data after the header
+        let reader_state = if range.start == range.end || header_len == range.end {
+            ReaderState::Finished
+        } else {
+            ReaderState::Idle {
+                reader: self.reader,
+            }
+        };
+        let codec = header.compression()?;
+        let sync_marker = header.sync();
+
+        Ok(AsyncAvroFileReader::new(
+            range,
+            self.file_size,
+            decoder,
+            codec,
+            sync_marker,
+            reader_state,
+        ))
+    }
+}
diff --git a/arrow-avro/src/reader/async_reader/mod.rs b/arrow-avro/src/reader/async_reader/mod.rs
new file mode 100644
index 0000000000..53229f8576
--- /dev/null
+++ b/arrow-avro/src/reader/async_reader/mod.rs
@@ -0,0 +1,1764 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::compression::CompressionCodec;
+use crate::reader::Decoder;
+use crate::reader::block::{BlockDecoder, BlockDecoderState};
+use arrow_array::RecordBatch;
+use arrow_schema::ArrowError;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, Stream};
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+mod async_file_reader;
+mod builder;
+
+pub use async_file_reader::AsyncFileReader;
+pub use builder::ReaderBuilder;
+
+#[cfg(feature = "object_store")]
+mod store;
+
+use crate::errors::AvroError;
+#[cfg(feature = "object_store")]
+pub use store::AvroObjectReader;
+
+enum FetchNextBehaviour {
+    /// Initial read: scan for sync marker, then move to decoding blocks
+    ReadSyncMarker,
+    /// Parse VLQ header bytes one at a time until Data state, then continue decoding
+    DecodeVLQHeader,
+    /// Continue decoding the current block with the fetched data
+    ContinueDecoding,
+}
+
+enum ReaderState<R> {
+    /// Intermediate state to fix ownership issues
+    InvalidState,
+    /// Initial state, fetch initial range
+    Idle { reader: R },
+    /// Fetching data from the reader
+    FetchingData {
+        future: BoxFuture<'static, Result<(R, Bytes), AvroError>>,
+        next_behaviour: FetchNextBehaviour,
+    },
+    /// Decode a block in a loop until completion
+    DecodingBlock { data: Bytes, reader: R },
+    /// Output batches from a decoded block
+    ReadingBatches {
+        data: Bytes,
+        block_data: Bytes,
+        remaining_in_block: usize,
+        reader: R,
+    },
+    /// Successfully finished reading file contents; drain any remaining buffered records
+    /// from the decoder into (possibly partial) output batches.
+    Flushing,
+    /// Done, flush decoder and return
+    Finished,
+}
+
+/// An asynchronous Avro file reader that implements `Stream<Item = Result<RecordBatch, ArrowError>>`.
+/// This uses an [`AsyncFileReader`] to fetch data ranges as needed, starting with the header,
+/// then reading all the blocks in the provided range:
+/// 1. Reads and decodes data until the header is fully decoded.
+/// 2. Searches from `range.start` for the first sync marker and starts with the following block.
+///    (If `range.start` is less than the header length, reading starts at the header length minus the sync marker bytes.)
+/// 3. Reads blocks sequentially, decoding them into RecordBatches.
+/// 4. If a block is incomplete (because the range ends mid-block), fetches the remaining bytes from the [`AsyncFileReader`].
+/// 5. If no range was originally provided, reads the full file.
+/// 6. If the range is empty, file_size is 0, or `range.end` is less than the header length, finishes immediately.
+///
+/// # Example
+///
+/// ```
+/// #[tokio::main(flavor = "current_thread")]
+/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
+/// use std::io::Cursor;
+/// use std::sync::Arc;
+/// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+/// use arrow_schema::{DataType, Field, Schema};
+/// use arrow_avro::reader::AsyncAvroFileReader;
+/// use arrow_avro::writer::AvroWriter;
+/// use futures::TryStreamExt;
+///
+/// // Build a minimal Arrow schema and batch
+/// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
+/// let batch = RecordBatch::try_new(
+///     Arc::new(schema.clone()),
+///     vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
+/// )?;
+///
+/// // Write an Avro OCF to memory
+/// let buffer: Vec<u8> = Vec::new();
+/// let mut writer = AvroWriter::new(buffer, schema)?;
+/// writer.write(&batch)?;
+/// writer.finish()?;
+/// let bytes = writer.into_inner();
+///
+/// // Create an async reader from the in-memory bytes
+/// // `tokio::fs::File` also implements `AsyncFileReader` for reading from disk
+/// let file_size = bytes.len();
+/// let cursor = Cursor::new(bytes);
+/// let reader = AsyncAvroFileReader::builder(cursor, file_size as u64, 1024)
+///     .try_build()
+///     .await?;
+///
+/// // Consume the stream of RecordBatches
+/// let batches: Vec<RecordBatch> = reader.try_collect().await?;
+/// assert_eq!(batches.len(), 1);
+/// assert_eq!(batches[0].num_rows(), 3);
+/// Ok(())
+/// }
+/// ```
+pub struct AsyncAvroFileReader<R> {
+    // Members required to fetch data
+    range: Range<u64>,
+    file_size: u64,
+
+    // Members required to actually decode and read data
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    codec: Option<CompressionCodec>,
+    sync_marker: [u8; 16],
+
+    // Members keeping the current state of the reader
+    reader_state: ReaderState<R>,
+    finishing_partial_block: bool,
+}
+
+impl<R> AsyncAvroFileReader<R> {
+    /// Returns a builder for a new [`Self`], allowing some optional parameters.
+    pub fn builder(reader: R, file_size: u64, batch_size: usize) -> ReaderBuilder<R> {
+        ReaderBuilder::new(reader, file_size, batch_size)
+    }
+
+    fn new(
+        range: Range<u64>,
+        file_size: u64,
+        decoder: Decoder,
+        codec: Option<CompressionCodec>,
+        sync_marker: [u8; 16],
+        reader_state: ReaderState<R>,
+    ) -> Self {
+        Self {
+            range,
+            file_size,
+
+            decoder,
+            block_decoder: Default::default(),
+            codec,
+            sync_marker,
+
+            reader_state,
+            finishing_partial_block: false,
+        }
+    }
+
+    /// Calculate the byte range needed to complete the current block.
+    /// Only valid when block_decoder is in Data or Sync state.
+    /// Returns the range to fetch, or an error if EOF would be reached.
+    fn remaining_block_range(&self) -> Result<Range<u64>, AvroError> {
+        let remaining = self.block_decoder.bytes_remaining() as u64
+            + match self.block_decoder.state() {
+                BlockDecoderState::Data => 16, // Include sync marker
+                BlockDecoderState::Sync => 0,
+                state => {
+                    return Err(AvroError::General(format!(
+                        "remaining_block_range called in unexpected state: {state:?}"
+                    )));
+                }
+            };
+
+        let fetch_end = self.range.end + remaining;
+        if fetch_end > self.file_size {
+            return Err(AvroError::EOF(
+                "Avro block requires more bytes than what exists in the file".into(),
+            ));
+        }
+
+        Ok(self.range.end..fetch_end)
+    }
+
+    /// Terminate the stream after returning this error once.
+    #[inline]
+    fn finish_with_error(
+        &mut self,
+        error: AvroError,
+    ) -> Poll<Option<Result<RecordBatch, AvroError>>> {
+        self.reader_state = ReaderState::Finished;
+        Poll::Ready(Some(Err(error)))
+    }
+
+    #[inline]
+    fn start_flushing(&mut self) {
+        self.reader_state = ReaderState::Flushing;
+    }
+
+    /// Drain any remaining buffered records from the decoder.
+    #[inline]
+    fn poll_flush(&mut self) -> Poll<Option<Result<RecordBatch, AvroError>>> {
+        match self.decoder.flush() {
+            Ok(Some(batch)) => {
+                self.reader_state = ReaderState::Flushing;
+                Poll::Ready(Some(Ok(batch)))
+            }
+            Ok(None) => {
+                self.reader_state = ReaderState::Finished;
+                Poll::Ready(None)
+            }
+            Err(e) => self.finish_with_error(e),
+        }
+    }
+}
+
+impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReader<R> {
+    // The forbid-question-mark lint shouldn't apply here, as it is within the future,
+    // so this was extracted into a separate function.
+    async fn fetch_bytes(mut reader: R, range: Range<u64>) -> Result<(R, Bytes), AvroError> {
+        let data = reader.get_bytes(range).await?;
+        Ok((reader, data))
+    }
+
+    #[forbid(clippy::question_mark_used)]
+    fn read_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch, AvroError>>> {
+        loop {
+            match mem::replace(&mut self.reader_state, ReaderState::InvalidState) {
+                ReaderState::Idle { reader } => {
+                    let range = self.range.clone();
+                    if range.start >= range.end {
+                        return self.finish_with_error(AvroError::InvalidArgument(format!(
+                            "Invalid range specified for Avro file: start {} >= end {}, file_size: {}",
+                            range.start, range.end, self.file_size
+                        )));
+                    }
+
+                    let future = Self::fetch_bytes(reader, range).boxed();
+                    self.reader_state = ReaderState::FetchingData {
+                        future,
+                        next_behaviour: FetchNextBehaviour::ReadSyncMarker,
+                    };
+                }
+                ReaderState::FetchingData {
+                    mut future,
+                    next_behaviour,
+                } => {
+                    let (reader, data_chunk) = match future.poll_unpin(cx) {
+                        Poll::Ready(Ok(data)) => data,
+                        Poll::Ready(Err(e)) => return self.finish_with_error(e),
+                        Poll::Pending => {
+                            self.reader_state = ReaderState::FetchingData {
+                                future,
+                                next_behaviour,
+                            };
+                            return Poll::Pending;
+                        }
+                    };
+
+                    match next_behaviour {
+                        FetchNextBehaviour::ReadSyncMarker => {
+                            let sync_marker_pos = data_chunk
+                                .windows(16)
+                                .position(|slice| slice == self.sync_marker);
+                            let block_start = match sync_marker_pos {
+                                Some(pos) => pos + 16, // Move past the sync marker
+                                None => {
+                                    // Sync marker not found, valid if we arbitrarily split the file at its end.
+                                    self.reader_state = ReaderState::Finished;
+                                    return Poll::Ready(None);
+                                }
+                            };
+
+                            self.reader_state = ReaderState::DecodingBlock {
+                                reader,
+                                data: data_chunk.slice(block_start..),
+                            };
+                        }
+                        FetchNextBehaviour::DecodeVLQHeader => {
+                            let mut data = data_chunk;
+
+                            // Feed bytes one at a time until we reach Data state (VLQ header complete)
+                            while !matches!(self.block_decoder.state(), 
BlockDecoderState::Data) {
+                                if data.is_empty() {
+                                    return 
self.finish_with_error(AvroError::EOF(
+                                        "Unexpected EOF while reading Avro 
block header".into(),
+                                    ));
+                                }
+                                let consumed = match 
self.block_decoder.decode(&data[..1]) {
+                                    Ok(consumed) => consumed,
+                                    Err(e) => return self.finish_with_error(e),
+                                };
+                                if consumed == 0 {
+                                    return 
self.finish_with_error(AvroError::General(
+                                        "BlockDecoder failed to consume byte 
during VLQ header parsing"
+                                            .into(),
+                                    ));
+                                }
+                                data = data.slice(consumed..);
+                            }
+
+                            // Now we know the block size. Slice remaining data to what we need.
+                            let bytes_remaining = 
self.block_decoder.bytes_remaining();
+                            let data_to_use = 
data.slice(..data.len().min(bytes_remaining));
+                            let consumed = match 
self.block_decoder.decode(&data_to_use) {
+                                Ok(consumed) => consumed,
+                                Err(e) => return self.finish_with_error(e),
+                            };
+                            if consumed != data_to_use.len() {
+                                return 
self.finish_with_error(AvroError::General(
+                                    "BlockDecoder failed to consume all bytes 
after VLQ header parsing"
+                                        .into(),
+                                ));
+                            }
+
+                            // May need more data to finish the block.
+                            let range_to_fetch = match 
self.remaining_block_range() {
+                                Ok(range) if range.is_empty() => {
+                                    // All bytes fetched, move to decoding block directly
+                                    self.reader_state = 
ReaderState::DecodingBlock {
+                                        reader,
+                                        data: Bytes::new(),
+                                    };
+                                    continue;
+                                }
+                                Ok(range) => range,
+                                Err(e) => return self.finish_with_error(e),
+                            };
+
+                            let future = Self::fetch_bytes(reader, 
range_to_fetch).boxed();
+                            self.reader_state = ReaderState::FetchingData {
+                                future,
+                                next_behaviour: 
FetchNextBehaviour::ContinueDecoding,
+                            };
+                            continue;
+                        }
+                        FetchNextBehaviour::ContinueDecoding => {
+                            self.reader_state = ReaderState::DecodingBlock {
+                                reader,
+                                data: data_chunk,
+                            };
+                        }
+                    }
+                }
+                ReaderState::InvalidState => {
+                    return self.finish_with_error(AvroError::General(
+                        "AsyncAvroFileReader in invalid state".into(),
+                    ));
+                }
+                ReaderState::DecodingBlock { reader, mut data } => {
+                    // Try to decode another block from the buffered reader.
+                    let consumed = match self.block_decoder.decode(&data) {
+                        Ok(consumed) => consumed,
+                        Err(e) => return self.finish_with_error(e),
+                    };
+                    data = data.slice(consumed..);
+
+                    // If we reached the end of the block, flush it, and move to read batches.
+                    if let Some(block) = self.block_decoder.flush() {
+                        // Successfully decoded a block.
+                        let block_count = block.count;
+                        let block_data = Bytes::from_owner(if let Some(ref 
codec) = self.codec {
+                            match codec.decompress(&block.data) {
+                                Ok(decompressed) => decompressed,
+                                Err(e) => return self.finish_with_error(e),
+                            }
+                        } else {
+                            block.data
+                        });
+
+                        // Since we have an active block, move to reading batches
+                        self.reader_state = ReaderState::ReadingBatches {
+                            reader,
+                            data,
+                            block_data,
+                            remaining_in_block: block_count,
+                        };
+                        continue;
+                    }
+
+                    // data should always be consumed unless Finished; if it wasn't, something went wrong
+                    if !data.is_empty() {
+                        return self.finish_with_error(AvroError::General(
+                            "BlockDecoder failed to make progress decoding 
Avro block".into(),
+                        ));
+                    }
+
+                    if matches!(self.block_decoder.state(), 
BlockDecoderState::Finished) {
+                        // We've already flushed, so if no batch was produced, we are simply done.
+                        self.finishing_partial_block = false;
+                        self.start_flushing();
+                        continue;
+                    }
+
+                    // If we've tried the following stage before, and still can't decode,
+                    // this means the file is truncated or corrupted.
+                    if self.finishing_partial_block {
+                        return self.finish_with_error(AvroError::EOF(
+                            "Unexpected EOF while reading last Avro 
block".into(),
+                        ));
+                    }
+
+                    // Avro splitting case: block is incomplete, we need to:
+                    // 1. Parse the length so we know how much to read
+                    // 2. Fetch more data from the reader
+                    // 3. Create a new block data from the remaining slice and the newly fetched data
+                    // 4. Continue decoding until end of block
+                    self.finishing_partial_block = true;
+
+                    // Mid-block, but we don't know how many bytes are missing yet
+                    if matches!(
+                        self.block_decoder.state(),
+                        BlockDecoderState::Count | BlockDecoderState::Size
+                    ) {
+                        // Max VLQ header is 20 bytes (10 bytes each for count and size).
+                        // Fetch just enough to complete it.
+                        const MAX_VLQ_HEADER_SIZE: u64 = 20;
+                        let fetch_end = (self.range.end + 
MAX_VLQ_HEADER_SIZE).min(self.file_size);
+
+                        // If there is nothing more to fetch, error out
+                        if fetch_end == self.range.end {
+                            return self.finish_with_error(AvroError::EOF(
+                                "Unexpected EOF while reading Avro block 
header".into(),
+                            ));
+                        }
+
+                        let range_to_fetch = self.range.end..fetch_end;
+                        self.range.end = fetch_end; // Track that we've 
fetched these bytes
+
+                        let future = Self::fetch_bytes(reader, 
range_to_fetch).boxed();
+                        self.reader_state = ReaderState::FetchingData {
+                            future,
+                            next_behaviour: 
FetchNextBehaviour::DecodeVLQHeader,
+                        };
+                        continue;
+                    }
+
+                    // Otherwise, we're mid-block but know how many bytes are remaining to fetch.
+                    let range_to_fetch = match self.remaining_block_range() {
+                        Ok(range) => range,
+                        Err(e) => return self.finish_with_error(e),
+                    };
+
+                    let future = Self::fetch_bytes(reader, 
range_to_fetch).boxed();
+                    self.reader_state = ReaderState::FetchingData {
+                        future,
+                        next_behaviour: FetchNextBehaviour::ContinueDecoding,
+                    };
+                    continue;
+                }
+                ReaderState::ReadingBatches {
+                    reader,
+                    data,
+                    mut block_data,
+                    mut remaining_in_block,
+                } => {
+                    let (consumed, records_decoded) =
+                        match self.decoder.decode_block(&block_data, 
remaining_in_block) {
+                            Ok((consumed, records_decoded)) => (consumed, 
records_decoded),
+                            Err(e) => return self.finish_with_error(e),
+                        };
+
+                    remaining_in_block -= records_decoded;
+
+                    if remaining_in_block == 0 {
+                        if data.is_empty() {
+                            // No more data to read, drain remaining buffered 
records
+                            self.start_flushing();
+                        } else {
+                            // Finished this block, move to decode next block 
in the next iteration
+                            self.reader_state = ReaderState::DecodingBlock { 
reader, data };
+                        }
+                    } else {
+                        // Still more records to decode in this block, slice 
the already-read data and stay in this state
+                        block_data = block_data.slice(consumed..);
+                        self.reader_state = ReaderState::ReadingBatches {
+                            reader,
+                            data,
+                            block_data,
+                            remaining_in_block,
+                        };
+                    }
+
+                    // We have a full batch ready, emit it
+                    // (This is not mutually exclusive with the block being finished, so the state change is valid)
+                    if self.decoder.batch_is_full() {
+                        return match self.decoder.flush() {
+                            Ok(Some(batch)) => Poll::Ready(Some(Ok(batch))),
+                            Ok(None) => 
self.finish_with_error(AvroError::General(
+                                "Decoder reported a full batch, but flush 
returned None".into(),
+                            )),
+                            Err(e) => self.finish_with_error(e),
+                        };
+                    }
+                }
+                ReaderState::Flushing => {
+                    return self.poll_flush();
+                }
+                ReaderState::Finished => {
+                    // Terminal: once finished (including after an error), always yield None
+                    self.reader_state = ReaderState::Finished;
+                    return Poll::Ready(None);
+                }
+            }
+        }
+    }
+}
+
+// To maintain compatibility with the expected stream results in the ecosystem, this returns ArrowError.
+impl<R: AsyncFileReader + Unpin + 'static> Stream for AsyncAvroFileReader<R> {
+    type Item = Result<RecordBatch, ArrowError>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.read_next(cx).map_err(Into::into)
+    }
+}
+
+#[cfg(all(test, feature = "object_store"))]
+mod tests {
+    use super::*;
+    use crate::schema::{AvroSchema, SCHEMA_METADATA_KEY};
+    use arrow_array::cast::AsArray;
+    use arrow_array::types::{Int32Type, Int64Type};
+    use arrow_array::*;
+    use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
+    use futures::{StreamExt, TryStreamExt};
+    use object_store::local::LocalFileSystem;
+    use object_store::path::Path;
+    use object_store::{ObjectStore, ObjectStoreExt};
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
+    fn arrow_test_data(file: &str) -> String {
+        let base =
+            std::env::var("ARROW_TEST_DATA").unwrap_or_else(|_| 
"../testing/data".to_string());
+        format!("{}/{}", base, file)
+    }
+
+    fn get_alltypes_schema() -> SchemaRef {
+        let schema = Schema::new(vec![
+            Field::new("id", DataType::Int32, true),
+            Field::new("bool_col", DataType::Boolean, true),
+            Field::new("tinyint_col", DataType::Int32, true),
+            Field::new("smallint_col", DataType::Int32, true),
+            Field::new("int_col", DataType::Int32, true),
+            Field::new("bigint_col", DataType::Int64, true),
+            Field::new("float_col", DataType::Float32, true),
+            Field::new("double_col", DataType::Float64, true),
+            Field::new("date_string_col", DataType::Binary, true),
+            Field::new("string_col", DataType::Binary, true),
+            Field::new(
+                "timestamp_col",
+                DataType::Timestamp(TimeUnit::Microsecond, 
Some("+00:00".into())),
+                true,
+            ),
+        ])
+        .with_metadata(HashMap::from([(
+            SCHEMA_METADATA_KEY.into(),
+            r#"{
+    "type": "record",
+    "name": "topLevelRecord",
+    "fields": [
+        {
+            "name": "id",
+            "type": [
+                "int",
+                "null"
+            ]
+        },
+        {
+            "name": "bool_col",
+            "type": [
+                "boolean",
+                "null"
+            ]
+        },
+        {
+            "name": "tinyint_col",
+            "type": [
+                "int",
+                "null"
+            ]
+        },
+        {
+            "name": "smallint_col",
+            "type": [
+                "int",
+                "null"
+            ]
+        },
+        {
+            "name": "int_col",
+            "type": [
+                "int",
+                "null"
+            ]
+        },
+        {
+            "name": "bigint_col",
+            "type": [
+                "long",
+                "null"
+            ]
+        },
+        {
+            "name": "float_col",
+            "type": [
+                "float",
+                "null"
+            ]
+        },
+        {
+            "name": "double_col",
+            "type": [
+                "double",
+                "null"
+            ]
+        },
+        {
+            "name": "date_string_col",
+            "type": [
+                "bytes",
+                "null"
+            ]
+        },
+        {
+            "name": "string_col",
+            "type": [
+                "bytes",
+                "null"
+            ]
+        },
+        {
+            "name": "timestamp_col",
+            "type": [
+                {
+                    "type": "long",
+                    "logicalType": "timestamp-micros"
+                },
+                "null"
+            ]
+        }
+    ]
+}
+"#
+            .into(),
+        )]));
+        Arc::new(schema)
+    }
+
+    fn get_alltypes_with_nulls_schema() -> SchemaRef {
+        let schema = Schema::new(vec![
+            Field::new("string_col", DataType::Binary, true),
+            Field::new("int_col", DataType::Int32, true),
+            Field::new("bool_col", DataType::Boolean, true),
+            Field::new("bigint_col", DataType::Int64, true),
+            Field::new("float_col", DataType::Float32, true),
+            Field::new("double_col", DataType::Float64, true),
+            Field::new("bytes_col", DataType::Binary, true),
+        ])
+        .with_metadata(HashMap::from([(
+            SCHEMA_METADATA_KEY.into(),
+            r#"{
+    "type": "record",
+    "name": "topLevelRecord",
+    "fields": [
+        {
+            "name": "string_col",
+            "type": [
+                "null",
+                "string"
+            ],
+            "default": null
+        },
+        {
+            "name": "int_col",
+            "type": [
+                "null",
+                "int"
+            ],
+            "default": null
+        },
+        {
+            "name": "bool_col",
+            "type": [
+                "null",
+                "boolean"
+            ],
+            "default": null
+        },
+        {
+            "name": "bigint_col",
+            "type": [
+                "null",
+                "long"
+            ],
+            "default": null
+        },
+        {
+            "name": "float_col",
+            "type": [
+                "null",
+                "float"
+            ],
+            "default": null
+        },
+        {
+            "name": "double_col",
+            "type": [
+                "null",
+                "double"
+            ],
+            "default": null
+        },
+        {
+            "name": "bytes_col",
+            "type": [
+                "null",
+                "bytes"
+            ],
+            "default": null
+        }
+    ]
+}"#
+            .into(),
+        )]));
+
+        Arc::new(schema)
+    }
+
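+    // Arrow reader schema for avro/nested_records.avro, covering nested structs,
+    // lists of structs, and a nullable struct field.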
+    fn get_nested_records_schema() -> SchemaRef {
+        let schema = Schema::new(vec![
+            Field::new(
+                "f1",
+                DataType::Struct(
+                    vec![
+                        Field::new("f1_1", DataType::Utf8, false),
+                        Field::new("f1_2", DataType::Int32, false),
+                        Field::new(
+                            "f1_3",
+                            DataType::Struct(
+                                vec![Field::new("f1_3_1", DataType::Float64, 
false)].into(),
+                            ),
+                            false,
+                        ),
+                    ]
+                    .into(),
+                ),
+                false,
+            ),
+            Field::new(
+                "f2",
+                DataType::List(Arc::new(Field::new(
+                    "item",
+                    DataType::Struct(
+                        vec![
+                            Field::new("f2_1", DataType::Boolean, false),
+                            Field::new("f2_2", DataType::Float32, false),
+                        ]
+                        .into(),
+                    ),
+                    false,
+                ))),
+                false,
+            ),
+            Field::new(
+                "f3",
+                DataType::Struct(vec![Field::new("f3_1", DataType::Utf8, 
false)].into()),
+                true,
+            ),
+            Field::new(
+                "f4",
+                DataType::List(Arc::new(Field::new(
+                    "item",
+                    DataType::Struct(vec![Field::new("f4_1", DataType::Int64, 
false)].into()),
+                    true,
+                ))),
+                false,
+            ),
+        ])
+        .with_metadata(HashMap::from([(
+            SCHEMA_METADATA_KEY.into(),
+            r#"{
+    "type": "record",
+    "namespace": "ns1",
+    "name": "record1",
+    "fields": [
+        {
+            "name": "f1",
+            "type": {
+                "type": "record",
+                "namespace": "ns2",
+                "name": "record2",
+                "fields": [
+                    {
+                        "name": "f1_1",
+                        "type": "string"
+                    },
+                    {
+                        "name": "f1_2",
+                        "type": "int"
+                    },
+                    {
+                        "name": "f1_3",
+                        "type": {
+                            "type": "record",
+                            "namespace": "ns3",
+                            "name": "record3",
+                            "fields": [
+                                {
+                                    "name": "f1_3_1",
+                                    "type": "double"
+                                }
+                            ]
+                        }
+                    }
+                ]
+            }
+        },
+        {
+            "name": "f2",
+            "type": {
+                "type": "array",
+                "items": {
+                    "type": "record",
+                    "namespace": "ns4",
+                    "name": "record4",
+                    "fields": [
+                        {
+                            "name": "f2_1",
+                            "type": "boolean"
+                        },
+                        {
+                            "name": "f2_2",
+                            "type": "float"
+                        }
+                    ]
+                }
+            }
+        },
+        {
+            "name": "f3",
+            "type": [
+                "null",
+                {
+                    "type": "record",
+                    "namespace": "ns5",
+                    "name": "record5",
+                    "fields": [
+                        {
+                            "name": "f3_1",
+                            "type": "string"
+                        }
+                    ]
+                }
+            ],
+            "default": null
+        },
+        {
+            "name": "f4",
+            "type": {
+                "type": "array",
+                "items": [
+                    "null",
+                    {
+                        "type": "record",
+                        "namespace": "ns6",
+                        "name": "record6",
+                        "fields": [
+                            {
+                                "name": "f4_1",
+                                "type": "long"
+                            }
+                        ]
+                    }
+                ]
+            }
+        }
+    ]
+}
+"#
+            .into(),
+        )]));
+
+        Arc::new(schema)
+    }
+
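+    // Test helper: opens `path` via a LocalFileSystem ObjectStore, builds an
+    // AsyncAvroFileReader with the given batch size and optional byte range,
+    // reader schema, and projection, then collects all batches from the stream.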
+    async fn read_async_file(
+        path: &str,
+        batch_size: usize,
+        range: Option<Range<u64>>,
+        schema: Option<SchemaRef>,
+        projection: Option<Vec<usize>>,
+    ) -> Result<Vec<RecordBatch>, ArrowError> {
+        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(path).unwrap();
+
+        let file_size = store.head(&location).await.unwrap().size;
+
+        let file_reader = AvroObjectReader::new(store, location);
+        let mut builder = AsyncAvroFileReader::builder(file_reader, file_size, batch_size);
+
+        if let Some(s) = schema {
+            let reader_schema = AvroSchema::try_from(s.as_ref())?;
+            builder = builder.with_reader_schema(reader_schema);
+        }
+
+        if let Some(proj) = projection {
+            builder = builder.with_projection(proj);
+        }
+
+        if let Some(range) = range {
+            builder = builder.with_range(range);
+        }
+
+        let reader = builder.try_build().await?;
+        reader.try_collect().await
+    }
+
+    #[tokio::test]
+    async fn test_full_file_read() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, None, Some(schema), None)
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 8);
+        assert_eq!(batch.num_columns(), 11);
+
+        let id_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_array.value(0), 4);
+        assert_eq!(id_array.value(7), 1);
+    }
+
+    #[tokio::test]
+    async fn test_small_batch_size() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 2, None, Some(schema), None)
+            .await
+            .unwrap();
+        assert_eq!(batches.len(), 4);
+
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 2);
+        assert_eq!(batch.num_columns(), 11);
+    }
+
+    #[tokio::test]
+    async fn test_batch_size_one() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1, None, Some(schema), None)
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batches.len(), 8);
+        assert_eq!(batch.num_rows(), 1);
+    }
+
+    #[tokio::test]
+    async fn test_batch_size_larger_than_file() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 10000, None, Some(schema), None)
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 8);
+    }
+
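+    // The range-based tests below exercise the file-splitting behaviour: a reader
+    // given a byte range skips forward to the first sync marker inside the range,
+    // decodes only blocks that start within it, and fetches past the range end to
+    // finish an incomplete trailing block.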
+    #[tokio::test]
+    async fn test_empty_range() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let range = 100..100;
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), Some(schema), 
None)
+            .await
+            .unwrap();
+        assert_eq!(batches.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_range_starting_at_zero() {
+        // Tests that a range starting at 0 correctly skips the header
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let store = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+        let meta = store.head(&location).await.unwrap();
+
+        let range = 0..meta.size;
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), Some(schema), 
None)
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 8);
+    }
+
+    #[tokio::test]
+    async fn test_range_after_header() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let store = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+        let meta = store.head(&location).await.unwrap();
+
+        let range = 100..meta.size;
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), Some(schema), 
None)
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert!(batch.num_rows() > 0);
+    }
+
+    #[tokio::test]
+    async fn test_range_no_sync_marker() {
+        // Small range unlikely to contain sync marker
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let range = 50..150;
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), Some(schema), 
None)
+            .await
+            .unwrap();
+        assert_eq!(batches.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_range_starting_mid_file() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+
+        let range = 700..768; // Header ends at 675, so this should be mid-block
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), Some(schema), 
None)
+            .await
+            .unwrap();
+        assert_eq!(batches.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_range_ending_at_file_size() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let store = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+        let meta = store.head(&location).await.unwrap();
+
+        let range = 200..meta.size;
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), Some(schema), 
None)
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 8);
+    }
+
+    #[tokio::test]
+    async fn test_incomplete_block_requires_fetch() {
+        // Range ends mid-block, should trigger fetching_rem_block logic
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let range = 0..1200;
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), Some(schema), 
None)
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 8)
+    }
+
+    #[tokio::test]
+    async fn test_partial_vlq_header_requires_fetch() {
+        // Range ends mid-VLQ header, triggering the Count|Size partial fetch logic.
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let range = 16..676; // Header should end at 675
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), Some(schema), 
None)
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 8)
+    }
+
+    #[cfg(feature = "snappy")]
+    #[tokio::test]
+    async fn test_snappy_compressed_with_range() {
+        {
+            let file = arrow_test_data("avro/alltypes_plain.snappy.avro");
+            let store = Arc::new(LocalFileSystem::new());
+            let location = Path::from_filesystem_path(&file).unwrap();
+            let meta = store.head(&location).await.unwrap();
+
+            let range = 200..meta.size;
+            let schema = get_alltypes_schema();
+            let batches = read_async_file(&file, 1024, Some(range), 
Some(schema), None)
+                .await
+                .unwrap();
+            let batch = &batches[0];
+
+            assert!(batch.num_rows() > 0);
+        }
+    }
+
+    #[tokio::test]
+    async fn test_nulls() {
+        let file = arrow_test_data("avro/alltypes_nulls_plain.avro");
+        let schema = get_alltypes_with_nulls_schema();
+        let batches = read_async_file(&file, 1024, None, Some(schema), None)
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 1);
+        for col in batch.columns() {
+            assert!(col.is_null(0));
+        }
+    }
+
+    #[tokio::test]
+    async fn test_nested_records() {
+        let file = arrow_test_data("avro/nested_records.avro");
+        let schema = get_nested_records_schema();
+        let batches = read_async_file(&file, 1024, None, Some(schema), None)
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 2);
+        assert!(batch.num_columns() > 0);
+    }
+
+    #[tokio::test]
+    async fn test_stream_produces_multiple_batches() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let store = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+
+        let file_size = store.head(&location).await.unwrap().size;
+
+        let file_reader = AvroObjectReader::new(store, location);
+        let schema = get_alltypes_schema();
+        let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
+        let reader = AsyncAvroFileReader::builder(
+            file_reader,
+            file_size,
+            2, // Small batch size to force multiple batches
+        )
+        .with_reader_schema(reader_schema)
+        .try_build()
+        .await
+        .unwrap();
+
+        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+
+        assert!(batches.len() > 1);
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 8);
+    }
+
+    #[tokio::test]
+    async fn test_stream_early_termination() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let store = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+
+        let file_size = store.head(&location).await.unwrap().size;
+
+        let file_reader = AvroObjectReader::new(store, location);
+        let schema = get_alltypes_schema();
+        let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
+        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1)
+            .with_reader_schema(reader_schema)
+            .try_build()
+            .await
+            .unwrap();
+
+        let first_batch = reader.take(1).try_collect::<Vec<_>>().await.unwrap();
+
+        assert_eq!(first_batch.len(), 1);
+        assert!(first_batch[0].num_rows() > 0);
+    }
+
+    #[tokio::test]
+    async fn test_various_batch_sizes() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+
+        for batch_size in [1, 2, 3, 5, 7, 11, 100] {
+            let schema = get_alltypes_schema();
+            let batches = read_async_file(&file, batch_size, None, 
Some(schema), None)
+                .await
+                .unwrap();
+            let batch = &batches[0];
+
+            // The first batch should contain `batch_size` rows, capped at the 8 rows in the file
+            assert_eq!(
+                batch.num_rows(),
+                batch_size.min(8),
+                "Failed with batch_size={}",
+                batch_size
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_range_larger_than_file() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let store = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+        let meta = store.head(&location).await.unwrap();
+
+        // Range extends beyond file size
+        let range = 100..(meta.size + 1000);
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), Some(schema), 
None)
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        // Should clamp to file size
+        assert_eq!(batch.num_rows(), 8);
+    }
+
+    #[tokio::test]
+    async fn test_roundtrip_write_then_async_read() {
+        use crate::writer::AvroWriter;
+        use arrow_array::{Float64Array, StringArray};
+        use std::fs::File;
+        use std::io::BufWriter;
+        use tempfile::tempdir;
+
+        // Schema with nullable and non-nullable fields of various types
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+            Field::new("score", DataType::Float64, true),
+            Field::new("count", DataType::Int64, false),
+        ]));
+
+        let dir = tempdir().unwrap();
+        let file_path = dir.path().join("roundtrip_test.avro");
+
+        // Write multiple batches with nulls
+        {
+            let file = File::create(&file_path).unwrap();
+            let writer = BufWriter::new(file);
+            let mut avro_writer = AvroWriter::new(writer, schema.as_ref().clone()).unwrap();
+
+            // First batch: 3 rows with some nulls
+            let batch1 = RecordBatch::try_new(
+                schema.clone(),
+                vec![
+                    Arc::new(Int32Array::from(vec![1, 2, 3])),
+                    Arc::new(StringArray::from(vec![
+                        Some("alice"),
+                        None,
+                        Some("charlie"),
+                    ])),
+                    Arc::new(Float64Array::from(vec![Some(95.5), Some(87.3), None])),
+                    Arc::new(Int64Array::from(vec![10, 20, 30])),
+                ],
+            )
+            .unwrap();
+            avro_writer.write(&batch1).unwrap();
+
+            // Second batch: 2 rows
+            let batch2 = RecordBatch::try_new(
+                schema.clone(),
+                vec![
+                    Arc::new(Int32Array::from(vec![4, 5])),
+                    Arc::new(StringArray::from(vec![Some("diana"), 
Some("eve")])),
+                    Arc::new(Float64Array::from(vec![None, Some(88.0)])),
+                    Arc::new(Int64Array::from(vec![40, 50])),
+                ],
+            )
+            .unwrap();
+            avro_writer.write(&batch2).unwrap();
+
+            avro_writer.finish().unwrap();
+        }
+
+        // Read back with small batch size to produce multiple output batches
+        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file_path).unwrap();
+        let file_size = store.head(&location).await.unwrap().size;
+
+        let file_reader = AvroObjectReader::new(store, location);
+        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 2)
+            .try_build()
+            .await
+            .unwrap();
+
+        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+
+        // Verify we got multiple output batches due to small batch_size
+        assert!(
+            batches.len() > 1,
+            "Expected multiple batches with batch_size=2"
+        );
+
+        // Verify total row count
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 5);
+
+        // Concatenate all batches to verify data
+        let combined = arrow::compute::concat_batches(&batches[0].schema(), 
&batches).unwrap();
+        assert_eq!(combined.num_rows(), 5);
+        assert_eq!(combined.num_columns(), 4);
+
+        // Check id column (non-nullable)
+        let id_array = combined
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_array.values(), &[1, 2, 3, 4, 5]);
+
+        // Check name column (nullable) - verify nulls are preserved
+        // The Avro "string" field reads back as a Utf8 StringArray here
+        let name_col = combined.column(1);
+        let name_array = name_col.as_string::<i32>();
+        assert_eq!(name_array.value(0), "alice");
+        assert!(name_col.is_null(1)); // second row has null name
+        assert_eq!(name_array.value(2), "charlie");
+
+        // Check score column (nullable) - verify nulls are preserved
+        let score_array = combined
+            .column(2)
+            .as_any()
+            .downcast_ref::<Float64Array>()
+            .unwrap();
+        assert!(!score_array.is_null(0));
+        assert!((score_array.value(0) - 95.5).abs() < f64::EPSILON);
+        assert!(score_array.is_null(2)); // third row has null score
+        assert!(score_array.is_null(3)); // fourth row has null score
+        assert!(!score_array.is_null(4));
+        assert!((score_array.value(4) - 88.0).abs() < f64::EPSILON);
+
+        // Check count column (non-nullable)
+        let count_array = combined
+            .column(3)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(count_array.values(), &[10, 20, 30, 40, 50]);
+    }
+
+    #[tokio::test]
+    async fn test_alltypes_no_schema_no_projection() {
+        // No reader schema, no projection - uses writer schema from file
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let batches = read_async_file(&file, 1024, None, None, None)
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 8);
+        assert_eq!(batch.num_columns(), 11);
+        assert_eq!(batch.schema().field(0).name(), "id");
+    }
+
+    #[tokio::test]
+    async fn test_alltypes_no_schema_with_projection() {
+        // No reader schema, with projection - project writer schema
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        // Project [tinyint_col, id, bigint_col] = indices [2, 0, 5]
+        let batches = read_async_file(&file, 1024, None, None, Some(vec![2, 0, 
5]))
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 8);
+        assert_eq!(batch.num_columns(), 3);
+        assert_eq!(batch.schema().field(0).name(), "tinyint_col");
+        assert_eq!(batch.schema().field(1).name(), "id");
+        assert_eq!(batch.schema().field(2).name(), "bigint_col");
+
+        // Verify data values
+        let tinyint_col = batch.column(0).as_primitive::<Int32Type>();
+        assert_eq!(tinyint_col.values(), &[0, 1, 0, 1, 0, 1, 0, 1]);
+
+        let id = batch.column(1).as_primitive::<Int32Type>();
+        assert_eq!(id.values(), &[4, 5, 6, 7, 2, 3, 0, 1]);
+
+        let bigint_col = batch.column(2).as_primitive::<Int64Type>();
+        assert_eq!(bigint_col.values(), &[0, 10, 0, 10, 0, 10, 0, 10]);
+    }
+
+    #[tokio::test]
+    async fn test_alltypes_with_schema_no_projection() {
+        // With reader schema, no projection
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, None, Some(schema), None)
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 8);
+        assert_eq!(batch.num_columns(), 11);
+    }
+
+    #[tokio::test]
+    async fn test_alltypes_with_schema_with_projection() {
+        // With reader schema, with projection
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let schema = get_alltypes_schema();
+        // Project [bool_col, id] = indices [1, 0]
+        let batches = read_async_file(&file, 1024, None, Some(schema), 
Some(vec![1, 0]))
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 8);
+        assert_eq!(batch.num_columns(), 2);
+        assert_eq!(batch.schema().field(0).name(), "bool_col");
+        assert_eq!(batch.schema().field(1).name(), "id");
+
+        let bool_col = batch.column(0).as_boolean();
+        assert!(bool_col.value(0));
+        assert!(!bool_col.value(1));
+
+        let id = batch.column(1).as_primitive::<Int32Type>();
+        assert_eq!(id.values(), &[4, 5, 6, 7, 2, 3, 0, 1]);
+    }
+
+    #[tokio::test]
+    async fn test_nested_no_schema_no_projection() {
+        // No reader schema, no projection
+        let file = arrow_test_data("avro/nested_records.avro");
+        let batches = read_async_file(&file, 1024, None, None, None)
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 2);
+        assert_eq!(batch.num_columns(), 4);
+        assert_eq!(batch.schema().field(0).name(), "f1");
+        assert_eq!(batch.schema().field(1).name(), "f2");
+        assert_eq!(batch.schema().field(2).name(), "f3");
+        assert_eq!(batch.schema().field(3).name(), "f4");
+    }
+
+    #[tokio::test]
+    async fn test_nested_no_schema_with_projection() {
+        // No reader schema, with projection - reorder nested fields
+        let file = arrow_test_data("avro/nested_records.avro");
+        // Project [f3, f1] = indices [2, 0]
+        let batches = read_async_file(&file, 1024, None, None, Some(vec![2, 
0]))
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 2);
+        assert_eq!(batch.num_columns(), 2);
+        assert_eq!(batch.schema().field(0).name(), "f3");
+        assert_eq!(batch.schema().field(1).name(), "f1");
+    }
+
+    #[tokio::test]
+    async fn test_nested_with_schema_no_projection() {
+        // With reader schema, no projection
+        let file = arrow_test_data("avro/nested_records.avro");
+        let schema = get_nested_records_schema();
+        let batches = read_async_file(&file, 1024, None, Some(schema), None)
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 2);
+        assert_eq!(batch.num_columns(), 4);
+    }
+
+    #[tokio::test]
+    async fn test_nested_with_schema_with_projection() {
+        // With reader schema, with projection
+        let file = arrow_test_data("avro/nested_records.avro");
+        let schema = get_nested_records_schema();
+        // Project [f4, f2, f1] = indices [3, 1, 0]
+        let batches = read_async_file(&file, 1024, None, Some(schema), 
Some(vec![3, 1, 0]))
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 2);
+        assert_eq!(batch.num_columns(), 3);
+        assert_eq!(batch.schema().field(0).name(), "f4");
+        assert_eq!(batch.schema().field(1).name(), "f2");
+        assert_eq!(batch.schema().field(2).name(), "f1");
+    }
+
+    #[tokio::test]
+    async fn test_projection_error_out_of_bounds() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        // Index 100 is out of bounds for the 11-field schema
+        let err = read_async_file(&file, 1024, None, None, Some(vec![100]))
+            .await
+            .unwrap_err();
+        assert!(matches!(err, ArrowError::AvroError(_)));
+        assert!(err.to_string().contains("out of bounds"));
+    }
+
+    #[tokio::test]
+    async fn test_projection_error_duplicate_index() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        // Duplicate index 0
+        let err = read_async_file(&file, 1024, None, None, Some(vec![0, 0]))
+            .await
+            .unwrap_err();
+        assert!(matches!(err, ArrowError::AvroError(_)));
+        assert!(err.to_string().contains("Duplicate projection index"));
+    }
+
+    #[tokio::test]
+    async fn test_with_header_size_hint_small() {
+        // Use a very small header size hint to force multiple fetches
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+        let file_size = store.head(&location).await.unwrap().size;
+
+        let file_reader = AvroObjectReader::new(store, location);
+        let schema = get_alltypes_schema();
+        let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
+
+        // Use a tiny header hint (64 bytes) - header is much larger
+        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
+            .with_reader_schema(reader_schema)
+            .with_header_size_hint(64)
+            .try_build()
+            .await
+            .unwrap();
+
+        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 8);
+        assert_eq!(batch.num_columns(), 11);
+    }
+
+    #[tokio::test]
+    async fn test_with_header_size_hint_large() {
+        // Use a larger header size hint than needed
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+        let file_size = store.head(&location).await.unwrap().size;
+
+        let file_reader = AvroObjectReader::new(store, location);
+        let schema = get_alltypes_schema();
+        let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
+
+        // Use a large header hint (64KB)
+        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
+            .with_reader_schema(reader_schema)
+            .with_header_size_hint(64 * 1024)
+            .try_build()
+            .await
+            .unwrap();
+
+        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 8);
+        assert_eq!(batch.num_columns(), 11);
+    }
+
+    #[tokio::test]
+    async fn test_with_utf8_view_enabled() {
+        // Test that utf8_view produces StringViewArray instead of StringArray
+        let file = arrow_test_data("avro/nested_records.avro");
+        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+        let file_size = store.head(&location).await.unwrap().size;
+
+        let file_reader = AvroObjectReader::new(store, location);
+
+        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
+            .with_utf8_view(true)
+            .try_build()
+            .await
+            .unwrap();
+
+        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 2);
+
+        // The f1 struct contains f1_1 which is a string field
+        // With utf8_view enabled, it should be Utf8View type
+        let f1_col = batch.column(0);
+        let f1_struct = f1_col.as_struct();
+        let f1_1_field = f1_struct.column_by_name("f1_1").unwrap();
+
+        // Check that the data type is Utf8View
+        assert_eq!(f1_1_field.data_type(), &DataType::Utf8View);
+    }
+
+    #[tokio::test]
+    async fn test_with_utf8_view_disabled() {
+        // Test that without utf8_view, we get regular Utf8
+        let file = arrow_test_data("avro/nested_records.avro");
+        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+        let file_size = store.head(&location).await.unwrap().size;
+
+        let file_reader = AvroObjectReader::new(store, location);
+
+        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
+            .with_utf8_view(false)
+            .try_build()
+            .await
+            .unwrap();
+
+        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 2);
+
+        // The f1 struct contains f1_1 which is a string field
+        // Without utf8_view, it should be regular Utf8
+        let f1_col = batch.column(0);
+        let f1_struct = f1_col.as_struct();
+        let f1_1_field = f1_struct.column_by_name("f1_1").unwrap();
+
+        assert_eq!(f1_1_field.data_type(), &DataType::Utf8);
+    }
+
+    #[tokio::test]
+    async fn test_with_strict_mode_disabled_allows_null_second() {
+        // Test that with strict_mode disabled, unions of ['T', 'null'] are allowed
+        // The alltypes_nulls_plain.avro file has unions with null second
+        let file = arrow_test_data("avro/alltypes_nulls_plain.avro");
+        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+        let file_size = store.head(&location).await.unwrap().size;
+
+        let file_reader = AvroObjectReader::new(store, location);
+
+        // Without strict mode, this should succeed
+        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
+            .with_strict_mode(false)
+            .try_build()
+            .await
+            .unwrap();
+
+        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].num_rows(), 1);
+    }
+
+    #[tokio::test]
+    async fn test_with_strict_mode_enabled_rejects_null_second() {
+        // Test that with strict_mode enabled, unions of ['T', 'null'] are rejected
+        // The alltypes_plain.avro file has unions like ["int", "null"] (null second)
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+        let file_size = store.head(&location).await.unwrap().size;
+
+        let file_reader = AvroObjectReader::new(store, location);
+
+        // With strict mode, this should fail because of ['T', 'null'] unions
+        let result = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
+            .with_strict_mode(true)
+            .try_build()
+            .await;
+
+        match result {
+            Ok(_) => panic!("Expected error for strict_mode with ['T', 'null'] 
union"),
+            Err(err) => {
+                assert!(
+                    err.to_string().contains("disallowed in strict_mode"),
+                    "Expected strict_mode error, got: {}",
+                    err
+                );
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn test_with_strict_mode_enabled_valid_schema() {
+        // Test that strict_mode works with schemas that have proper ['null', 'T'] unions
+        // The nested_records.avro file has properly ordered unions
+        let file = arrow_test_data("avro/nested_records.avro");
+        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+        let file_size = store.head(&location).await.unwrap().size;
+
+        let file_reader = AvroObjectReader::new(store, location);
+
+        // With strict mode, properly ordered unions should still work
+        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
+            .with_strict_mode(true)
+            .try_build()
+            .await
+            .unwrap();
+
+        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].num_rows(), 2);
+    }
+
+    #[tokio::test]
+    async fn test_builder_options_combined() {
+        // Test combining multiple builder options
+        let file = arrow_test_data("avro/nested_records.avro");
+        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+        let file_size = store.head(&location).await.unwrap().size;
+
+        let file_reader = AvroObjectReader::new(store, location);
+
+        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 2)
+            .with_header_size_hint(128)
+            .with_utf8_view(true)
+            .with_strict_mode(true)
+            .with_projection(vec![0, 2]) // f1 and f3
+            .try_build()
+            .await
+            .unwrap();
+
+        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+        let batch = &batches[0];
+
+        // Should have 2 columns (f1 and f3) due to projection
+        assert_eq!(batch.num_columns(), 2);
+        assert_eq!(batch.schema().field(0).name(), "f1");
+        assert_eq!(batch.schema().field(1).name(), "f3");
+
+        // Verify utf8_view is applied
+        let f1_col = batch.column(0);
+        let f1_struct = f1_col.as_struct();
+        let f1_1_field = f1_struct.column_by_name("f1_1").unwrap();
+        assert_eq!(f1_1_field.data_type(), &DataType::Utf8View);
+    }
+}
diff --git a/arrow-avro/src/reader/async_reader/store.rs b/arrow-avro/src/reader/async_reader/store.rs
new file mode 100644
index 0000000000..44a4abf1a2
--- /dev/null
+++ b/arrow-avro/src/reader/async_reader/store.rs
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::AvroError;
+use crate::reader::async_reader::AsyncFileReader;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, TryFutureExt};
+use object_store::ObjectStore;
+use object_store::ObjectStoreExt;
+use object_store::path::Path;
+use std::error::Error;
+use std::ops::Range;
+use std::sync::Arc;
+use tokio::runtime::Handle;
+
+/// An implementation of an AsyncFileReader using the [`ObjectStore`] API.
+pub struct AvroObjectReader {
+    store: Arc<dyn ObjectStore>,
+    path: Path,
+    runtime: Option<Handle>,
+}
+
+impl AvroObjectReader {
+    /// Creates a new [`Self`] from a store implementation and file location.
+    pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
+        Self {
+            store,
+            path,
+            runtime: None,
+        }
+    }
+
+    /// Perform IO on the provided tokio runtime
+    ///
+    /// Tokio is a cooperative scheduler, and relies on tasks yielding in a timely manner
+    /// to service IO. Therefore, running IO and CPU-bound tasks, such as avro decoding,
+    /// on the same tokio runtime can lead to degraded throughput, dropped connections and
+    /// other issues. For more information see [here].
+    ///
+    /// [here]: https://www.influxdata.com/blog/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
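+    ///
+    /// # Example (sketch)
+    ///
+    /// Assuming the application owns a separate runtime dedicated to IO,
+    /// e.g. `io_runtime: tokio::runtime::Runtime`:
+    ///
+    /// ```ignore
+    /// let reader = AvroObjectReader::new(store, path)
+    ///     .with_runtime(io_runtime.handle().clone());
+    /// ```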
+    pub fn with_runtime(self, handle: Handle) -> Self {
+        Self {
+            runtime: Some(handle),
+            ..self
+        }
+    }
+
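+    // Runs `f` against this reader's store and path. If a dedicated runtime handle
+    // was configured, the work is spawned onto it (panics are propagated and join
+    // errors converted to AvroError); otherwise it runs on the caller's runtime.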
+    fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O, AvroError>>
+    where
+        F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, 
Result<O, E>>
+            + Send
+            + 'static,
+        O: Send + 'static,
+        E: Error + Send + 'static,
+    {
+        match &self.runtime {
+            Some(handle) => {
+                let path = self.path.clone();
+                let store = Arc::clone(&self.store);
+                handle
+                    .spawn(async move { f(&store, &path).await })
+                    .map_ok_or_else(
+                        |e| match e.try_into_panic() {
+                            Err(e) => Err(AvroError::External(Box::new(e))),
+                            Ok(p) => std::panic::resume_unwind(p),
+                        },
+                        |res| res.map_err(|e| AvroError::General(e.to_string())),
+                    )
+                    .boxed()
+            }
+            None => f(&self.store, &self.path)
+                .map_err(|e| AvroError::General(e.to_string()))
+                .boxed(),
+        }
+    }
+}
+
+impl AsyncFileReader for AvroObjectReader {
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, 
AvroError>> {
+        self.spawn(|store, path| async move { store.get_range(path, range).await }.boxed())
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<u64>>,
+    ) -> BoxFuture<'_, Result<Vec<Bytes>, AvroError>>
+    where
+        Self: Send,
+    {
+        self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
+    }
+}
diff --git a/arrow-avro/src/reader/block.rs b/arrow-avro/src/reader/block.rs
index 540aedee52..e20818bf96 100644
--- a/arrow-avro/src/reader/block.rs
+++ b/arrow-avro/src/reader/block.rs
@@ -43,7 +43,7 @@ pub struct BlockDecoder {
 }
 
 #[derive(Debug)]
-enum BlockDecoderState {
+pub(crate) enum BlockDecoderState {
     Count,
     Size,
     Data,
@@ -137,3 +137,14 @@ impl BlockDecoder {
         }
     }
 }
+
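+// Read-only accessors that let the async reader inspect how far block decoding
+// has progressed (e.g. when a block is split across fetched byte ranges).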
+#[cfg(feature = "async")]
+impl BlockDecoder {
+    pub(crate) fn state(&self) -> &BlockDecoderState {
+        &self.state
+    }
+
+    pub(crate) fn bytes_remaining(&self) -> usize {
+        self.bytes_remaining
+    }
+}
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 55763aeb62..a60bc26b49 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -499,6 +499,14 @@ mod header;
 mod record;
 mod vlq;
 
+#[cfg(feature = "async")]
+mod async_reader;
+
+#[cfg(feature = "object_store")]
+pub use async_reader::AvroObjectReader;
+#[cfg(feature = "async")]
+pub use async_reader::{AsyncAvroFileReader, AsyncFileReader};
+
 fn is_incomplete_data(err: &AvroError) -> bool {
     matches!(
         err,
@@ -641,6 +649,25 @@ pub struct Decoder {
 }
 
 impl Decoder {
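+    /// Builds a [`Decoder`] from previously constructed parts; moved out of
+    /// `ReaderBuilder` so it can be reused elsewhere in the crate (e.g. the async reader).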
+    pub(crate) fn from_parts(
+        batch_size: usize,
+        active_decoder: RecordDecoder,
+        active_fingerprint: Option<Fingerprint>,
+        cache: IndexMap<Fingerprint, RecordDecoder>,
+        fingerprint_algorithm: FingerprintAlgorithm,
+    ) -> Self {
+        Self {
+            batch_size,
+            remaining_capacity: batch_size,
+            active_fingerprint,
+            active_decoder,
+            cache,
+            fingerprint_algorithm,
+            pending_schema: None,
+            awaiting_body: false,
+        }
+    }
+
     /// Returns the Arrow schema for the rows decoded by this decoder.
     ///
     /// **Note:** With single‑object or Confluent framing, the schema may change
@@ -999,25 +1026,6 @@ impl ReaderBuilder {
         self.make_record_decoder(writer_schema, reader_schema_raw.as_ref())
     }
 
-    fn make_decoder_with_parts(
-        &self,
-        active_decoder: RecordDecoder,
-        active_fingerprint: Option<Fingerprint>,
-        cache: IndexMap<Fingerprint, RecordDecoder>,
-        fingerprint_algorithm: FingerprintAlgorithm,
-    ) -> Decoder {
-        Decoder {
-            batch_size: self.batch_size,
-            remaining_capacity: self.batch_size,
-            active_fingerprint,
-            active_decoder,
-            cache,
-            fingerprint_algorithm,
-            pending_schema: None,
-            awaiting_body: false,
-        }
-    }
-
     fn make_decoder(
         &self,
         header: Option<&Header>,
@@ -1054,7 +1062,8 @@ impl ReaderBuilder {
             let effective_reader_schema = projected_reader_schema.as_ref().or(reader_schema);
             let record_decoder =
                 self.make_record_decoder_from_schemas(&writer_schema, 
effective_reader_schema)?;
-            return Ok(self.make_decoder_with_parts(
+            return Ok(Decoder::from_parts(
+                self.batch_size,
                 record_decoder,
                 None,
                 IndexMap::new(),
@@ -1121,7 +1130,8 @@ impl ReaderBuilder {
                 "Initial fingerprint {start_fingerprint:?} not found in schema 
store"
             ))
         })?;
-        Ok(self.make_decoder_with_parts(
+        Ok(Decoder::from_parts(
+            self.batch_size,
             active_decoder,
             Some(start_fingerprint),
             cache,
