This is an automated email from the ASF dual-hosted git repository.
mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new e00ff79 refactor!: `rename` & `rename_if_not_exists` =>
`rename_opts` (#555)
e00ff79 is described below
commit e00ff79c03a6b9e017c7b6d905d1bab6a15b3923
Author: Marco Neumann <[email protected]>
AuthorDate: Thu Dec 4 13:42:38 2025 +0100
refactor!: `rename` & `rename_if_not_exists` => `rename_opts` (#555)
Change the `ObjectStore` core trait to have a single, extensible rename
operation. This helps #385 and #297.
Also adds extensions similar to
apache/arrow-rs#7170
and
apache/arrow-rs#7213.
Also see #548 -- which did something similar for `copy`.
---
src/chunked.rs | 10 ++---
src/lib.rs | 135 ++++++++++++++++++++++++++++++++++++++++++++++++--------
src/limit.rs | 13 ++----
src/local.rs | 62 +++++++++++++++++++-------
src/prefix.rs | 12 ++---
src/throttle.rs | 12 ++---
6 files changed, 174 insertions(+), 70 deletions(-)
diff --git a/src/chunked.rs b/src/chunked.rs
index 53dbf1e..49632ed 100644
--- a/src/chunked.rs
+++ b/src/chunked.rs
@@ -29,7 +29,7 @@ use futures::stream::BoxStream;
use crate::path::Path;
use crate::{
CopyOptions, GetOptions, GetResult, GetResultPayload, ListResult,
MultipartUpload, ObjectMeta,
- ObjectStore, PutMultipartOptions, PutOptions, PutResult,
+ ObjectStore, PutMultipartOptions, PutOptions, PutResult, RenameOptions,
};
use crate::{PutPayload, Result};
@@ -170,12 +170,8 @@ impl ObjectStore for ChunkedStore {
self.inner.copy_opts(from, to, options).await
}
- async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
- self.inner.rename(from, to).await
- }
-
- async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()>
{
- self.inner.rename_if_not_exists(from, to).await
+ async fn rename_opts(&self, from: &Path, to: &Path, options:
RenameOptions) -> Result<()> {
+ self.inner.rename_opts(from, to, options).await
}
}
diff --git a/src/lib.rs b/src/lib.rs
index 736f173..b49f910 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1035,19 +1035,22 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync
+ Debug + 'static {
///
/// By default, this is implemented as a copy and then delete source. It
may not
/// check when deleting source that it was the same object that was
originally copied.
- ///
- /// If there exists an object at the destination, it will be overwritten.
- async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
- self.copy(from, to).await?;
- self.delete(from).await
- }
-
- /// Move an object from one path to another in the same object store.
- ///
- /// Will return an error if the destination already has an object.
- async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()>
{
- self.copy_if_not_exists(from, to).await?;
- self.delete(from).await
+ async fn rename_opts(&self, from: &Path, to: &Path, options:
RenameOptions) -> Result<()> {
+ let RenameOptions {
+ target_mode,
+ extensions,
+ } = options;
+ let copy_mode = match target_mode {
+ RenameTargetMode::Overwrite => CopyMode::Overwrite,
+ RenameTargetMode::Create => CopyMode::Create,
+ };
+ let copy_options = CopyOptions {
+ mode: copy_mode,
+ extensions,
+ };
+ self.copy_opts(from, to, copy_options).await?;
+ self.delete(from).await?;
+ Ok(())
}
}
@@ -1116,12 +1119,13 @@ macro_rules! as_ref_impl {
self.as_ref().copy_opts(from, to, options).await
}
- async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
- self.as_ref().rename(from, to).await
- }
-
- async fn rename_if_not_exists(&self, from: &Path, to: &Path) ->
Result<()> {
- self.as_ref().rename_if_not_exists(from, to).await
+ async fn rename_opts(
+ &self,
+ from: &Path,
+ to: &Path,
+ options: RenameOptions,
+ ) -> Result<()> {
+ self.as_ref().rename_opts(from, to, options).await
}
}
};
@@ -1238,6 +1242,19 @@ pub trait ObjectStoreExt: ObjectStore {
/// If atomic operations are not supported by the underlying object
storage (like S3)
/// it will return an error.
fn copy_if_not_exists(&self, from: &Path, to: &Path) -> impl Future<Output
= Result<()>>;
+
+ /// Move an object from one path to another in the same object store.
+ ///
+ /// By default, this is implemented as a copy and then delete source. It
may not
+ /// check when deleting source that it was the same object that was
originally copied.
+ ///
+ /// If there exists an object at the destination, it will be overwritten.
+ fn rename(&self, from: &Path, to: &Path) -> impl Future<Output =
Result<()>>;
+
+ /// Move an object from one path to another in the same object store.
+ ///
+ /// Will return an error if the destination already has an object.
+ fn rename_if_not_exists(&self, from: &Path, to: &Path) -> impl
Future<Output = Result<()>>;
}
impl<T> ObjectStoreExt for T
@@ -1277,6 +1294,16 @@ where
let options = CopyOptions::new().with_mode(CopyMode::Create);
self.copy_opts(from, to, options).await
}
+
+ async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
+ let options =
RenameOptions::new().with_target_mode(RenameTargetMode::Overwrite);
+ self.rename_opts(from, to, options).await
+ }
+
+ async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()>
{
+ let options =
RenameOptions::new().with_target_mode(RenameTargetMode::Create);
+ self.rename_opts(from, to, options).await
+ }
}
/// Result of a list call that includes objects, prefixes (directories) and a
@@ -1828,6 +1855,76 @@ impl PartialEq<Self> for CopyOptions {
impl Eq for CopyOptions {}
+/// Configure preconditions for the target of a rename operation.
+///
+/// Note though that the source location may or may not be deleted at the same
time in an atomic operation. There is
+/// currently NO flag to control the atomicity of "delete source at the same
time as creating the target".
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum RenameTargetMode {
+ /// Perform a write operation on the target, overwriting any object
present at the provided path.
+ #[default]
+ Overwrite,
+ /// Perform an atomic write operation on the target, returning
[`Error::AlreadyExists`] if an
+ /// object already exists at the provided path.
+ Create,
+}
+
+/// Options for a rename request
+#[derive(Debug, Clone, Default)]
+pub struct RenameOptions {
+ /// Configure the [`RenameTargetMode`] for this operation
+ pub target_mode: RenameTargetMode,
+ /// Implementation-specific extensions. Intended for use by
[`ObjectStore`] implementations
+ /// that need to pass context-specific information (like tracing spans)
via trait methods.
+ ///
+ /// These extensions are ignored entirely by backends offered through this
crate.
+ ///
+ /// They are also excluded from [`PartialEq`] and [`Eq`].
+ pub extensions: Extensions,
+}
+
+impl RenameOptions {
+ /// Create a new [`RenameOptions`]
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Sets the `target_mode`.
+ ///
+ /// See [`RenameOptions::target_mode`].
+ #[must_use]
+ pub fn with_target_mode(mut self, target_mode: RenameTargetMode) -> Self {
+ self.target_mode = target_mode;
+ self
+ }
+
+ /// Sets the `extensions`.
+ ///
+ /// See [`RenameOptions::extensions`].
+ #[must_use]
+ pub fn with_extensions(mut self, extensions: Extensions) -> Self {
+ self.extensions = extensions;
+ self
+ }
+}
+
+impl PartialEq<Self> for RenameOptions {
+ fn eq(&self, other: &Self) -> bool {
+ let Self {
+ target_mode,
+ extensions: _,
+ } = self;
+ let Self {
+ target_mode: target_mode_other,
+ extensions: _,
+ } = other;
+
+ target_mode == target_mode_other
+ }
+}
+
+impl Eq for RenameOptions {}
+
/// A specialized `Result` for object store-related errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
diff --git a/src/limit.rs b/src/limit.rs
index 7e1e2b4..7fddd63 100644
--- a/src/limit.rs
+++ b/src/limit.rs
@@ -19,8 +19,8 @@
use crate::{
BoxStream, CopyOptions, GetOptions, GetResult, GetResultPayload,
ListResult, MultipartUpload,
- ObjectMeta, ObjectStore, Path, PutMultipartOptions, PutOptions,
PutPayload, PutResult, Result,
- StreamExt, UploadPart,
+ ObjectMeta, ObjectStore, Path, PutMultipartOptions, PutOptions,
PutPayload, PutResult,
+ RenameOptions, Result, StreamExt, UploadPart,
};
use async_trait::async_trait;
use bytes::Bytes;
@@ -156,14 +156,9 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
self.inner.copy_opts(from, to, options).await
}
- async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
+ async fn rename_opts(&self, from: &Path, to: &Path, options:
RenameOptions) -> Result<()> {
let _permit = self.semaphore.acquire().await.unwrap();
- self.inner.rename(from, to).await
- }
-
- async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()>
{
- let _permit = self.semaphore.acquire().await.unwrap();
- self.inner.rename_if_not_exists(from, to).await
+ self.inner.rename_opts(from, to, options).await
}
}
diff --git a/src/local.rs b/src/local.rs
index ebe9527..5b46a7d 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -40,7 +40,7 @@ use crate::{
path::{Path, absolute_path_to_url},
util::InvalidGetRange,
};
-use crate::{CopyMode, CopyOptions};
+use crate::{CopyMode, CopyOptions, RenameOptions, RenameTargetMode};
/// A specialized `Error` for filesystem object store-related errors
#[derive(Debug, thiserror::Error)]
@@ -610,24 +610,52 @@ impl ObjectStore for LocalFileSystem {
}
}
- async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
- let from = self.path_to_filesystem(from)?;
- let to = self.path_to_filesystem(to)?;
- maybe_spawn_blocking(move || {
- loop {
- match std::fs::rename(&from, &to) {
- Ok(_) => return Ok(()),
- Err(source) => match source.kind() {
- ErrorKind::NotFound => match from.exists() {
- true => create_parent_dirs(&to, source)?,
- false => return Err(Error::NotFound { path: from,
source }.into()),
- },
- _ => return Err(Error::UnableToCopyFile { from, to,
source }.into()),
+ async fn rename_opts(&self, from: &Path, to: &Path, options:
RenameOptions) -> Result<()> {
+ let RenameOptions {
+ target_mode,
+ extensions,
+ } = options;
+
+ match target_mode {
+ // optimized implementation
+ RenameTargetMode::Overwrite => {
+ let from = self.path_to_filesystem(from)?;
+ let to = self.path_to_filesystem(to)?;
+ maybe_spawn_blocking(move || {
+ loop {
+ match std::fs::rename(&from, &to) {
+ Ok(_) => return Ok(()),
+ Err(source) => match source.kind() {
+ ErrorKind::NotFound => match from.exists() {
+ true => create_parent_dirs(&to, source)?,
+ false => {
+ return Err(Error::NotFound { path:
from, source }.into());
+ }
+ },
+ _ => {
+ return Err(Error::UnableToCopyFile { from,
to, source }.into());
+ }
+ },
+ }
+ }
+ })
+ .await
+ }
+ // fall-back to copy & delete
+ RenameTargetMode::Create => {
+ self.copy_opts(
+ from,
+ to,
+ CopyOptions {
+ mode: CopyMode::Create,
+ extensions,
},
- }
+ )
+ .await?;
+ self.delete(from).await?;
+ Ok(())
}
- })
- .await
+ }
}
}
diff --git a/src/prefix.rs b/src/prefix.rs
index c37f55d..52173dd 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -23,7 +23,7 @@ use std::ops::Range;
use crate::path::Path;
use crate::{
CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload,
ObjectMeta, ObjectStore,
- PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
+ PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions,
Result,
};
/// Store wrapper that applies a constant prefix to all paths handled by the
store.
@@ -188,16 +188,10 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
self.inner.copy_opts(&full_from, &full_to, options).await
}
- async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
+ async fn rename_opts(&self, from: &Path, to: &Path, options:
RenameOptions) -> Result<()> {
let full_from = self.full_path(from);
let full_to = self.full_path(to);
- self.inner.rename(&full_from, &full_to).await
- }
-
- async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()>
{
- let full_from = self.full_path(from);
- let full_to = self.full_path(to);
- self.inner.rename_if_not_exists(&full_from, &full_to).await
+ self.inner.rename_opts(&full_from, &full_to, options).await
}
}
diff --git a/src/throttle.rs b/src/throttle.rs
index bd5795e..3820608 100644
--- a/src/throttle.rs
+++ b/src/throttle.rs
@@ -21,7 +21,7 @@ use std::ops::Range;
use std::{convert::TryInto, sync::Arc};
use crate::multipart::{MultipartStore, PartId};
-use crate::{CopyOptions, GetOptions, UploadPart};
+use crate::{CopyOptions, GetOptions, RenameOptions, UploadPart};
use crate::{
GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload,
ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, path::Path,
@@ -261,16 +261,10 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
self.inner.copy_opts(from, to, options).await
}
- async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
+ async fn rename_opts(&self, from: &Path, to: &Path, options:
RenameOptions) -> Result<()> {
sleep(self.config().wait_put_per_call).await;
- self.inner.rename(from, to).await
- }
-
- async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()>
{
- sleep(self.config().wait_put_per_call).await;
-
- self.inner.rename_if_not_exists(from, to).await
+ self.inner.rename_opts(from, to, options).await
}
}