This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new c1033d1a68 feat: Implement an AsyncReader for avro using ObjectStore (#8930)
c1033d1a68 is described below
commit c1033d1a68c55de74429d0f67120dad35be53e2a
Author: Emily Matheys <[email protected]>
AuthorDate: Mon Feb 2 21:07:56 2026 +0200
feat: Implement an AsyncReader for avro using ObjectStore (#8930)
# Which issue does this PR close?
- Closes #8929.
# Rationale for this change
Allows for proper file splitting within an asynchronous context.
# What changes are included in this PR?
The raw implementation, allowing for file splitting: the reader can start
mid-block (reading until a sync marker is found) and then keep reading until
the end of the block is found.
This reader currently requires that a reader_schema be provided if
type promotion, schema evolution, or projection is desired.
This is done because https://github.com/apache/arrow-rs/issues/8928
is currently blocking proper parsing from an ArrowSchema.
# Are these changes tested?
Yes
# Are there any user-facing changes?
Only the addition.
Other changes are internal to the crate (namely the way `Decoder` is
created from parts).
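A minimal usage sketch (not part of this commit; it mirrors the README example added in this PR, with a hypothetical `data/example.avro` path, a 1024-row batch size, and an arbitrary split point):

```rust
use std::sync::Arc;
use arrow_avro::reader::{AsyncAvroFileReader, AvroObjectReader};
use futures::TryStreamExt;
use object_store::ObjectStore;
use object_store::local::LocalFileSystem;
use object_store::path::Path;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder store and path; any ObjectStore implementation works the same way.
    let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
    let path = Path::from("data/example.avro");
    let meta = store.head(&path).await?;

    // Read only part of the file: the reader scans forward to the first sync
    // marker and, if the range ends mid-block, fetches the rest of that block.
    let reader = AvroObjectReader::new(store, path);
    let stream = AsyncAvroFileReader::builder(reader, meta.size, 1024)
        .with_range(0..meta.size / 2) // hypothetical split point
        .try_build()
        .await?;

    let batches = stream.try_collect::<Vec<_>>().await?;
    println!("decoded {} record batches", batches.len());
    Ok(())
}
```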
---
arrow-avro/Cargo.toml | 15 +-
arrow-avro/README.md | 60 +-
arrow-avro/src/lib.rs | 47 +
.../src/reader/async_reader/async_file_reader.rs | 95 ++
arrow-avro/src/reader/async_reader/builder.rs | 256 +++
arrow-avro/src/reader/async_reader/mod.rs | 1764 ++++++++++++++++++++
arrow-avro/src/reader/async_reader/store.rs | 107 ++
arrow-avro/src/reader/block.rs | 13 +-
arrow-avro/src/reader/mod.rs | 52 +-
9 files changed, 2377 insertions(+), 32 deletions(-)
diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml
index a4699a2efa..5e5efd78d0 100644
--- a/arrow-avro/Cargo.toml
+++ b/arrow-avro/Cargo.toml
@@ -45,16 +45,26 @@ sha256 = ["dep:sha2"]
small_decimals = []
avro_custom_types = ["dep:arrow-select"]
+# Enable async APIs
+async = ["futures", "tokio"]
+# Enable object_store integration
+object_store = ["dep:object_store", "async"]
+
[dependencies]
arrow-schema = { workspace = true }
arrow-buffer = { workspace = true }
arrow-array = { workspace = true }
arrow-select = { workspace = true, optional = true }
+
+object_store = { version = "0.13", default-features = false, optional = true }
+
+bytes = { version = "1.11.0", default-features = false, features = ["std"] }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
serde = { version = "1.0.188", features = ["derive"] }
flate2 = { version = "1.0", default-features = false, features = [
"rust_backend",
], optional = true }
+futures = { version = "0.3", default-features = false, features = ["std"], optional = true }
snap = { version = "1.0", default-features = false, optional = true }
zstd = { version = "0.13", default-features = false, optional = true }
bzip2 = { version = "0.6.0", optional = true }
@@ -66,10 +76,11 @@ indexmap = "2.10"
rand = "0.9"
md5 = { version = "0.8", optional = true }
sha2 = { version = "0.10", optional = true }
-bytes = "1.11"
+tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] }
[dev-dependencies]
arrow-data = { workspace = true }
+bytes = { version = "1.11.0", default-features = false, features = ["std"] }
rand = { version = "0.9.1", default-features = false, features = [
"std",
"std_rng",
@@ -82,7 +93,9 @@ futures = "0.3.31"
async-stream = "0.3.6"
apache-avro = "0.21.0"
num-bigint = "0.4"
+object_store = { version = "0.13", default-features = false, features = ["fs"] }
once_cell = "1.21.3"
+tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread", "io-util", "fs"] }
[[bench]]
name = "avro_reader"
diff --git a/arrow-avro/README.md b/arrow-avro/README.md
index 85fd760947..c5776c125b 100644
--- a/arrow-avro/README.md
+++ b/arrow-avro/README.md
@@ -51,7 +51,7 @@ Disable defaults and pick only what you need (see **Feature Flags**):
```toml
[dependencies]
-arrow-avro = { version = "57.0.0", default-features = false, features = ["deflate", "snappy"] }
+arrow-avro = { version = "58.0.0", default-features = false, features = ["deflate", "snappy"] }
```
---
@@ -105,6 +105,36 @@ fn main() -> anyhow::Result<()> {
See the crate docs for runnable SOE and Confluent round‑trip examples.
+### Async reading from object stores (`object_store` feature)
+
+```rust,ignore
+use std::sync::Arc;
+use arrow_avro::reader::{AsyncAvroFileReader, AvroObjectReader};
+use futures::TryStreamExt;
+use object_store::ObjectStore;
+use object_store::local::LocalFileSystem;
+use object_store::path::Path;
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+ let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+ let path = Path::from("data/example.avro");
+
+ let meta = store.head(&path).await?;
+ let reader = AvroObjectReader::new(store, path);
+
+ let stream = AsyncAvroFileReader::builder(reader, meta.size, 1024)
+ .try_build()
+ .await?;
+
+ let batches: Vec<_> = stream.try_collect().await?;
+ for batch in batches {
+ println!("rows: {}", batch.num_rows());
+ }
+ Ok(())
+}
+```
+
---
## Feature Flags (what they do and when to use them)
@@ -128,15 +158,22 @@ See the crate docs for runnable SOE and Confluent round‑trip examples.
* Only **OCF** uses these codecs (they compress per‑block). They do **not**
apply to raw Avro frames used by Confluent wire format or SOE. The crate’s
`compression` module is specifically for **OCF blocks**.
* `deflate` uses `flate2` with the `rust_backend` (no system zlib required).
+### Async & Object Store
+
+| Feature        | Default | What it enables                                          | When to use                                                              |
+|----------------|--------:|----------------------------------------------------------|--------------------------------------------------------------------------|
+| `async`        | ⬜      | Async APIs for reading Avro via `futures` and `tokio`    | Enable for non-blocking async Avro reading with `AsyncAvroFileReader`.   |
+| `object_store` | ⬜      | Integration with `object_store` crate (implies `async`)  | Enable for reading Avro from cloud storage (S3, GCS, Azure Blob, etc.).  |
+
### Schema fingerprints & custom logical type helpers
-| Feature                     | Default | What it enables                                                                    | When to use                                                                                                           |
+| Feature                     | Default | What it enables                                                                    | When to use                                                                                                           |
|-----------------------------|--------:|------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------|
-| `md5`                       | ⬜      | `md5` dep for optional **MD5** schema fingerprints                                 | If you want to compute MD5 fingerprints of writer schemas (i.e. for custom prefixing/validation).                     |
-| `sha256`                    | ⬜      | `sha2` dep for optional **SHA‑256** schema fingerprints                            | If you prefer longer fingerprints; affects max prefix length (i.e. when framing).                                     |
+| `md5`                       | ⬜      | `md5` dep for optional **MD5** schema fingerprints                                 | If you want to compute MD5 fingerprints of writer schemas (i.e. for custom prefixing/validation).                     |
+| `sha256`                    | ⬜      | `sha2` dep for optional **SHA‑256** schema fingerprints                            | If you prefer longer fingerprints; affects max prefix length (i.e. when framing).                                     |
| `small_decimals`            | ⬜      | Extra handling for **small decimal** logical types (`Decimal32` and `Decimal64`)   | If your Avro `decimal` values are small and you want more compact Arrow representations.                              |
-| `avro_custom_types`         | ⬜      | Annotates Avro values using Arrow specific custom logical types                    | Enable when you need arrow-avro to reinterpret certain Avro fields as Arrow types that Avro doesn’t natively model.   |
-| `canonical_extension_types` | ⬜      | Re‑exports Arrow’s canonical extension types support from `arrow-schema`           | Enable if your workflow uses Arrow [canonical extension types] and you want `arrow-avro` to respect them.             |
+| `avro_custom_types`         | ⬜      | Annotates Avro values using Arrow specific custom logical types                    | Enable when you need arrow-avro to reinterpret certain Avro fields as Arrow types that Avro doesn't natively model.   |
+| `canonical_extension_types` | ⬜      | Re‑exports Arrow's canonical extension types support from `arrow-schema`           | Enable if your workflow uses Arrow [canonical extension types] and you want `arrow-avro` to respect them.             |
[canonical extension types]: https://arrow.apache.org/docs/format/CanonicalExtensions.html
@@ -149,17 +186,22 @@ See the crate docs for runnable SOE and Confluent round‑trip examples.
* Minimal, fast build (common pipelines):
```toml
-  arrow-avro = { version = "56", default-features = false, features = ["deflate", "snappy"] }
+  arrow-avro = { version = "58", default-features = false, features = ["deflate", "snappy"] }
```
* Include Zstandard too (modern data lakes):
```toml
-  arrow-avro = { version = "56", default-features = false, features = ["deflate", "snappy", "zstd"] }
+  arrow-avro = { version = "58", default-features = false, features = ["deflate", "snappy", "zstd"] }
+ ```
+* Async reading from object stores (S3, GCS, etc.):
+
+ ```toml
+ arrow-avro = { version = "58", features = ["object_store"] }
```
* Fingerprint helpers:
```toml
- arrow-avro = { version = "56", features = ["md5", "sha256"] }
+ arrow-avro = { version = "58", features = ["md5", "sha256"] }
```
---
diff --git a/arrow-avro/src/lib.rs b/arrow-avro/src/lib.rs
index eb04ee8fa6..da451ea146 100644
--- a/arrow-avro/src/lib.rs
+++ b/arrow-avro/src/lib.rs
@@ -123,16 +123,56 @@
//! # Ok(()) }
//! ```
//!
+//! ## `async` Reading (`async` feature)
+//!
+//! The [`reader`] module provides async APIs for reading Avro files when the `async`
+//! feature is enabled.
+//!
+//! [`AsyncAvroFileReader`] implements `Stream<Item = Result<RecordBatch, ArrowError>>`,
+//! allowing efficient async streaming of record batches. When the `object_store` feature
+//! is enabled, [`AvroObjectReader`] provides integration with object storage services
+//! such as S3 via the [object_store] crate.
+//!
+//! ```ignore
+//! use std::sync::Arc;
+//! use arrow_avro::reader::{AsyncAvroFileReader, AvroObjectReader};
+//! use futures::TryStreamExt;
+//! use object_store::ObjectStore;
+//! use object_store::local::LocalFileSystem;
+//! use object_store::path::Path;
+//!
+//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
+//! let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+//! let path = Path::from("data/example.avro");
+//! let meta = store.head(&path).await?;
+//!
+//! let reader = AvroObjectReader::new(store, path);
+//! let stream = AsyncAvroFileReader::builder(reader, meta.size, 1024)
+//! .try_build()
+//! .await?;
+//!
+//! let batches: Vec<_> = stream.try_collect().await?;
+//! # Ok(())
+//! # }
+//! ```
+//!
+//! [object_store]: https://docs.rs/object_store/latest/object_store/
+//!
//! ---
//!
//! ### Modules
//!
//! - [`reader`]: read Avro (OCF, SOE, Confluent) into Arrow `RecordBatch`es.
+//!   - With the `async` feature: [`AsyncAvroFileReader`] for async streaming reads.
+//!   - With the `object_store` feature: [`AvroObjectReader`] for reading from cloud storage.
//! - [`writer`]: write Arrow `RecordBatch`es as Avro (OCF, SOE, Confluent, Apicurio).
//! - [`schema`]: Avro schema parsing / fingerprints / registries.
//! - [`compression`]: codecs used for **OCF block compression** (i.e., Deflate, Snappy, Zstandard, BZip2, and XZ).
//! - [`codec`]: internal Avro-Arrow type conversion and row decode/encode plans.
//!
+//! [`AsyncAvroFileReader`]: reader::AsyncAvroFileReader
+//! [`AvroObjectReader`]: reader::AvroObjectReader
+//!
//! ### Features
//!
//! **OCF compression (enabled by default)**
@@ -142,6 +182,11 @@
//! - `bzip2` — enable BZip2 block compression.
//! - `xz` — enable XZ/LZMA block compression.
//!
+//! **Async & Object Store (opt‑in)**
+//! - `async` — enable async APIs for reading Avro (`AsyncAvroFileReader`, `AsyncFileReader` trait).
+//! - `object_store` — enable integration with the [`object_store`] crate for reading Avro
+//!   from cloud storage (S3, GCS, Azure Blob, etc.) via `AvroObjectReader`. Implies `async`.
+//!
//! **Schema fingerprints & helpers (opt‑in)**
//! - `md5` — enable MD5 writer‑schema fingerprints.
//! - `sha256` — enable SHA‑256 writer‑schema fingerprints.
@@ -156,6 +201,8 @@
//! - OCF compression codecs apply only to **Object Container Files**; they do not affect Avro
//!   single object encodings.
//!
+//! [`object_store`]: https://docs.rs/object_store/latest/object_store/
+//!
//! [canonical extension types]: https://arrow.apache.org/docs/format/CanonicalExtensions.html
//!
//! [Apache Arrow]: https://arrow.apache.org/
diff --git a/arrow-avro/src/reader/async_reader/async_file_reader.rs b/arrow-avro/src/reader/async_reader/async_file_reader.rs
new file mode 100644
index 0000000000..1257a2f3dd
--- /dev/null
+++ b/arrow-avro/src/reader/async_reader/async_file_reader.rs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::AvroError;
+use bytes::Bytes;
+use futures::FutureExt;
+use futures::future::BoxFuture;
+use std::io::SeekFrom;
+use std::ops::Range;
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
+
+/// The asynchronous interface used by [`super::AsyncAvroFileReader`] to read avro files
+///
+/// Notes:
+///
+/// 1. There is a default implementation for types that implement [`AsyncRead`]
+/// and [`AsyncSeek`], for example [`tokio::fs::File`].
+///
+/// 2. [`super::AvroObjectReader`], available when the `object_store` crate feature
+/// is enabled, implements this interface for [`ObjectStore`].
+///
+/// [`ObjectStore`]: object_store::ObjectStore
+///
+/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
+pub trait AsyncFileReader: Send {
+ /// Retrieve the bytes in `range`
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, AvroError>>;
+
+    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
+ fn get_byte_ranges(
+ &mut self,
+ ranges: Vec<Range<u64>>,
+ ) -> BoxFuture<'_, Result<Vec<Bytes>, AvroError>> {
+ async move {
+ let mut result = Vec::with_capacity(ranges.len());
+
+ for range in ranges.into_iter() {
+ let data = self.get_bytes(range).await?;
+ result.push(data);
+ }
+
+ Ok(result)
+ }
+ .boxed()
+ }
+}
+
+/// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
+impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, AvroError>> {
+ self.as_mut().get_bytes(range)
+ }
+
+ fn get_byte_ranges(
+ &mut self,
+ ranges: Vec<Range<u64>>,
+ ) -> BoxFuture<'_, Result<Vec<Bytes>, AvroError>> {
+ self.as_mut().get_byte_ranges(ranges)
+ }
+}
+
+impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, AvroError>> {
+ async move {
+ self.seek(SeekFrom::Start(range.start)).await?;
+
+ let to_read = range.end - range.start;
+ let mut buffer = Vec::with_capacity(to_read as usize);
+ let read = self.take(to_read).read_to_end(&mut buffer).await?;
+ if read as u64 != to_read {
+ return Err(AvroError::EOF(format!(
+ "expected to read {} bytes, got {}",
+ to_read, read
+ )));
+ }
+
+ Ok(buffer.into())
+ }
+ .boxed()
+ }
+}
diff --git a/arrow-avro/src/reader/async_reader/builder.rs b/arrow-avro/src/reader/async_reader/builder.rs
new file mode 100644
index 0000000000..0f9a7abf1c
--- /dev/null
+++ b/arrow-avro/src/reader/async_reader/builder.rs
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::codec::AvroFieldBuilder;
+use crate::errors::AvroError;
+use crate::reader::async_reader::ReaderState;
+use crate::reader::header::{Header, HeaderDecoder};
+use crate::reader::record::RecordDecoder;
+use crate::reader::{AsyncAvroFileReader, AsyncFileReader, Decoder};
+use crate::schema::{AvroSchema, FingerprintAlgorithm, SCHEMA_METADATA_KEY};
+use indexmap::IndexMap;
+use std::ops::Range;
+
+const DEFAULT_HEADER_SIZE_HINT: u64 = 16 * 1024; // 16 KB
+
+/// Builder for an asynchronous Avro file reader.
+pub struct ReaderBuilder<R> {
+ reader: R,
+ file_size: u64,
+ batch_size: usize,
+ range: Option<Range<u64>>,
+ reader_schema: Option<AvroSchema>,
+ projection: Option<Vec<usize>>,
+ header_size_hint: Option<u64>,
+ utf8_view: bool,
+ strict_mode: bool,
+}
+
+impl<R> ReaderBuilder<R> {
+ pub(super) fn new(reader: R, file_size: u64, batch_size: usize) -> Self {
+ Self {
+ reader,
+ file_size,
+ batch_size,
+ range: None,
+ reader_schema: None,
+ projection: None,
+ header_size_hint: None,
+ utf8_view: false,
+ strict_mode: false,
+ }
+ }
+
+ /// Specify a byte range to read from the Avro file.
+    /// If this is provided, the reader will read all the blocks within the specified range,
+    /// if the range ends mid-block, it will attempt to fetch the remaining bytes to complete the block,
+ /// but no further blocks will be read.
+ /// If this is omitted, the full file will be read.
+ pub fn with_range(self, range: Range<u64>) -> Self {
+ Self {
+ range: Some(range),
+ ..self
+ }
+ }
+
+ /// Specify a reader schema to use when reading the Avro file.
+    /// This can be useful to project specific columns or handle schema evolution.
+    /// If this is not provided, the schema will be derived from the Arrow schema provided.
+ pub fn with_reader_schema(self, reader_schema: AvroSchema) -> Self {
+ Self {
+ reader_schema: Some(reader_schema),
+ ..self
+ }
+ }
+
+ /// Specify a projection of column indices to read from the Avro file.
+ /// This can help optimize reading by only fetching the necessary columns.
+ pub fn with_projection(self, projection: Vec<usize>) -> Self {
+ Self {
+ projection: Some(projection),
+ ..self
+ }
+ }
+
+ /// Provide a hint for the expected size of the Avro header in bytes.
+    /// This can help optimize the initial read operation when fetching the header.
+ pub fn with_header_size_hint(self, hint: u64) -> Self {
+ Self {
+ header_size_hint: Some(hint),
+ ..self
+ }
+ }
+
+ /// Enable usage of Utf8View types when reading string data.
+ pub fn with_utf8_view(self, utf8_view: bool) -> Self {
+ Self { utf8_view, ..self }
+ }
+
+ /// Enable strict mode for schema validation and data reading.
+ pub fn with_strict_mode(self, strict_mode: bool) -> Self {
+ Self {
+ strict_mode,
+ ..self
+ }
+ }
+}
+
+impl<R: AsyncFileReader> ReaderBuilder<R> {
+ async fn read_header(&mut self) -> Result<(Header, u64), AvroError> {
+ let mut decoder = HeaderDecoder::default();
+ let mut position = 0;
+ loop {
+ let range_to_fetch = position
+                ..(position + self.header_size_hint.unwrap_or(DEFAULT_HEADER_SIZE_HINT))
+ .min(self.file_size);
+
+ // Maybe EOF after the header, no actual data
+ if range_to_fetch.is_empty() {
+ break;
+ }
+
+ let current_data = self
+ .reader
+ .get_bytes(range_to_fetch.clone())
+ .await
+ .map_err(|err| {
+ AvroError::General(format!(
+ "Error fetching Avro header from file reader: {err}"
+ ))
+ })?;
+ if current_data.is_empty() {
+ return Err(AvroError::EOF(
+ "Unexpected EOF while fetching header data".into(),
+ ));
+ }
+
+ let read = current_data.len();
+            let decoded = decoder.decode(&current_data)?;
+ if decoded != read {
+ position += decoded as u64;
+ break;
+ }
+ position += read as u64;
+ }
+
+ decoder
+ .flush()
+ .map(|header| (header, position))
+            .ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro header".into()))
+ }
+
+ /// Build the asynchronous Avro reader with the provided parameters.
+ /// This reads the header first to initialize the reader state.
+    pub async fn try_build(mut self) -> Result<AsyncAvroFileReader<R>, AvroError> {
+ if self.file_size == 0 {
+            return Err(AvroError::InvalidArgument("File size cannot be 0".into()));
+ }
+
+ // Start by reading the header from the beginning of the avro file
+ // take the writer schema from the header
+ let (header, header_len) = self.read_header().await?;
+ let writer_schema = {
+ let raw = header.get(SCHEMA_METADATA_KEY).ok_or_else(|| {
+                AvroError::ParseError("No Avro schema present in file header".to_string())
+ })?;
+ let json_string = std::str::from_utf8(raw)
+ .map_err(|e| {
+                    AvroError::ParseError(format!("Invalid UTF-8 in Avro schema header: {e}"))
+ })?
+ .to_string();
+ AvroSchema::new(json_string)
+ };
+
+ // If projection exists, project the reader schema,
+        // if no reader schema is provided, parse it from the header(get the raw writer schema), and project that
+ // this projected schema will be the schema used for reading.
+ let projected_reader_schema = self
+ .projection
+ .as_deref()
+ .map(|projection| {
+                let base_schema = if let Some(reader_schema) = &self.reader_schema {
+ reader_schema
+ } else {
+ &writer_schema
+ };
+ base_schema.project(projection)
+ })
+ .transpose()?;
+
+        // Use either the projected reader schema or the original reader schema(if no projection)
+        // (both optional, at worst no reader schema is provided, in which case we read with the writer schema)
+ let effective_reader_schema = projected_reader_schema
+ .as_ref()
+ .or(self.reader_schema.as_ref())
+ .map(|s| s.schema())
+ .transpose()?;
+
+ let root = {
+ let writer_schema = writer_schema.schema()?;
+ let mut builder = AvroFieldBuilder::new(&writer_schema);
+ if let Some(reader_schema) = &effective_reader_schema {
+ builder = builder.with_reader_schema(reader_schema);
+ }
+ builder
+ .with_utf8view(self.utf8_view)
+ .with_strict_mode(self.strict_mode)
+ .build()
+ }?;
+
+        let record_decoder = RecordDecoder::try_new_with_options(root.data_type())?;
+ let decoder = Decoder::from_parts(
+ self.batch_size,
+ record_decoder,
+ None,
+ IndexMap::new(),
+ FingerprintAlgorithm::Rabin,
+ );
+ let range = match self.range {
+ Some(r) => {
+                // If this PartitionedFile's range starts at 0, we need to skip the header bytes.
+                // But then we need to seek back 16 bytes to include the sync marker for the first block,
+                // as the logic in this reader searches the data for the first sync marker(after which a block starts),
+                // then reads blocks from the count, size etc.
+                let start = r.start.max(header_len.checked_sub(16).ok_or(AvroError::ParseError(
+                    "Avro header length overflow, header was not long enough to contain avro bytes".to_string(),
+                ))?);
+                let end = r.end.max(start).min(self.file_size); // Ensure end is not less than start, worst case range is empty
+ start..end
+ }
+ None => 0..self.file_size,
+ };
+
+        // Determine if there is actually data to fetch, note that we subtract the header len from range.start,
+        // so we need to check if range.end == header_len to see if there's no data after the header
+        let reader_state = if range.start == range.end || header_len == range.end {
+ ReaderState::Finished
+ } else {
+ ReaderState::Idle {
+ reader: self.reader,
+ }
+ };
+ let codec = header.compression()?;
+ let sync_marker = header.sync();
+
+ Ok(AsyncAvroFileReader::new(
+ range,
+ self.file_size,
+ decoder,
+ codec,
+ sync_marker,
+ reader_state,
+ ))
+ }
+}
diff --git a/arrow-avro/src/reader/async_reader/mod.rs b/arrow-avro/src/reader/async_reader/mod.rs
new file mode 100644
index 0000000000..53229f8576
--- /dev/null
+++ b/arrow-avro/src/reader/async_reader/mod.rs
@@ -0,0 +1,1764 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::compression::CompressionCodec;
+use crate::reader::Decoder;
+use crate::reader::block::{BlockDecoder, BlockDecoderState};
+use arrow_array::RecordBatch;
+use arrow_schema::ArrowError;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, Stream};
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+mod async_file_reader;
+mod builder;
+
+pub use async_file_reader::AsyncFileReader;
+pub use builder::ReaderBuilder;
+
+#[cfg(feature = "object_store")]
+mod store;
+
+use crate::errors::AvroError;
+#[cfg(feature = "object_store")]
+pub use store::AvroObjectReader;
+
+enum FetchNextBehaviour {
+ /// Initial read: scan for sync marker, then move to decoding blocks
+ ReadSyncMarker,
+    /// Parse VLQ header bytes one at a time until Data state, then continue decoding
+ DecodeVLQHeader,
+ /// Continue decoding the current block with the fetched data
+ ContinueDecoding,
+}
+
+enum ReaderState<R> {
+ /// Intermediate state to fix ownership issues
+ InvalidState,
+ /// Initial state, fetch initial range
+ Idle { reader: R },
+ /// Fetching data from the reader
+ FetchingData {
+ future: BoxFuture<'static, Result<(R, Bytes), AvroError>>,
+ next_behaviour: FetchNextBehaviour,
+ },
+ /// Decode a block in a loop until completion
+ DecodingBlock { data: Bytes, reader: R },
+ /// Output batches from a decoded block
+ ReadingBatches {
+ data: Bytes,
+ block_data: Bytes,
+ remaining_in_block: usize,
+ reader: R,
+ },
+    /// Successfully finished reading file contents; drain any remaining buffered records
+ /// from the decoder into (possibly partial) output batches.
+ Flushing,
+ /// Done, flush decoder and return
+ Finished,
+}
+
+/// An asynchronous Avro file reader that implements `Stream<Item = Result<RecordBatch, ArrowError>>`.
+/// This uses an [`AsyncFileReader`] to fetch data ranges as needed, starting with fetching the header,
+/// then reading all the blocks in the provided range where:
+/// 1. Reads and decodes data until the header is fully decoded.
+/// 2. Searching from `range.start` for the first sync marker, and starting with the following block.
+///    (If `range.start` is less than the header length, we start at the header length minus the sync marker bytes)
+/// 3. Reading blocks sequentially, decoding them into RecordBatches.
+/// 4. If a block is incomplete (due to range ending mid-block), fetching the remaining bytes from the [`AsyncFileReader`].
+/// 5. If no range was originally provided, reads the full file.
+/// 6. If the range is 0, file_size is 0, or `range.end` is less than the header length, finish immediately.
+///
+/// # Example
+///
+/// ```
+/// #[tokio::main(flavor = "current_thread")]
+/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
+/// use std::io::Cursor;
+/// use std::sync::Arc;
+/// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+/// use arrow_schema::{DataType, Field, Schema};
+/// use arrow_avro::reader::AsyncAvroFileReader;
+/// use arrow_avro::writer::AvroWriter;
+/// use futures::TryStreamExt;
+///
+/// // Build a minimal Arrow schema and batch
+/// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
+/// let batch = RecordBatch::try_new(
+/// Arc::new(schema.clone()),
+/// vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
+/// )?;
+///
+/// // Write an Avro OCF to memory
+/// let buffer: Vec<u8> = Vec::new();
+/// let mut writer = AvroWriter::new(buffer, schema)?;
+/// writer.write(&batch)?;
+/// writer.finish()?;
+/// let bytes = writer.into_inner();
+///
+/// // Create an async reader from the in-memory bytes
+/// // `tokio::fs::File` also implements `AsyncFileReader` for reading from disk
+/// let file_size = bytes.len();
+/// let cursor = Cursor::new(bytes);
+/// let reader = AsyncAvroFileReader::builder(cursor, file_size as u64, 1024)
+/// .try_build()
+/// .await?;
+///
+/// // Consume the stream of RecordBatches
+/// let batches: Vec<RecordBatch> = reader.try_collect().await?;
+/// assert_eq!(batches.len(), 1);
+/// assert_eq!(batches[0].num_rows(), 3);
+/// Ok(())
+/// }
+/// ```
+pub struct AsyncAvroFileReader<R> {
+ // Members required to fetch data
+ range: Range<u64>,
+ file_size: u64,
+
+ // Members required to actually decode and read data
+ decoder: Decoder,
+ block_decoder: BlockDecoder,
+ codec: Option<CompressionCodec>,
+ sync_marker: [u8; 16],
+
+ // Members keeping the current state of the reader
+ reader_state: ReaderState<R>,
+ finishing_partial_block: bool,
+}
+
+impl<R> AsyncAvroFileReader<R> {
+    /// Returns a builder for a new [`Self`], allowing some optional parameters.
+    pub fn builder(reader: R, file_size: u64, batch_size: usize) -> ReaderBuilder<R> {
+ ReaderBuilder::new(reader, file_size, batch_size)
+ }
+
+ fn new(
+ range: Range<u64>,
+ file_size: u64,
+ decoder: Decoder,
+ codec: Option<CompressionCodec>,
+ sync_marker: [u8; 16],
+ reader_state: ReaderState<R>,
+ ) -> Self {
+ Self {
+ range,
+ file_size,
+
+ decoder,
+ block_decoder: Default::default(),
+ codec,
+ sync_marker,
+
+ reader_state,
+ finishing_partial_block: false,
+ }
+ }
+
+ /// Calculate the byte range needed to complete the current block.
+ /// Only valid when block_decoder is in Data or Sync state.
+ /// Returns the range to fetch, or an error if EOF would be reached.
+ fn remaining_block_range(&self) -> Result<Range<u64>, AvroError> {
+ let remaining = self.block_decoder.bytes_remaining() as u64
+ + match self.block_decoder.state() {
+ BlockDecoderState::Data => 16, // Include sync marker
+ BlockDecoderState::Sync => 0,
+ state => {
+ return Err(AvroError::General(format!(
+                        "remaining_block_range called in unexpected state: {state:?}"
+ )));
+ }
+ };
+
+ let fetch_end = self.range.end + remaining;
+ if fetch_end > self.file_size {
+ return Err(AvroError::EOF(
+                "Avro block requires more bytes than what exists in the file".into(),
+ ));
+ }
+
+ Ok(self.range.end..fetch_end)
+ }
+
+ /// Terminate the stream after returning this error once.
+ #[inline]
+ fn finish_with_error(
+ &mut self,
+ error: AvroError,
+ ) -> Poll<Option<Result<RecordBatch, AvroError>>> {
+ self.reader_state = ReaderState::Finished;
+ Poll::Ready(Some(Err(error)))
+ }
+
+ #[inline]
+ fn start_flushing(&mut self) {
+ self.reader_state = ReaderState::Flushing;
+ }
+
+ /// Drain any remaining buffered records from the decoder.
+ #[inline]
+ fn poll_flush(&mut self) -> Poll<Option<Result<RecordBatch, AvroError>>> {
+ match self.decoder.flush() {
+ Ok(Some(batch)) => {
+ self.reader_state = ReaderState::Flushing;
+ Poll::Ready(Some(Ok(batch)))
+ }
+ Ok(None) => {
+ self.reader_state = ReaderState::Finished;
+ Poll::Ready(None)
+ }
+ Err(e) => self.finish_with_error(e),
+ }
+ }
+}
+
+impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReader<R> {
+    // The forbid question mark thing shouldn't apply here, as it is within the future,
+ // so exported this to a separate function.
+    async fn fetch_bytes(mut reader: R, range: Range<u64>) -> Result<(R, Bytes), AvroError> {
+ let data = reader.get_bytes(range).await?;
+ Ok((reader, data))
+ }
+
+ #[forbid(clippy::question_mark_used)]
+    fn read_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch, AvroError>>> {
+ loop {
+            match mem::replace(&mut self.reader_state, ReaderState::InvalidState) {
+ ReaderState::Idle { reader } => {
+ let range = self.range.clone();
+ if range.start >= range.end {
+                        return self.finish_with_error(AvroError::InvalidArgument(format!(
+                            "Invalid range specified for Avro file: start {} >= end {}, file_size: {}",
+ range.start, range.end, self.file_size
+ )));
+ }
+
+ let future = Self::fetch_bytes(reader, range).boxed();
+ self.reader_state = ReaderState::FetchingData {
+ future,
+ next_behaviour: FetchNextBehaviour::ReadSyncMarker,
+ };
+ }
+ ReaderState::FetchingData {
+ mut future,
+ next_behaviour,
+ } => {
+ let (reader, data_chunk) = match future.poll_unpin(cx) {
+ Poll::Ready(Ok(data)) => data,
+                        Poll::Ready(Err(e)) => return self.finish_with_error(e),
+ Poll::Pending => {
+ self.reader_state = ReaderState::FetchingData {
+ future,
+ next_behaviour,
+ };
+ return Poll::Pending;
+ }
+ };
+
+ match next_behaviour {
+ FetchNextBehaviour::ReadSyncMarker => {
+ let sync_marker_pos = data_chunk
+ .windows(16)
+ .position(|slice| slice == self.sync_marker);
+ let block_start = match sync_marker_pos {
+                                Some(pos) => pos + 16, // Move past the sync marker
+                                None => {
+                                    // Sync marker not found, valid if we arbitrarily split the file at its end.
+ self.reader_state = ReaderState::Finished;
+ return Poll::Ready(None);
+ }
+ };
+
+ self.reader_state = ReaderState::DecodingBlock {
+ reader,
+ data: data_chunk.slice(block_start..),
+ };
+ }
+ FetchNextBehaviour::DecodeVLQHeader => {
+ let mut data = data_chunk;
+
+                            // Feed bytes one at a time until we reach Data state (VLQ header complete)
+                            while !matches!(self.block_decoder.state(), BlockDecoderState::Data) {
+ if data.is_empty() {
+                                    return self.finish_with_error(AvroError::EOF(
+                                        "Unexpected EOF while reading Avro block header".into(),
+ ));
+ }
+                                let consumed = match self.block_decoder.decode(&data[..1]) {
+ Ok(consumed) => consumed,
+ Err(e) => return self.finish_with_error(e),
+ };
+ if consumed == 0 {
+                                    return self.finish_with_error(AvroError::General(
+                                        "BlockDecoder failed to consume byte during VLQ header parsing"
+ .into(),
+ ));
+ }
+ data = data.slice(consumed..);
+ }
+
+                            // Now we know the block size. Slice remaining data to what we need.
+                            let bytes_remaining = self.block_decoder.bytes_remaining();
+                            let data_to_use = data.slice(..data.len().min(bytes_remaining));
+                            let consumed = match self.block_decoder.decode(&data_to_use) {
+ Ok(consumed) => consumed,
+ Err(e) => return self.finish_with_error(e),
+ };
+ if consumed != data_to_use.len() {
+                                return self.finish_with_error(AvroError::General(
+                                    "BlockDecoder failed to consume all bytes after VLQ header parsing"
+ .into(),
+ ));
+ }
+
+ // May need more data to finish the block.
+                            let range_to_fetch = match self.remaining_block_range() {
+                                Ok(range) if range.is_empty() => {
+                                    // All bytes fetched, move to decoding block directly
+                                    self.reader_state = ReaderState::DecodingBlock {
+ reader,
+ data: Bytes::new(),
+ };
+ continue;
+ }
+ Ok(range) => range,
+ Err(e) => return self.finish_with_error(e),
+ };
+
+                            let future = Self::fetch_bytes(reader, range_to_fetch).boxed();
+                            self.reader_state = ReaderState::FetchingData {
+                                future,
+                                next_behaviour: FetchNextBehaviour::ContinueDecoding,
+ };
+ continue;
+ }
+ FetchNextBehaviour::ContinueDecoding => {
+ self.reader_state = ReaderState::DecodingBlock {
+ reader,
+ data: data_chunk,
+ };
+ }
+ }
+ }
+ ReaderState::InvalidState => {
+ return self.finish_with_error(AvroError::General(
+ "AsyncAvroFileReader in invalid state".into(),
+ ));
+ }
+ ReaderState::DecodingBlock { reader, mut data } => {
+ // Try to decode another block from the buffered reader.
+ let consumed = match self.block_decoder.decode(&data) {
+ Ok(consumed) => consumed,
+ Err(e) => return self.finish_with_error(e),
+ };
+ data = data.slice(consumed..);
+
+                    // If we reached the end of the block, flush it, and move to read batches.
+ if let Some(block) = self.block_decoder.flush() {
+ // Successfully decoded a block.
+ let block_count = block.count;
+                        let block_data = Bytes::from_owner(if let Some(ref codec) = self.codec {
+ match codec.decompress(&block.data) {
+ Ok(decompressed) => decompressed,
+ Err(e) => return self.finish_with_error(e),
+ }
+ } else {
+ block.data
+ });
+
+                        // Since we have an active block, move to reading batches
+ self.reader_state = ReaderState::ReadingBatches {
+ reader,
+ data,
+ block_data,
+ remaining_in_block: block_count,
+ };
+ continue;
+ }
+
+                    // data should always be consumed unless Finished, if it wasn't, something went wrong
+                    if !data.is_empty() {
+                        return self.finish_with_error(AvroError::General(
+                            "BlockDecoder failed to make progress decoding Avro block".into(),
+ ));
+ }
+
+                    if matches!(self.block_decoder.state(), BlockDecoderState::Finished) {
+                        // We've already flushed, so if no batch was produced, we are simply done.
+ self.finishing_partial_block = false;
+ self.start_flushing();
+ continue;
+ }
+
+                    // If we've tried the following stage before, and still can't decode,
+                    // this means the file is truncated or corrupted.
+                    if self.finishing_partial_block {
+                        return self.finish_with_error(AvroError::EOF(
+                            "Unexpected EOF while reading last Avro block".into(),
+ ));
+ }
+
+ // Avro splitting case: block is incomplete, we need to:
+ // 1. Parse the length so we know how much to read
+ // 2. Fetch more data from the reader
+                    // 3. Create a new block data from the remaining slice and the newly fetched data
+ // 4. Continue decoding until end of block
+ self.finishing_partial_block = true;
+
+                    // Mid-block, but we don't know how many bytes are missing yet
+ if matches!(
+ self.block_decoder.state(),
+ BlockDecoderState::Count | BlockDecoderState::Size
+ ) {
+                        // Max VLQ header is 20 bytes (10 bytes each for count and size).
+ // Fetch just enough to complete it.
+ const MAX_VLQ_HEADER_SIZE: u64 = 20;
+                        let fetch_end = (self.range.end + MAX_VLQ_HEADER_SIZE).min(self.file_size);
+
+ // If there is nothing more to fetch, error out
+ if fetch_end == self.range.end {
+ return self.finish_with_error(AvroError::EOF(
+                                "Unexpected EOF while reading Avro block header".into(),
+ ));
+ }
+
+ let range_to_fetch = self.range.end..fetch_end;
+                        self.range.end = fetch_end; // Track that we've fetched these bytes
+
+                        let future = Self::fetch_bytes(reader, range_to_fetch).boxed();
+                        self.reader_state = ReaderState::FetchingData {
+                            future,
+                            next_behaviour: FetchNextBehaviour::DecodeVLQHeader,
+ };
+ continue;
+ }
+
+                    // Otherwise, we're mid-block but know how many bytes are remaining to fetch.
+ let range_to_fetch = match self.remaining_block_range() {
+ Ok(range) => range,
+ Err(e) => return self.finish_with_error(e),
+ };
+
+                    let future = Self::fetch_bytes(reader, range_to_fetch).boxed();
+ self.reader_state = ReaderState::FetchingData {
+ future,
+ next_behaviour: FetchNextBehaviour::ContinueDecoding,
+ };
+ continue;
+ }
+ ReaderState::ReadingBatches {
+ reader,
+ data,
+ mut block_data,
+ mut remaining_in_block,
+ } => {
+ let (consumed, records_decoded) =
+                        match self.decoder.decode_block(&block_data, remaining_in_block) {
+                            Ok((consumed, records_decoded)) => (consumed, records_decoded),
+ Err(e) => return self.finish_with_error(e),
+ };
+
+ remaining_in_block -= records_decoded;
+
+ if remaining_in_block == 0 {
+ if data.is_empty() {
+                            // No more data to read, drain remaining buffered records
+ self.start_flushing();
+ } else {
+                            // Finished this block, move to decode next block in the next iteration
+                            self.reader_state = ReaderState::DecodingBlock { reader, data };
+ }
+ } else {
+                        // Still more records to decode in this block, slice the already-read data and stay in this state
+ block_data = block_data.slice(consumed..);
+ self.reader_state = ReaderState::ReadingBatches {
+ reader,
+ data,
+ block_data,
+ remaining_in_block,
+ };
+ }
+
+ // We have a full batch ready, emit it
+                    // (This is not mutually exclusive with the block being finished, so the state change is valid)
+ if self.decoder.batch_is_full() {
+ return match self.decoder.flush() {
+ Ok(Some(batch)) => Poll::Ready(Some(Ok(batch))),
+                            Ok(None) => self.finish_with_error(AvroError::General(
+                                "Decoder reported a full batch, but flush returned None".into(),
+ )),
+ Err(e) => self.finish_with_error(e),
+ };
+ }
+ }
+ ReaderState::Flushing => {
+ return self.poll_flush();
+ }
+ ReaderState::Finished => {
+                    // Terminal: once finished (including after an error), always yield None
+ self.reader_state = ReaderState::Finished;
+ return Poll::Ready(None);
+ }
+ }
+ }
+ }
+}
+
+// To maintain compatibility with the expected stream results in the ecosystem, this returns ArrowError.
+impl<R: AsyncFileReader + Unpin + 'static> Stream for AsyncAvroFileReader<R> {
+ type Item = Result<RecordBatch, ArrowError>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.read_next(cx).map_err(Into::into)
+ }
+}
+
+#[cfg(all(test, feature = "object_store"))]
+mod tests {
+ use super::*;
+ use crate::schema::{AvroSchema, SCHEMA_METADATA_KEY};
+ use arrow_array::cast::AsArray;
+ use arrow_array::types::{Int32Type, Int64Type};
+ use arrow_array::*;
+ use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
+ use futures::{StreamExt, TryStreamExt};
+ use object_store::local::LocalFileSystem;
+ use object_store::path::Path;
+ use object_store::{ObjectStore, ObjectStoreExt};
+ use std::collections::HashMap;
+ use std::sync::Arc;
+
+ fn arrow_test_data(file: &str) -> String {
+ let base =
+            std::env::var("ARROW_TEST_DATA").unwrap_or_else(|_| "../testing/data".to_string());
+ format!("{}/{}", base, file)
+ }
+
+ fn get_alltypes_schema() -> SchemaRef {
+ let schema = Schema::new(vec![
+ Field::new("id", DataType::Int32, true),
+ Field::new("bool_col", DataType::Boolean, true),
+ Field::new("tinyint_col", DataType::Int32, true),
+ Field::new("smallint_col", DataType::Int32, true),
+ Field::new("int_col", DataType::Int32, true),
+ Field::new("bigint_col", DataType::Int64, true),
+ Field::new("float_col", DataType::Float32, true),
+ Field::new("double_col", DataType::Float64, true),
+ Field::new("date_string_col", DataType::Binary, true),
+ Field::new("string_col", DataType::Binary, true),
+ Field::new(
+ "timestamp_col",
+ DataType::Timestamp(TimeUnit::Microsecond,
Some("+00:00".into())),
+ true,
+ ),
+ ])
+ .with_metadata(HashMap::from([(
+ SCHEMA_METADATA_KEY.into(),
+ r#"{
+ "type": "record",
+ "name": "topLevelRecord",
+ "fields": [
+ {
+ "name": "id",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "bool_col",
+ "type": [
+ "boolean",
+ "null"
+ ]
+ },
+ {
+ "name": "tinyint_col",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "smallint_col",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "int_col",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "bigint_col",
+ "type": [
+ "long",
+ "null"
+ ]
+ },
+ {
+ "name": "float_col",
+ "type": [
+ "float",
+ "null"
+ ]
+ },
+ {
+ "name": "double_col",
+ "type": [
+ "double",
+ "null"
+ ]
+ },
+ {
+ "name": "date_string_col",
+ "type": [
+ "bytes",
+ "null"
+ ]
+ },
+ {
+ "name": "string_col",
+ "type": [
+ "bytes",
+ "null"
+ ]
+ },
+ {
+ "name": "timestamp_col",
+ "type": [
+ {
+ "type": "long",
+ "logicalType": "timestamp-micros"
+ },
+ "null"
+ ]
+ }
+ ]
+}
+"#
+ .into(),
+ )]));
+ Arc::new(schema)
+ }
+
+ fn get_alltypes_with_nulls_schema() -> SchemaRef {
+ let schema = Schema::new(vec![
+ Field::new("string_col", DataType::Binary, true),
+ Field::new("int_col", DataType::Int32, true),
+ Field::new("bool_col", DataType::Boolean, true),
+ Field::new("bigint_col", DataType::Int64, true),
+ Field::new("float_col", DataType::Float32, true),
+ Field::new("double_col", DataType::Float64, true),
+ Field::new("bytes_col", DataType::Binary, true),
+ ])
+ .with_metadata(HashMap::from([(
+ SCHEMA_METADATA_KEY.into(),
+ r#"{
+ "type": "record",
+ "name": "topLevelRecord",
+ "fields": [
+ {
+ "name": "string_col",
+ "type": [
+ "null",
+ "string"
+ ],
+ "default": null
+ },
+ {
+ "name": "int_col",
+ "type": [
+ "null",
+ "int"
+ ],
+ "default": null
+ },
+ {
+ "name": "bool_col",
+ "type": [
+ "null",
+ "boolean"
+ ],
+ "default": null
+ },
+ {
+ "name": "bigint_col",
+ "type": [
+ "null",
+ "long"
+ ],
+ "default": null
+ },
+ {
+ "name": "float_col",
+ "type": [
+ "null",
+ "float"
+ ],
+ "default": null
+ },
+ {
+ "name": "double_col",
+ "type": [
+ "null",
+ "double"
+ ],
+ "default": null
+ },
+ {
+ "name": "bytes_col",
+ "type": [
+ "null",
+ "bytes"
+ ],
+ "default": null
+ }
+ ]
+}"#
+ .into(),
+ )]));
+
+ Arc::new(schema)
+ }
+
+ fn get_nested_records_schema() -> SchemaRef {
+ let schema = Schema::new(vec![
+ Field::new(
+ "f1",
+ DataType::Struct(
+ vec![
+ Field::new("f1_1", DataType::Utf8, false),
+ Field::new("f1_2", DataType::Int32, false),
+ Field::new(
+ "f1_3",
+ DataType::Struct(
+                            vec![Field::new("f1_3_1", DataType::Float64, false)].into(),
+ ),
+ false,
+ ),
+ ]
+ .into(),
+ ),
+ false,
+ ),
+ Field::new(
+ "f2",
+ DataType::List(Arc::new(Field::new(
+ "item",
+ DataType::Struct(
+ vec![
+ Field::new("f2_1", DataType::Boolean, false),
+ Field::new("f2_2", DataType::Float32, false),
+ ]
+ .into(),
+ ),
+ false,
+ ))),
+ false,
+ ),
+ Field::new(
+ "f3",
+                DataType::Struct(vec![Field::new("f3_1", DataType::Utf8, false)].into()),
+ true,
+ ),
+ Field::new(
+ "f4",
+ DataType::List(Arc::new(Field::new(
+ "item",
+                    DataType::Struct(vec![Field::new("f4_1", DataType::Int64, false)].into()),
+ true,
+ ))),
+ false,
+ ),
+ ])
+ .with_metadata(HashMap::from([(
+ SCHEMA_METADATA_KEY.into(),
+ r#"{
+ "type": "record",
+ "namespace": "ns1",
+ "name": "record1",
+ "fields": [
+ {
+ "name": "f1",
+ "type": {
+ "type": "record",
+ "namespace": "ns2",
+ "name": "record2",
+ "fields": [
+ {
+ "name": "f1_1",
+ "type": "string"
+ },
+ {
+ "name": "f1_2",
+ "type": "int"
+ },
+ {
+ "name": "f1_3",
+ "type": {
+ "type": "record",
+ "namespace": "ns3",
+ "name": "record3",
+ "fields": [
+ {
+ "name": "f1_3_1",
+ "type": "double"
+ }
+ ]
+ }
+ }
+ ]
+ }
+ },
+ {
+ "name": "f2",
+ "type": {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "namespace": "ns4",
+ "name": "record4",
+ "fields": [
+ {
+ "name": "f2_1",
+ "type": "boolean"
+ },
+ {
+ "name": "f2_2",
+ "type": "float"
+ }
+ ]
+ }
+ }
+ },
+ {
+ "name": "f3",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "namespace": "ns5",
+ "name": "record5",
+ "fields": [
+ {
+ "name": "f3_1",
+ "type": "string"
+ }
+ ]
+ }
+ ],
+ "default": null
+ },
+ {
+ "name": "f4",
+ "type": {
+ "type": "array",
+ "items": [
+ "null",
+ {
+ "type": "record",
+ "namespace": "ns6",
+ "name": "record6",
+ "fields": [
+ {
+ "name": "f4_1",
+ "type": "long"
+ }
+ ]
+ }
+ ]
+ }
+ }
+ ]
+}
+"#
+ .into(),
+ )]));
+
+ Arc::new(schema)
+ }
+
+ async fn read_async_file(
+ path: &str,
+ batch_size: usize,
+ range: Option<Range<u64>>,
+ schema: Option<SchemaRef>,
+ projection: Option<Vec<usize>>,
+ ) -> Result<Vec<RecordBatch>, ArrowError> {
+ let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+ let location = Path::from_filesystem_path(path).unwrap();
+
+ let file_size = store.head(&location).await.unwrap().size;
+
+ let file_reader = AvroObjectReader::new(store, location);
+        let mut builder = AsyncAvroFileReader::builder(file_reader, file_size, batch_size);
+
+ if let Some(s) = schema {
+ let reader_schema = AvroSchema::try_from(s.as_ref())?;
+ builder = builder.with_reader_schema(reader_schema);
+ }
+
+ if let Some(proj) = projection {
+ builder = builder.with_projection(proj);
+ }
+
+ if let Some(range) = range {
+ builder = builder.with_range(range);
+ }
+
+ let reader = builder.try_build().await?;
+ reader.try_collect().await
+ }
+
+ #[tokio::test]
+ async fn test_full_file_read() {
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ let schema = get_alltypes_schema();
+ let batches = read_async_file(&file, 1024, None, Some(schema), None)
+ .await
+ .unwrap();
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_rows(), 8);
+ assert_eq!(batch.num_columns(), 11);
+
+ let id_array = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ assert_eq!(id_array.value(0), 4);
+ assert_eq!(id_array.value(7), 1);
+ }
+
+ #[tokio::test]
+ async fn test_small_batch_size() {
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ let schema = get_alltypes_schema();
+ let batches = read_async_file(&file, 2, None, Some(schema), None)
+ .await
+ .unwrap();
+ assert_eq!(batches.len(), 4);
+
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_rows(), 2);
+ assert_eq!(batch.num_columns(), 11);
+ }
+
+ #[tokio::test]
+ async fn test_batch_size_one() {
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ let schema = get_alltypes_schema();
+ let batches = read_async_file(&file, 1, None, Some(schema), None)
+ .await
+ .unwrap();
+ let batch = &batches[0];
+
+ assert_eq!(batches.len(), 8);
+ assert_eq!(batch.num_rows(), 1);
+ }
+
+ #[tokio::test]
+ async fn test_batch_size_larger_than_file() {
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ let schema = get_alltypes_schema();
+ let batches = read_async_file(&file, 10000, None, Some(schema), None)
+ .await
+ .unwrap();
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_rows(), 8);
+ }
+
+ #[tokio::test]
+ async fn test_empty_range() {
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ let range = 100..100;
+ let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
+ .await
+ .unwrap();
+ assert_eq!(batches.len(), 0);
+ }
+
+ #[tokio::test]
+ async fn test_range_starting_at_zero() {
+ // Tests that range starting at 0 correctly skips header
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ let store = Arc::new(LocalFileSystem::new());
+ let location = Path::from_filesystem_path(&file).unwrap();
+ let meta = store.head(&location).await.unwrap();
+
+ let range = 0..meta.size;
+ let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
+ .await
+ .unwrap();
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_rows(), 8);
+ }
+
+ #[tokio::test]
+ async fn test_range_after_header() {
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ let store = Arc::new(LocalFileSystem::new());
+ let location = Path::from_filesystem_path(&file).unwrap();
+ let meta = store.head(&location).await.unwrap();
+
+ let range = 100..meta.size;
+ let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
+ .await
+ .unwrap();
+ let batch = &batches[0];
+
+ assert!(batch.num_rows() > 0);
+ }
+
+ #[tokio::test]
+ async fn test_range_no_sync_marker() {
+ // Small range unlikely to contain sync marker
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ let range = 50..150;
+ let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
+ .await
+ .unwrap();
+ assert_eq!(batches.len(), 0);
+ }
+
+ #[tokio::test]
+ async fn test_range_starting_mid_file() {
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+
+        let range = 700..768; // Header ends at 675, so this should be mid-block
+ let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
+ .await
+ .unwrap();
+ assert_eq!(batches.len(), 0);
+ }
+
+ #[tokio::test]
+ async fn test_range_ending_at_file_size() {
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ let store = Arc::new(LocalFileSystem::new());
+ let location = Path::from_filesystem_path(&file).unwrap();
+ let meta = store.head(&location).await.unwrap();
+
+ let range = 200..meta.size;
+ let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
+ .await
+ .unwrap();
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_rows(), 8);
+ }
+
+ #[tokio::test]
+ async fn test_incomplete_block_requires_fetch() {
+ // Range ends mid-block, should trigger fetching_rem_block logic
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ let range = 0..1200;
+ let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
+ .await
+ .unwrap();
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_rows(), 8)
+ }
+
+ #[tokio::test]
+ async fn test_partial_vlq_header_requires_fetch() {
+        // Range ends mid-VLQ header, triggering the Count|Size partial fetch logic.
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ let range = 16..676; // Header should end at 675
+ let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
+ .await
+ .unwrap();
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_rows(), 8)
+ }
+
+ #[cfg(feature = "snappy")]
+ #[tokio::test]
+ async fn test_snappy_compressed_with_range() {
+ {
+ let file = arrow_test_data("avro/alltypes_plain.snappy.avro");
+ let store = Arc::new(LocalFileSystem::new());
+ let location = Path::from_filesystem_path(&file).unwrap();
+ let meta = store.head(&location).await.unwrap();
+
+ let range = 200..meta.size;
+ let schema = get_alltypes_schema();
+            let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
+ .await
+ .unwrap();
+ let batch = &batches[0];
+
+ assert!(batch.num_rows() > 0);
+ }
+ }
+
+ #[tokio::test]
+ async fn test_nulls() {
+ let file = arrow_test_data("avro/alltypes_nulls_plain.avro");
+ let schema = get_alltypes_with_nulls_schema();
+ let batches = read_async_file(&file, 1024, None, Some(schema), None)
+ .await
+ .unwrap();
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_rows(), 1);
+ for col in batch.columns() {
+ assert!(col.is_null(0));
+ }
+ }
+
+ #[tokio::test]
+ async fn test_nested_records() {
+ let file = arrow_test_data("avro/nested_records.avro");
+ let schema = get_nested_records_schema();
+ let batches = read_async_file(&file, 1024, None, Some(schema), None)
+ .await
+ .unwrap();
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_rows(), 2);
+ assert!(batch.num_columns() > 0);
+ }
+
+ #[tokio::test]
+ async fn test_stream_produces_multiple_batches() {
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ let store = Arc::new(LocalFileSystem::new());
+ let location = Path::from_filesystem_path(&file).unwrap();
+
+ let file_size = store.head(&location).await.unwrap().size;
+
+ let file_reader = AvroObjectReader::new(store, location);
+ let schema = get_alltypes_schema();
+ let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
+ let reader = AsyncAvroFileReader::builder(
+ file_reader,
+ file_size,
+ 2, // Small batch size to force multiple batches
+ )
+ .with_reader_schema(reader_schema)
+ .try_build()
+ .await
+ .unwrap();
+
+ let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+
+ assert!(batches.len() > 1);
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(total_rows, 8);
+ }
+
+ #[tokio::test]
+ async fn test_stream_early_termination() {
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ let store = Arc::new(LocalFileSystem::new());
+ let location = Path::from_filesystem_path(&file).unwrap();
+
+ let file_size = store.head(&location).await.unwrap().size;
+
+ let file_reader = AvroObjectReader::new(store, location);
+ let schema = get_alltypes_schema();
+ let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
+ let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1)
+ .with_reader_schema(reader_schema)
+ .try_build()
+ .await
+ .unwrap();
+
+        let first_batch = reader.take(1).try_collect::<Vec<_>>().await.unwrap();
+
+ assert_eq!(first_batch.len(), 1);
+ assert!(first_batch[0].num_rows() > 0);
+ }
+
+ #[tokio::test]
+ async fn test_various_batch_sizes() {
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+
+ for batch_size in [1, 2, 3, 5, 7, 11, 100] {
+ let schema = get_alltypes_schema();
+            let batches = read_async_file(&file, batch_size, None, Some(schema), None)
+ .await
+ .unwrap();
+ let batch = &batches[0];
+
+ // The returned batch should have the requested size, capped at the 8 rows in the file
+ assert_eq!(
+ batch.num_rows(),
+ batch_size.min(8),
+ "Failed with batch_size={}",
+ batch_size
+ );
+ }
+ }
+
+ #[tokio::test]
+ async fn test_range_larger_than_file() {
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ let store = Arc::new(LocalFileSystem::new());
+ let location = Path::from_filesystem_path(&file).unwrap();
+ let meta = store.head(&location).await.unwrap();
+
+ // Range extends beyond file size
+ let range = 100..(meta.size + 1000);
+ let schema = get_alltypes_schema();
+ let batches = read_async_file(&file, 1024, Some(range), Some(schema),
None)
+ .await
+ .unwrap();
+ let batch = &batches[0];
+
+ // Should clamp to file size
+ assert_eq!(batch.num_rows(), 8);
+ }
+
+ #[tokio::test]
+ async fn test_roundtrip_write_then_async_read() {
+ use crate::writer::AvroWriter;
+ use arrow_array::{Float64Array, StringArray};
+ use std::fs::File;
+ use std::io::BufWriter;
+ use tempfile::tempdir;
+
+ // Schema with nullable and non-nullable fields of various types
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, true),
+ Field::new("score", DataType::Float64, true),
+ Field::new("count", DataType::Int64, false),
+ ]));
+
+ let dir = tempdir().unwrap();
+ let file_path = dir.path().join("roundtrip_test.avro");
+
+ // Write multiple batches with nulls
+ {
+ let file = File::create(&file_path).unwrap();
+ let writer = BufWriter::new(file);
+ let mut avro_writer = AvroWriter::new(writer,
schema.as_ref().clone()).unwrap();
+
+ // First batch: 3 rows with some nulls
+ let batch1 = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![1, 2, 3])),
+ Arc::new(StringArray::from(vec![
+ Some("alice"),
+ None,
+ Some("charlie"),
+ ])),
+ Arc::new(Float64Array::from(vec![Some(95.5), Some(87.3),
None])),
+ Arc::new(Int64Array::from(vec![10, 20, 30])),
+ ],
+ )
+ .unwrap();
+ avro_writer.write(&batch1).unwrap();
+
+ // Second batch: 2 rows
+ let batch2 = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![4, 5])),
+ Arc::new(StringArray::from(vec![Some("diana"),
Some("eve")])),
+ Arc::new(Float64Array::from(vec![None, Some(88.0)])),
+ Arc::new(Int64Array::from(vec![40, 50])),
+ ],
+ )
+ .unwrap();
+ avro_writer.write(&batch2).unwrap();
+
+ avro_writer.finish().unwrap();
+ }
+
+ // Read back with small batch size to produce multiple output batches
+ let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+ let location = Path::from_filesystem_path(&file_path).unwrap();
+ let file_size = store.head(&location).await.unwrap().size;
+
+ let file_reader = AvroObjectReader::new(store, location);
+ let reader = AsyncAvroFileReader::builder(file_reader, file_size, 2)
+ .try_build()
+ .await
+ .unwrap();
+
+ let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+
+ // Verify we got multiple output batches due to small batch_size
+ assert!(
+ batches.len() > 1,
+ "Expected multiple batches with batch_size=2"
+ );
+
+ // Verify total row count
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(total_rows, 5);
+
+ // Concatenate all batches to verify data
+ let combined = arrow::compute::concat_batches(&batches[0].schema(),
&batches).unwrap();
+ assert_eq!(combined.num_rows(), 5);
+ assert_eq!(combined.num_columns(), 4);
+
+ // Check id column (non-nullable)
+ let id_array = combined
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ assert_eq!(id_array.values(), &[1, 2, 3, 4, 5]);
+
+ // Check name column (nullable) - verify nulls are preserved
+ // Avro strings come back as Utf8 (StringArray) here
+ let name_col = combined.column(1);
+ let name_array = name_col.as_string::<i32>();
+ assert_eq!(name_array.value(0), "alice");
+ assert!(name_col.is_null(1)); // second row has null name
+ assert_eq!(name_array.value(2), "charlie");
+
+ // Check score column (nullable) - verify nulls are preserved
+ let score_array = combined
+ .column(2)
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .unwrap();
+ assert!(!score_array.is_null(0));
+ assert!((score_array.value(0) - 95.5).abs() < f64::EPSILON);
+ assert!(score_array.is_null(2)); // third row has null score
+ assert!(score_array.is_null(3)); // fourth row has null score
+ assert!(!score_array.is_null(4));
+ assert!((score_array.value(4) - 88.0).abs() < f64::EPSILON);
+
+ // Check count column (non-nullable)
+ let count_array = combined
+ .column(3)
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .unwrap();
+ assert_eq!(count_array.values(), &[10, 20, 30, 40, 50]);
+ }
+
+ #[tokio::test]
+ async fn test_alltypes_no_schema_no_projection() {
+ // No reader schema, no projection - uses writer schema from file
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ let batches = read_async_file(&file, 1024, None, None, None)
+ .await
+ .unwrap();
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_rows(), 8);
+ assert_eq!(batch.num_columns(), 11);
+ assert_eq!(batch.schema().field(0).name(), "id");
+ }
+
+ #[tokio::test]
+ async fn test_alltypes_no_schema_with_projection() {
+ // No reader schema, with projection - project writer schema
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ // Project [tinyint_col, id, bigint_col] = indices [2, 0, 5]
+ let batches = read_async_file(&file, 1024, None, None, Some(vec![2, 0,
5]))
+ .await
+ .unwrap();
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_rows(), 8);
+ assert_eq!(batch.num_columns(), 3);
+ assert_eq!(batch.schema().field(0).name(), "tinyint_col");
+ assert_eq!(batch.schema().field(1).name(), "id");
+ assert_eq!(batch.schema().field(2).name(), "bigint_col");
+
+ // Verify data values
+ let tinyint_col = batch.column(0).as_primitive::<Int32Type>();
+ assert_eq!(tinyint_col.values(), &[0, 1, 0, 1, 0, 1, 0, 1]);
+
+ let id = batch.column(1).as_primitive::<Int32Type>();
+ assert_eq!(id.values(), &[4, 5, 6, 7, 2, 3, 0, 1]);
+
+ let bigint_col = batch.column(2).as_primitive::<Int64Type>();
+ assert_eq!(bigint_col.values(), &[0, 10, 0, 10, 0, 10, 0, 10]);
+ }
+
+ #[tokio::test]
+ async fn test_alltypes_with_schema_no_projection() {
+ // With reader schema, no projection
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ let schema = get_alltypes_schema();
+ let batches = read_async_file(&file, 1024, None, Some(schema), None)
+ .await
+ .unwrap();
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_rows(), 8);
+ assert_eq!(batch.num_columns(), 11);
+ }
+
+ #[tokio::test]
+ async fn test_alltypes_with_schema_with_projection() {
+ // With reader schema, with projection
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ let schema = get_alltypes_schema();
+ // Project [bool_col, id] = indices [1, 0]
+ let batches = read_async_file(&file, 1024, None, Some(schema),
Some(vec![1, 0]))
+ .await
+ .unwrap();
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_rows(), 8);
+ assert_eq!(batch.num_columns(), 2);
+ assert_eq!(batch.schema().field(0).name(), "bool_col");
+ assert_eq!(batch.schema().field(1).name(), "id");
+
+ let bool_col = batch.column(0).as_boolean();
+ assert!(bool_col.value(0));
+ assert!(!bool_col.value(1));
+
+ let id = batch.column(1).as_primitive::<Int32Type>();
+ assert_eq!(id.values(), &[4, 5, 6, 7, 2, 3, 0, 1]);
+ }
+
+ #[tokio::test]
+ async fn test_nested_no_schema_no_projection() {
+ // No reader schema, no projection
+ let file = arrow_test_data("avro/nested_records.avro");
+ let batches = read_async_file(&file, 1024, None, None, None)
+ .await
+ .unwrap();
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_rows(), 2);
+ assert_eq!(batch.num_columns(), 4);
+ assert_eq!(batch.schema().field(0).name(), "f1");
+ assert_eq!(batch.schema().field(1).name(), "f2");
+ assert_eq!(batch.schema().field(2).name(), "f3");
+ assert_eq!(batch.schema().field(3).name(), "f4");
+ }
+
+ #[tokio::test]
+ async fn test_nested_no_schema_with_projection() {
+ // No reader schema, with projection - reorder nested fields
+ let file = arrow_test_data("avro/nested_records.avro");
+ // Project [f3, f1] = indices [2, 0]
+ let batches = read_async_file(&file, 1024, None, None, Some(vec![2,
0]))
+ .await
+ .unwrap();
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_rows(), 2);
+ assert_eq!(batch.num_columns(), 2);
+ assert_eq!(batch.schema().field(0).name(), "f3");
+ assert_eq!(batch.schema().field(1).name(), "f1");
+ }
+
+ #[tokio::test]
+ async fn test_nested_with_schema_no_projection() {
+ // With reader schema, no projection
+ let file = arrow_test_data("avro/nested_records.avro");
+ let schema = get_nested_records_schema();
+ let batches = read_async_file(&file, 1024, None, Some(schema), None)
+ .await
+ .unwrap();
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_rows(), 2);
+ assert_eq!(batch.num_columns(), 4);
+ }
+
+ #[tokio::test]
+ async fn test_nested_with_schema_with_projection() {
+ // With reader schema, with projection
+ let file = arrow_test_data("avro/nested_records.avro");
+ let schema = get_nested_records_schema();
+ // Project [f4, f2, f1] = indices [3, 1, 0]
+ let batches = read_async_file(&file, 1024, None, Some(schema),
Some(vec![3, 1, 0]))
+ .await
+ .unwrap();
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_rows(), 2);
+ assert_eq!(batch.num_columns(), 3);
+ assert_eq!(batch.schema().field(0).name(), "f4");
+ assert_eq!(batch.schema().field(1).name(), "f2");
+ assert_eq!(batch.schema().field(2).name(), "f1");
+ }
+
+ #[tokio::test]
+ async fn test_projection_error_out_of_bounds() {
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ // Index 100 is out of bounds for the 11-field schema
+ let err = read_async_file(&file, 1024, None, None, Some(vec![100]))
+ .await
+ .unwrap_err();
+ assert!(matches!(err, ArrowError::AvroError(_)));
+ assert!(err.to_string().contains("out of bounds"));
+ }
+
+ #[tokio::test]
+ async fn test_projection_error_duplicate_index() {
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ // Duplicate index 0
+ let err = read_async_file(&file, 1024, None, None, Some(vec![0, 0]))
+ .await
+ .unwrap_err();
+ assert!(matches!(err, ArrowError::AvroError(_)));
+ assert!(err.to_string().contains("Duplicate projection index"));
+ }
+
+ #[tokio::test]
+ async fn test_with_header_size_hint_small() {
+ // Use a very small header size hint to force multiple fetches
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+ let location = Path::from_filesystem_path(&file).unwrap();
+ let file_size = store.head(&location).await.unwrap().size;
+
+ let file_reader = AvroObjectReader::new(store, location);
+ let schema = get_alltypes_schema();
+ let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
+
+ // Use a tiny header hint (64 bytes) - header is much larger
+ let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
+ .with_reader_schema(reader_schema)
+ .with_header_size_hint(64)
+ .try_build()
+ .await
+ .unwrap();
+
+ let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_rows(), 8);
+ assert_eq!(batch.num_columns(), 11);
+ }
+
+ #[tokio::test]
+ async fn test_with_header_size_hint_large() {
+ // Use a larger header size hint than needed
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+ let location = Path::from_filesystem_path(&file).unwrap();
+ let file_size = store.head(&location).await.unwrap().size;
+
+ let file_reader = AvroObjectReader::new(store, location);
+ let schema = get_alltypes_schema();
+ let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
+
+ // Use a large header hint (64KB)
+ let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
+ .with_reader_schema(reader_schema)
+ .with_header_size_hint(64 * 1024)
+ .try_build()
+ .await
+ .unwrap();
+
+ let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_rows(), 8);
+ assert_eq!(batch.num_columns(), 11);
+ }
+
+ #[tokio::test]
+ async fn test_with_utf8_view_enabled() {
+ // Test that utf8_view produces StringViewArray instead of StringArray
+ let file = arrow_test_data("avro/nested_records.avro");
+ let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+ let location = Path::from_filesystem_path(&file).unwrap();
+ let file_size = store.head(&location).await.unwrap().size;
+
+ let file_reader = AvroObjectReader::new(store, location);
+
+ let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
+ .with_utf8_view(true)
+ .try_build()
+ .await
+ .unwrap();
+
+ let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_rows(), 2);
+
+ // The f1 struct contains f1_1 which is a string field
+ // With utf8_view enabled, it should be Utf8View type
+ let f1_col = batch.column(0);
+ let f1_struct = f1_col.as_struct();
+ let f1_1_field = f1_struct.column_by_name("f1_1").unwrap();
+
+ // Check that the data type is Utf8View
+ assert_eq!(f1_1_field.data_type(), &DataType::Utf8View);
+ }
+
+ #[tokio::test]
+ async fn test_with_utf8_view_disabled() {
+ // Test that without utf8_view, we get regular Utf8
+ let file = arrow_test_data("avro/nested_records.avro");
+ let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+ let location = Path::from_filesystem_path(&file).unwrap();
+ let file_size = store.head(&location).await.unwrap().size;
+
+ let file_reader = AvroObjectReader::new(store, location);
+
+ let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
+ .with_utf8_view(false)
+ .try_build()
+ .await
+ .unwrap();
+
+ let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_rows(), 2);
+
+ // The f1 struct contains f1_1 which is a string field
+ // Without utf8_view, it should be regular Utf8
+ let f1_col = batch.column(0);
+ let f1_struct = f1_col.as_struct();
+ let f1_1_field = f1_struct.column_by_name("f1_1").unwrap();
+
+ assert_eq!(f1_1_field.data_type(), &DataType::Utf8);
+ }
+
+ #[tokio::test]
+ async fn test_with_strict_mode_disabled_allows_null_second() {
+ // Test that with strict_mode disabled, unions of ['T', 'null'] are
allowed
+ // The alltypes_nulls_plain.avro file has unions with null second
+ let file = arrow_test_data("avro/alltypes_nulls_plain.avro");
+ let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+ let location = Path::from_filesystem_path(&file).unwrap();
+ let file_size = store.head(&location).await.unwrap().size;
+
+ let file_reader = AvroObjectReader::new(store, location);
+
+ // Without strict mode, this should succeed
+ let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
+ .with_strict_mode(false)
+ .try_build()
+ .await
+ .unwrap();
+
+ let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+ assert_eq!(batches.len(), 1);
+ assert_eq!(batches[0].num_rows(), 1);
+ }
+
+ #[tokio::test]
+ async fn test_with_strict_mode_enabled_rejects_null_second() {
+ // Test that with strict_mode enabled, unions of ['T', 'null'] are
rejected
+ // The alltypes_plain.avro file has unions like ["int", "null"] (null
second)
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+ let location = Path::from_filesystem_path(&file).unwrap();
+ let file_size = store.head(&location).await.unwrap().size;
+
+ let file_reader = AvroObjectReader::new(store, location);
+
+ // With strict mode, this should fail because of ['T', 'null'] unions
+ let result = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
+ .with_strict_mode(true)
+ .try_build()
+ .await;
+
+ match result {
+ Ok(_) => panic!("Expected error for strict_mode with ['T', 'null']
union"),
+ Err(err) => {
+ assert!(
+ err.to_string().contains("disallowed in strict_mode"),
+ "Expected strict_mode error, got: {}",
+ err
+ );
+ }
+ }
+ }
+
+ #[tokio::test]
+ async fn test_with_strict_mode_enabled_valid_schema() {
+ // Test that strict_mode works with schemas that have proper ['null',
'T'] unions
+ // The nested_records.avro file has properly ordered unions
+ let file = arrow_test_data("avro/nested_records.avro");
+ let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+ let location = Path::from_filesystem_path(&file).unwrap();
+ let file_size = store.head(&location).await.unwrap().size;
+
+ let file_reader = AvroObjectReader::new(store, location);
+
+ // With strict mode, properly ordered unions should still work
+ let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
+ .with_strict_mode(true)
+ .try_build()
+ .await
+ .unwrap();
+
+ let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+ assert_eq!(batches.len(), 1);
+ assert_eq!(batches[0].num_rows(), 2);
+ }
+
+ #[tokio::test]
+ async fn test_builder_options_combined() {
+ // Test combining multiple builder options
+ let file = arrow_test_data("avro/nested_records.avro");
+ let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+ let location = Path::from_filesystem_path(&file).unwrap();
+ let file_size = store.head(&location).await.unwrap().size;
+
+ let file_reader = AvroObjectReader::new(store, location);
+
+ let reader = AsyncAvroFileReader::builder(file_reader, file_size, 2)
+ .with_header_size_hint(128)
+ .with_utf8_view(true)
+ .with_strict_mode(true)
+ .with_projection(vec![0, 2]) // f1 and f3
+ .try_build()
+ .await
+ .unwrap();
+
+ let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+ let batch = &batches[0];
+
+ // Should have 2 columns (f1 and f3) due to projection
+ assert_eq!(batch.num_columns(), 2);
+ assert_eq!(batch.schema().field(0).name(), "f1");
+ assert_eq!(batch.schema().field(1).name(), "f3");
+
+ // Verify utf8_view is applied
+ let f1_col = batch.column(0);
+ let f1_struct = f1_col.as_struct();
+ let f1_1_field = f1_struct.column_by_name("f1_1").unwrap();
+ assert_eq!(f1_1_field.data_type(), &DataType::Utf8View);
+ }
+}
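
For reference, a minimal end-to-end usage sketch of the new async reader (not
part of the commit), following the same pattern as the tests above. It assumes
the crate's `async` and `object_store` features are enabled, tokio with the
`rt-multi-thread` and `macros` features in the caller's crate, and a
placeholder path `/data/example.avro`:

use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_avro::reader::{AsyncAvroFileReader, AvroObjectReader};
use futures::TryStreamExt;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};

#[tokio::main]
async fn main() {
    // Any ObjectStore implementation works; LocalFileSystem mirrors the tests.
    let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
    let location = Path::from_filesystem_path("/data/example.avro").unwrap();
    let file_size = store.head(&location).await.unwrap().size;

    // Wrap the store in the AsyncFileReader implementation added by this commit.
    let file_reader = AvroObjectReader::new(store, location);

    // batch_size = 1024; with_reader_schema, with_projection, with_utf8_view,
    // with_strict_mode and with_header_size_hint are optional, as the tests
    // above show.
    let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
        .try_build()
        .await
        .unwrap();

    // The reader is a Stream of RecordBatch.
    let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
    println!("read {} batches", batches.len());
}
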
diff --git a/arrow-avro/src/reader/async_reader/store.rs
b/arrow-avro/src/reader/async_reader/store.rs
new file mode 100644
index 0000000000..44a4abf1a2
--- /dev/null
+++ b/arrow-avro/src/reader/async_reader/store.rs
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::AvroError;
+use crate::reader::async_reader::AsyncFileReader;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, TryFutureExt};
+use object_store::ObjectStore;
+use object_store::ObjectStoreExt;
+use object_store::path::Path;
+use std::error::Error;
+use std::ops::Range;
+use std::sync::Arc;
+use tokio::runtime::Handle;
+
+/// An implementation of an AsyncFileReader using the [`ObjectStore`] API.
+pub struct AvroObjectReader {
+ store: Arc<dyn ObjectStore>,
+ path: Path,
+ runtime: Option<Handle>,
+}
+
+impl AvroObjectReader {
+ /// Creates a new [`Self`] from a store implementation and file location.
+ pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
+ Self {
+ store,
+ path,
+ runtime: None,
+ }
+ }
+
+ /// Perform IO on the provided tokio runtime
+ ///
+ /// Tokio is a cooperative scheduler, and relies on tasks yielding in a
timely manner
+ /// to service IO. Therefore, running IO and CPU-bound tasks, such as Avro
decoding,
+ /// on the same tokio runtime can lead to degraded throughput, dropped
connections and
+ /// other issues. For more information see [here].
+ ///
+ /// [here]:
https://www.influxdata.com/blog/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
+ pub fn with_runtime(self, handle: Handle) -> Self {
+ Self {
+ runtime: Some(handle),
+ ..self
+ }
+ }
+
+ fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O, AvroError>>
+ where
+ F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a,
Result<O, E>>
+ + Send
+ + 'static,
+ O: Send + 'static,
+ E: Error + Send + 'static,
+ {
+ match &self.runtime {
+ Some(handle) => {
+ let path = self.path.clone();
+ let store = Arc::clone(&self.store);
+ handle
+ .spawn(async move { f(&store, &path).await })
+ .map_ok_or_else(
+ |e| match e.try_into_panic() {
+ Err(e) => Err(AvroError::External(Box::new(e))),
+ Ok(p) => std::panic::resume_unwind(p),
+ },
+ |res| res.map_err(|e|
AvroError::General(e.to_string())),
+ )
+ .boxed()
+ }
+ None => f(&self.store, &self.path)
+ .map_err(|e| AvroError::General(e.to_string()))
+ .boxed(),
+ }
+ }
+}
+
+impl AsyncFileReader for AvroObjectReader {
+ fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes,
AvroError>> {
+ self.spawn(|store, path| async move { store.get_range(path,
range).await }.boxed())
+ }
+
+ fn get_byte_ranges(
+ &mut self,
+ ranges: Vec<Range<u64>>,
+ ) -> BoxFuture<'_, Result<Vec<Bytes>, AvroError>>
+ where
+ Self: Send,
+ {
+ self.spawn(|store, path| async move { store.get_ranges(path,
&ranges).await }.boxed())
+ }
+}
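
The `with_runtime` documentation above recommends keeping object store IO off
the runtime doing CPU-bound decode work. A sketch of how a caller might do that
(not part of the commit; the store, path, and thread count are illustrative,
and tokio's `rt-multi-thread` feature is assumed):

use std::sync::Arc;

use arrow_avro::reader::AvroObjectReader;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};

fn main() {
    // Dedicated runtime that only services object store IO, so decode work
    // running elsewhere cannot starve the IO tasks.
    let io_runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build()
        .unwrap();

    let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
    let location = Path::from_filesystem_path("/data/example.avro").unwrap();

    // get_bytes / get_byte_ranges calls made through this reader are spawned
    // onto io_runtime instead of whichever runtime polls the Avro record stream.
    let _reader = AvroObjectReader::new(store, location)
        .with_runtime(io_runtime.handle().clone());

    // Hand _reader to AsyncAvroFileReader::builder as in the earlier sketch,
    // keeping io_runtime alive for as long as the reader is in use.
}
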
diff --git a/arrow-avro/src/reader/block.rs b/arrow-avro/src/reader/block.rs
index 540aedee52..e20818bf96 100644
--- a/arrow-avro/src/reader/block.rs
+++ b/arrow-avro/src/reader/block.rs
@@ -43,7 +43,7 @@ pub struct BlockDecoder {
}
#[derive(Debug)]
-enum BlockDecoderState {
+pub(crate) enum BlockDecoderState {
Count,
Size,
Data,
@@ -137,3 +137,14 @@ impl BlockDecoder {
}
}
}
+
+#[cfg(feature = "async")]
+impl BlockDecoder {
+ pub(crate) fn state(&self) -> &BlockDecoderState {
+ &self.state
+ }
+
+ pub(crate) fn bytes_remaining(&self) -> usize {
+ self.bytes_remaining
+ }
+}
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 55763aeb62..a60bc26b49 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -499,6 +499,14 @@ mod header;
mod record;
mod vlq;
+#[cfg(feature = "async")]
+mod async_reader;
+
+#[cfg(feature = "object_store")]
+pub use async_reader::AvroObjectReader;
+#[cfg(feature = "async")]
+pub use async_reader::{AsyncAvroFileReader, AsyncFileReader};
+
fn is_incomplete_data(err: &AvroError) -> bool {
matches!(
err,
@@ -641,6 +649,25 @@ pub struct Decoder {
}
impl Decoder {
+ pub(crate) fn from_parts(
+ batch_size: usize,
+ active_decoder: RecordDecoder,
+ active_fingerprint: Option<Fingerprint>,
+ cache: IndexMap<Fingerprint, RecordDecoder>,
+ fingerprint_algorithm: FingerprintAlgorithm,
+ ) -> Self {
+ Self {
+ batch_size,
+ remaining_capacity: batch_size,
+ active_fingerprint,
+ active_decoder,
+ cache,
+ fingerprint_algorithm,
+ pending_schema: None,
+ awaiting_body: false,
+ }
+ }
+
/// Returns the Arrow schema for the rows decoded by this decoder.
///
/// **Note:** With single‑object or Confluent framing, the schema may
change
@@ -999,25 +1026,6 @@ impl ReaderBuilder {
self.make_record_decoder(writer_schema, reader_schema_raw.as_ref())
}
- fn make_decoder_with_parts(
- &self,
- active_decoder: RecordDecoder,
- active_fingerprint: Option<Fingerprint>,
- cache: IndexMap<Fingerprint, RecordDecoder>,
- fingerprint_algorithm: FingerprintAlgorithm,
- ) -> Decoder {
- Decoder {
- batch_size: self.batch_size,
- remaining_capacity: self.batch_size,
- active_fingerprint,
- active_decoder,
- cache,
- fingerprint_algorithm,
- pending_schema: None,
- awaiting_body: false,
- }
- }
-
fn make_decoder(
&self,
header: Option<&Header>,
@@ -1054,7 +1062,8 @@ impl ReaderBuilder {
let effective_reader_schema =
projected_reader_schema.as_ref().or(reader_schema);
let record_decoder =
self.make_record_decoder_from_schemas(&writer_schema,
effective_reader_schema)?;
- return Ok(self.make_decoder_with_parts(
+ return Ok(Decoder::from_parts(
+ self.batch_size,
record_decoder,
None,
IndexMap::new(),
@@ -1121,7 +1130,8 @@ impl ReaderBuilder {
"Initial fingerprint {start_fingerprint:?} not found in schema
store"
))
})?;
- Ok(self.make_decoder_with_parts(
+ Ok(Decoder::from_parts(
+ self.batch_size,
active_decoder,
Some(start_fingerprint),
cache,