This is an automated email from the ASF dual-hosted git repository.

mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new e00ff79   refactor!: `rename` & `rename_if_not_exists` => 
`rename_opts` (#555)
e00ff79 is described below

commit e00ff79c03a6b9e017c7b6d905d1bab6a15b3923
Author: Marco Neumann <[email protected]>
AuthorDate: Thu Dec 4 13:42:38 2025 +0100

     refactor!: `rename` & `rename_if_not_exists` => `rename_opts` (#555)
    
    Change the `ObjectStore` core trait to have a single, extensible rename
    operation. This helps #385 and #297.
    
    Also adds extensions similar to
    apache/arrow-rs#7170
    and
    apache/arrow-rs#7213 .
    
    Also see #548 -- which did something similar for `copy`.
---
 src/chunked.rs  |  10 ++---
 src/lib.rs      | 135 ++++++++++++++++++++++++++++++++++++++++++++++++--------
 src/limit.rs    |  13 ++----
 src/local.rs    |  62 +++++++++++++++++++-------
 src/prefix.rs   |  12 ++---
 src/throttle.rs |  12 ++---
 6 files changed, 174 insertions(+), 70 deletions(-)

diff --git a/src/chunked.rs b/src/chunked.rs
index 53dbf1e..49632ed 100644
--- a/src/chunked.rs
+++ b/src/chunked.rs
@@ -29,7 +29,7 @@ use futures::stream::BoxStream;
 use crate::path::Path;
 use crate::{
     CopyOptions, GetOptions, GetResult, GetResultPayload, ListResult, 
MultipartUpload, ObjectMeta,
-    ObjectStore, PutMultipartOptions, PutOptions, PutResult,
+    ObjectStore, PutMultipartOptions, PutOptions, PutResult, RenameOptions,
 };
 use crate::{PutPayload, Result};
 
@@ -170,12 +170,8 @@ impl ObjectStore for ChunkedStore {
         self.inner.copy_opts(from, to, options).await
     }
 
-    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
-        self.inner.rename(from, to).await
-    }
-
-    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> 
{
-        self.inner.rename_if_not_exists(from, to).await
+    async fn rename_opts(&self, from: &Path, to: &Path, options: 
RenameOptions) -> Result<()> {
+        self.inner.rename_opts(from, to, options).await
     }
 }
 
diff --git a/src/lib.rs b/src/lib.rs
index 736f173..b49f910 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1035,19 +1035,22 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync 
+ Debug + 'static {
     ///
     /// By default, this is implemented as a copy and then delete source. It 
may not
     /// check when deleting source that it was the same object that was 
originally copied.
-    ///
-    /// If there exists an object at the destination, it will be overwritten.
-    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
-        self.copy(from, to).await?;
-        self.delete(from).await
-    }
-
-    /// Move an object from one path to another in the same object store.
-    ///
-    /// Will return an error if the destination already has an object.
-    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> 
{
-        self.copy_if_not_exists(from, to).await?;
-        self.delete(from).await
+    async fn rename_opts(&self, from: &Path, to: &Path, options: 
RenameOptions) -> Result<()> {
+        let RenameOptions {
+            target_mode,
+            extensions,
+        } = options;
+        let copy_mode = match target_mode {
+            RenameTargetMode::Overwrite => CopyMode::Overwrite,
+            RenameTargetMode::Create => CopyMode::Create,
+        };
+        let copy_options = CopyOptions {
+            mode: copy_mode,
+            extensions,
+        };
+        self.copy_opts(from, to, copy_options).await?;
+        self.delete(from).await?;
+        Ok(())
     }
 }
 
@@ -1116,12 +1119,13 @@ macro_rules! as_ref_impl {
                 self.as_ref().copy_opts(from, to, options).await
             }
 
-            async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
-                self.as_ref().rename(from, to).await
-            }
-
-            async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> 
Result<()> {
-                self.as_ref().rename_if_not_exists(from, to).await
+            async fn rename_opts(
+                &self,
+                from: &Path,
+                to: &Path,
+                options: RenameOptions,
+            ) -> Result<()> {
+                self.as_ref().rename_opts(from, to, options).await
             }
         }
     };
@@ -1238,6 +1242,19 @@ pub trait ObjectStoreExt: ObjectStore {
     /// If atomic operations are not supported by the underlying object 
storage (like S3)
     /// it will return an error.
     fn copy_if_not_exists(&self, from: &Path, to: &Path) -> impl Future<Output 
= Result<()>>;
+
+    /// Move an object from one path to another in the same object store.
+    ///
+    /// By default, this is implemented as a copy and then delete source. It 
may not
+    /// check when deleting source that it was the same object that was 
originally copied.
+    ///
+    /// If there exists an object at the destination, it will be overwritten.
+    fn rename(&self, from: &Path, to: &Path) -> impl Future<Output = 
Result<()>>;
+
+    /// Move an object from one path to another in the same object store.
+    ///
+    /// Will return an error if the destination already has an object.
+    fn rename_if_not_exists(&self, from: &Path, to: &Path) -> impl 
Future<Output = Result<()>>;
 }
 
 impl<T> ObjectStoreExt for T
