This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new c5f1b2b Avoid metadata lookup for `LocalFileSystem::read_ranges` and
`chunked_stream` (#621)
c5f1b2b is described below
commit c5f1b2b9af2790935c8ffc48ab8ba3ddb9103c16
Author: Daniël Heres <[email protected]>
AuthorDate: Tue Feb 3 19:43:35 2026 +0100
Avoid metadata lookup for `LocalFileSystem::read_ranges` and
`chunked_stream` (#621)
* Avoid metadata lookup
* Avoid metadata lookup
* Clippy
* Allocation fix
* Factor out error mapping, reduce duplication and simplify error handling
* clippy
* Simplify more
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
src/local.rs | 148 +++++++++++++++++++++++++++++++++++------------------------
1 file changed, 89 insertions(+), 59 deletions(-)
diff --git a/src/local.rs b/src/local.rs
index 961ed12..573f969 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -418,7 +418,8 @@ impl ObjectStore for LocalFileSystem {
let location = location.clone();
let path = self.path_to_filesystem(&location)?;
maybe_spawn_blocking(move || {
- let (file, metadata) = open_file(&path)?;
+ let file = open_file(&path)?;
+ let metadata = open_metadata(&file, &path)?;
let meta = convert_metadata(metadata, location);
options.check_preconditions(&meta)?;
@@ -444,10 +445,11 @@ impl ObjectStore for LocalFileSystem {
let ranges = ranges.to_vec();
maybe_spawn_blocking(move || {
// Vectored IO might be faster
- let (mut file, metadata) = open_file(&path)?;
+ // We do not read the metadata here, but error in `read_range` if necessary
+ let mut file = File::open(&path).map_err(|e| map_open_error(e, &path))?;
ranges
.into_iter()
- .map(|r| read_range(&mut file, metadata.len(), &path, r))
+ .map(|r| read_range(&mut file, &path, r))
.collect()
})
.await
@@ -924,18 +926,17 @@ pub(crate) fn chunked_stream(
chunk_size: usize,
) -> BoxStream<'static, Result<Bytes, super::Error>> {
futures::stream::once(async move {
+ let requested = range.end - range.start;
+
let (file, path) = maybe_spawn_blocking(move || {
file.seek(SeekFrom::Start(range.start as _))
- .map_err(|source| Error::Seek {
- source,
- path: path.clone(),
- })?;
+ .map_err(|err| map_seek_error(err, &file, &path, range.start))?;
Ok((file, path))
})
.await?;
let stream = futures::stream::try_unfold(
- (file, path, range.end - range.start),
+ (file, path, requested),
move |(mut file, path, remaining)| {
maybe_spawn_blocking(move || {
if remaining == 0 {
@@ -970,70 +971,99 @@ pub(crate) fn chunked_stream(
pub(crate) fn read_range(
file: &mut File,
- file_len: u64,
- path: &PathBuf,
+ path: &std::path::Path,
range: Range<u64>,
) -> Result<Bytes> {
- // If none of the range is satisfiable we should error, e.g. if the start offset is beyond the
- // extents of the file
- if range.start >= file_len {
- return Err(Error::InvalidRange {
- source: InvalidGetRange::StartTooLarge {
- requested: range.start,
- length: file_len,
- },
+ file.seek(SeekFrom::Start(range.start))
+ .map_err(|err| map_seek_error(err, file, path, range.start))?;
+
+ let requested = range.end - range.start;
+ let mut buf = Vec::with_capacity(requested as usize);
+ let read = file.take(requested).read_to_end(&mut buf).map_err(|err| {
+ // try to read metadata to give a better error in case of directory
+ if let Err(e) = open_metadata(file, path) {
+ return e;
}
- .into());
- }
+ Error::UnableToReadBytes {
+ source: err,
+ path: path.to_path_buf(),
+ }
+ })? as u64;
- // Don't read past end of file
- let to_read = range.end.min(file_len) - range.start;
+ if read != requested {
+ let metadata = open_metadata(file, path)?;
+ let file_len = metadata.len();
- file.seek(SeekFrom::Start(range.start)).map_err(|source| {
- let path = path.into();
- Error::Seek { source, path }
- })?;
+ if range.start >= file_len {
+ return Err(Error::InvalidRange {
+ source: InvalidGetRange::StartTooLarge {
+ requested: range.start,
+ length: file_len,
+ },
+ }
+ .into());
+ }
- let mut buf = Vec::with_capacity(to_read as usize);
- let read = file.take(to_read).read_to_end(&mut buf).map_err(|source| {
- let path = path.into();
- Error::UnableToReadBytes { source, path }
- })? as u64;
+ let expected = range.end.min(file_len) - range.start;
+ if read != expected {
+ return Err(Error::OutOfRange {
+ path: path.to_path_buf(),
+ expected,
+ actual: read,
+ }
+ .into());
+ }
+ }
- if read != to_read {
- let error = Error::OutOfRange {
- path: path.into(),
- expected: to_read,
- actual: read,
- };
+ Ok(buf.into())
+}
+
+fn open_file(path: &std::path::Path) -> Result<File, Error> {
+ File::open(path).map_err(|e| map_open_error(e, path))
+}
- return Err(error.into());
+fn open_metadata(file: &File, path: &std::path::Path) -> Result<Metadata, Error> {
+ let metadata = file.metadata().map_err(|e| map_open_error(e, path))?;
+ if metadata.is_dir() {
+ Err(Error::NotFound {
+ path: PathBuf::from(path),
+ source: io::Error::new(ErrorKind::NotFound, "is directory"),
+ })
+ } else {
+ Ok(metadata)
}
+}
- Ok(buf.into())
+/// Translates errors from opening a file into a more specific [`Error`] when possible
+fn map_open_error(source: io::Error, path: &std::path::Path) -> Error {
+ let path = PathBuf::from(path);
+ match source.kind() {
+ ErrorKind::NotFound => Error::NotFound { path, source },
+ _ => Error::UnableToOpenFile { path, source },
+ }
}
-fn open_file(path: &PathBuf) -> Result<(File, Metadata)> {
- let ret = match File::open(path).and_then(|f| Ok((f.metadata()?, f))) {
- Err(e) => Err(match e.kind() {
- ErrorKind::NotFound => Error::NotFound {
- path: path.clone(),
- source: e,
- },
- _ => Error::UnableToOpenFile {
- path: path.clone(),
- source: e,
+/// Translates errors from attempting to a file into a more specific [`Error`] when possible
+fn map_seek_error(source: io::Error, file: &File, path: &std::path::Path, requested: u64) -> Error {
+ // if we can't seek, check if start is out of bounds to give
+ // a better error. Don't read metadata before to avoid
+ // an extra syscall in the common case
+ let m = match open_metadata(file, path) {
+ Err(e) => return e,
+ Ok(m) => m,
+ };
+ if requested >= m.len() {
+ return Error::InvalidRange {
+ source: InvalidGetRange::StartTooLarge {
+ requested,
+ length: m.len(),
},
- }),
- Ok((metadata, file)) => match !metadata.is_dir() {
- true => Ok((file, metadata)),
- false => Err(Error::NotFound {
- path: path.clone(),
- source: io::Error::new(ErrorKind::NotFound, "is directory"),
- }),
- },
- }?;
- Ok(ret)
+ };
+ }
+ Error::Seek {
+ source,
+ path: PathBuf::from(path),
+ }
}
fn convert_entry(entry: DirEntry, location: Path) -> Result<Option<ObjectMeta>> {