This is an automated email from the ASF dual-hosted git repository.
mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new 2e419f8 read_range with exact API when available (#628)
2e419f8 is described below
commit 2e419f8206e2c8b68a044606e9237cc206851084
Author: Adam Gutglick <[email protected]>
AuthorDate: Thu Feb 5 12:51:47 2026 +0000
read_range with exact API when available (#628)
Signed-off-by: Adam Gutglick <[email protected]>
---
src/lib.rs | 25 ++++--------
src/local.rs | 130 ++++++++++++++++++++++++++++++++++++++++++++++-------------
2 files changed, 108 insertions(+), 47 deletions(-)
diff --git a/src/lib.rs b/src/lib.rs
index b22c6c4..0a7e509 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -116,6 +116,8 @@
//!
//! [`BufReader`]: buffered::BufReader
//! [`BufWriter`]: buffered::BufWriter
+//! [`Read`]: std::io::Read
+//! [`Seek`]: std::io::Seek
//!
//! # Adapters
//!
@@ -607,8 +609,6 @@ use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{StreamExt, TryStreamExt, stream::BoxStream};
use std::fmt::{Debug, Formatter};
-#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
-use std::io::{Read, Seek, SeekFrom};
use std::ops::Range;
use std::sync::Arc;
@@ -1650,22 +1650,11 @@ impl GetResult {
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
GetResultPayload::File(mut file, path) => {
maybe_spawn_blocking(move || {
- file.seek(SeekFrom::Start(self.range.start as _))
- .map_err(|source| local::Error::Seek {
- source,
- path: path.clone(),
- })?;
-
- let mut buffer = if let Ok(len) = len.try_into() {
- Vec::with_capacity(len)
- } else {
- Vec::new()
- };
- file.take(len as _)
- .read_to_end(&mut buffer)
- .map_err(|source| local::Error::UnableToReadBytes { source, path })?;
-
- Ok(buffer.into())
+ use crate::local::read_range;
+
+ let buffer = read_range(&mut file, &path, self.range)?;
+
+ Ok(buffer)
})
.await
}
diff --git a/src/local.rs b/src/local.rs
index 573f969..3a6a7a4 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -19,6 +19,10 @@
use std::fs::{File, Metadata, OpenOptions, metadata, symlink_metadata};
use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
use std::ops::Range;
+#[cfg(target_family = "unix")]
+use std::os::unix::fs::FileExt;
+#[cfg(target_family = "windows")]
+use std::os::windows::fs::FileExt;
use std::sync::Arc;
use std::time::SystemTime;
use std::{collections::BTreeSet, io};
@@ -974,44 +978,112 @@ pub(crate) fn read_range(
path: &std::path::Path,
range: Range<u64>,
) -> Result<Bytes> {
- file.seek(SeekFrom::Start(range.start))
- .map_err(|err| map_seek_error(err, file, path, range.start))?;
-
let requested = range.end - range.start;
+
let mut buf = Vec::with_capacity(requested as usize);
- let read = file.take(requested).read_to_end(&mut buf).map_err(|err| {
- // try to read metadata to give a better error in case of directory
- if let Err(e) = open_metadata(file, path) {
- return e;
- }
- Error::UnableToReadBytes {
- source: err,
- path: path.to_path_buf(),
- }
- })? as u64;
- if read != requested {
- let metadata = open_metadata(file, path)?;
- let file_len = metadata.len();
+ #[cfg(any(target_family = "unix", target_family = "windows"))]
+ {
+ buf.resize(requested as usize, 0_u8);
- if range.start >= file_len {
- return Err(Error::InvalidRange {
- source: InvalidGetRange::StartTooLarge {
- requested: range.start,
- length: file_len,
- },
+ let mut buf_slice = &mut buf[..];
+ let mut offset = range.start;
+
+ while !buf_slice.is_empty() {
+ #[cfg(target_family = "unix")]
+ let read_result = file.read_at(buf_slice, offset);
+
+ #[cfg(target_family = "windows")]
+ let read_result = file.seek_read(buf_slice, offset);
+
+ match read_result {
+ Ok(0) => break,
+ Ok(n) => {
+ let tmp = buf_slice;
+ buf_slice = &mut tmp[n..];
+ offset += n as u64;
+ }
+ // This error is recoverable
+ Err(e) if e.kind() == ErrorKind::Interrupted => {}
+ Err(source) => {
+ let error = Error::UnableToReadBytes {
+ source,
+ path: path.into(),
+ };
+
+ return Err(error.into());
+ }
}
- .into());
}
- let expected = range.end.min(file_len) - range.start;
- if read != expected {
- return Err(Error::OutOfRange {
- path: path.to_path_buf(),
+ // If we reached EOF before filling the buffer
+ if !buf_slice.is_empty() {
+ let metadata = open_metadata(file, path)?;
+ let file_len = metadata.len();
+
+ // If none of the range is satisfiable we should error, e.g. if the start offset is beyond the
+ // extents of the file, or if its at the end of the file and wants to read a non-empty range.
+ // if range.start > file_len || (range.start == file_len && !range.is_empty()) {
+ if range.start >= file_len {
+ return Err(Error::InvalidRange {
+ source: InvalidGetRange::StartTooLarge {
+ requested: range.start,
+ length: file_len,
+ },
+ }
+ .into());
+ }
+
+ let expected = range.end.min(file_len) - range.start;
+
+ let error = Error::OutOfRange {
+ path: path.into(),
expected,
- actual: read,
+ actual: offset - range.start,
+ };
+
+ return Err(error.into());
+ }
+ }
+ #[cfg(all(not(windows), not(unix)))]
+ {
+ file.seek(SeekFrom::Start(range.start))
+ .map_err(|err| map_seek_error(err, file, path, range.start))?;
+
+ let read = file.take(requested).read_to_end(&mut buf).map_err(|err| {
+ // try to read metadata to give a better error in case of directory
+ if let Err(e) = open_metadata(file, path) {
+ return e;
+ }
+ Error::UnableToReadBytes {
+ source: err,
+ path: path.to_path_buf(),
+ }
+ })? as u64;
+
+ if read != requested {
+ let metadata = open_metadata(file, path)?;
+ let file_len = metadata.len();
+
+ if range.start >= file_len {
+ return Err(Error::InvalidRange {
+ source: InvalidGetRange::StartTooLarge {
+ requested: range.start,
+ length: file_len,
+ },
+ }
+ .into());
+ }
+
+ let expected = range.end.min(file_len) - range.start;
+ if read != expected {
+ return Err(Error::OutOfRange {
+ path: path.to_path_buf(),
+ expected,
+ actual: read,
+ }
+ .into());
}
- .into());
}
}