@@ -1277,6 +1294,16 @@ where
         let options = CopyOptions::new().with_mode(CopyMode::Create);
         self.copy_opts(from, to, options).await
     }
+
+    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
+        let options = 
RenameOptions::new().with_target_mode(RenameTargetMode::Overwrite);
+        self.rename_opts(from, to, options).await
+    }
+
+    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> 
{
+        let options = 
RenameOptions::new().with_target_mode(RenameTargetMode::Create);
+        self.rename_opts(from, to, options).await
+    }
 }
 
 /// Result of a list call that includes objects, prefixes (directories) and a
@@ -1828,6 +1855,76 @@ impl PartialEq<Self> for CopyOptions {
 
 impl Eq for CopyOptions {}
 
+/// Configure preconditions for the target of a rename operation.
+///
+/// Note though that the source location may or may not be deleted at the same 
time in an atomic operation. There is
+/// currently NO flag to control the atomicity of "delete source at the same 
time as creating the target".
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum RenameTargetMode {
+    /// Perform a write operation on the target, overwriting any object 
present at the provided path.
+    #[default]
+    Overwrite,
+    /// Perform an atomic write operation of the target, returning 
[`Error::AlreadyExists`] if an
+    /// object already exists at the provided path.
+    Create,
+}
+
+/// Options for a rename request
+#[derive(Debug, Clone, Default)]
+pub struct RenameOptions {
+    /// Configure the [`RenameTargetMode`] for this operation
+    pub target_mode: RenameTargetMode,
+    /// Implementation-specific extensions. Intended for use by 
[`ObjectStore`] implementations
+    /// that need to pass context-specific information (like tracing spans) 
via trait methods.
+    ///
+    /// These extensions are ignored entirely by backends offered through this 
crate.
+    ///
+    /// They are also excluded from [`PartialEq`] and [`Eq`].
+    pub extensions: Extensions,
+}
+
+impl RenameOptions {
+    /// Create a new [`RenameOptions`]
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Sets the `target_mode`.
+    ///
+    /// See [`RenameOptions::target_mode`].
+    #[must_use]
+    pub fn with_target_mode(mut self, target_mode: RenameTargetMode) -> Self {
+        self.target_mode = target_mode;
+        self
+    }
+
+    /// Sets the `extensions`.
+    ///
+    /// See [`RenameOptions::extensions`].
+    #[must_use]
+    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
+        self.extensions = extensions;
+        self
+    }
+}
+
+impl PartialEq<Self> for RenameOptions {
+    fn eq(&self, other: &Self) -> bool {
+        let Self {
+            target_mode,
+            extensions: _,
+        } = self;
+        let Self {
+            target_mode: target_mode_other,
+            extensions: _,
+        } = other;
+
+        target_mode == target_mode_other
+    }
+}
+
+impl Eq for RenameOptions {}
+
 /// A specialized `Result` for object store-related errors
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
diff --git a/src/limit.rs b/src/limit.rs
index 7e1e2b4..7fddd63 100644
--- a/src/limit.rs
+++ b/src/limit.rs
@@ -19,8 +19,8 @@
 
 use crate::{
     BoxStream, CopyOptions, GetOptions, GetResult, GetResultPayload, 
ListResult, MultipartUpload,
-    ObjectMeta, ObjectStore, Path, PutMultipartOptions, PutOptions, 
PutPayload, PutResult, Result,
-    StreamExt, UploadPart,
+    ObjectMeta, ObjectStore, Path, PutMultipartOptions, PutOptions, 
PutPayload, PutResult,
+    RenameOptions, Result, StreamExt, UploadPart,
 };
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -156,14 +156,9 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
         self.inner.copy_opts(from, to, options).await
     }
 
-    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
+    async fn rename_opts(&self, from: &Path, to: &Path, options: 
RenameOptions) -> Result<()> {
         let _permit = self.semaphore.acquire().await.unwrap();
-        self.inner.rename(from, to).await
-    }
-
-    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> 
{
-        let _permit = self.semaphore.acquire().await.unwrap();
-        self.inner.rename_if_not_exists(from, to).await
+        self.inner.rename_opts(from, to, options).await
     }
 }
 
diff --git a/src/local.rs b/src/local.rs
index ebe9527..5b46a7d 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -40,7 +40,7 @@ use crate::{
     path::{Path, absolute_path_to_url},
     util::InvalidGetRange,
 };
-use crate::{CopyMode, CopyOptions};
+use crate::{CopyMode, CopyOptions, RenameOptions, RenameTargetMode};
 
 /// A specialized `Error` for filesystem object store-related errors
 #[derive(Debug, thiserror::Error)]
@@ -610,24 +610,52 @@ impl ObjectStore for LocalFileSystem {
         }
     }
 
-    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
-        let from = self.path_to_filesystem(from)?;
-        let to = self.path_to_filesystem(to)?;
-        maybe_spawn_blocking(move || {
-            loop {
-                match std::fs::rename(&from, &to) {
-                    Ok(_) => return Ok(()),
-                    Err(source) => match source.kind() {
-                        ErrorKind::NotFound => match from.exists() {
-                            true => create_parent_dirs(&to, source)?,
-                            false => return Err(Error::NotFound { path: from, 
source }.into()),
-                        },
-                        _ => return Err(Error::UnableToCopyFile { from, to, 
source }.into()),
+    async fn rename_opts(&self, from: &Path, to: &Path, options: 
RenameOptions) -> Result<()> {
+        let RenameOptions {
+            target_mode,
+            extensions,
+        } = options;
+
+        match target_mode {
+            // optimized implementation
+            RenameTargetMode::Overwrite => {
+                let from = self.path_to_filesystem(from)?;
+                let to = self.path_to_filesystem(to)?;
+                maybe_spawn_blocking(move || {
+                    loop {
+                        match std::fs::rename(&from, &to) {
+                            Ok(_) => return Ok(()),
+                            Err(source) => match source.kind() {
+                                ErrorKind::NotFound => match from.exists() {
+                                    true => create_parent_dirs(&to, source)?,
+                                    false => {
+                                        return Err(Error::NotFound { path: 
from, source }.into());
+                                    }
+                                },
+                                _ => {
+                                    return Err(Error::UnableToCopyFile { from, 
to, source }.into());
+                                }
+                            },
+                        }
+                    }
+                })
+                .await
+            }
+            // fall-back to copy & delete
+            RenameTargetMode::Create => {
+                self.copy_opts(
+                    from,
+                    to,
+                    CopyOptions {
+                        mode: CopyMode::Create,
+                        extensions,
                     },
-                }
+                )
+                .await?;
+                self.delete(from).await?;
+                Ok(())
             }
-        })
-        .await
+        }
     }
 }
 
diff --git a/src/prefix.rs b/src/prefix.rs
index c37f55d..52173dd 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -23,7 +23,7 @@ use std::ops::Range;
 use crate::path::Path;
 use crate::{
     CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, 
ObjectMeta, ObjectStore,
-    PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
+    PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions, 
Result,
 };
 
 /// Store wrapper that applies a constant prefix to all paths handled by the 
store.
@@ -188,16 +188,10 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
         self.inner.copy_opts(&full_from, &full_to, options).await
     }
 
-    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
+    async fn rename_opts(&self, from: &Path, to: &Path, options: 
RenameOptions) -> Result<()> {
         let full_from = self.full_path(from);
         let full_to = self.full_path(to);
-        self.inner.rename(&full_from, &full_to).await
-    }
-
-    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> 
{
-        let full_from = self.full_path(from);
-        let full_to = self.full_path(to);
-        self.inner.rename_if_not_exists(&full_from, &full_to).await
+        self.inner.rename_opts(&full_from, &full_to, options).await
     }
 }
 
diff --git a/src/throttle.rs b/src/throttle.rs
index bd5795e..3820608 100644
--- a/src/throttle.rs
+++ b/src/throttle.rs
@@ -21,7 +21,7 @@ use std::ops::Range;
 use std::{convert::TryInto, sync::Arc};
 
 use crate::multipart::{MultipartStore, PartId};
-use crate::{CopyOptions, GetOptions, UploadPart};
+use crate::{CopyOptions, GetOptions, RenameOptions, UploadPart};
 use crate::{
     GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, 
ObjectMeta, ObjectStore,
     PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, path::Path,
@@ -261,16 +261,10 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
         self.inner.copy_opts(from, to, options).await
     }
 
-    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
+    async fn rename_opts(&self, from: &Path, to: &Path, options: 
RenameOptions) -> Result<()> {
         sleep(self.config().wait_put_per_call).await;
 
-        self.inner.rename(from, to).await
-    }
-
-    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> 
{
-        sleep(self.config().wait_put_per_call).await;
-
-        self.inner.rename_if_not_exists(from, to).await
+        self.inner.rename_opts(from, to, options).await
     }
 }
 

Reply via email to