This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new 01a1c51  feat: impl MultipartStore for PrefixStore (#587)
01a1c51 is described below

commit 01a1c518372d78f56d3768fa5cc46e108b1b1758
Author: Xin Sun <[email protected]>
AuthorDate: Sat Jan 10 02:35:50 2026 +0800

    feat: impl MultipartStore for PrefixStore (#587)
    
    Change-Id: I2d8ae7970cc77cfde3d1085dabafbb30332b2779
---
 src/prefix.rs | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 50 insertions(+), 5 deletions(-)

diff --git a/src/prefix.rs b/src/prefix.rs
index cecf03f..67456fd 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -20,26 +20,27 @@ use bytes::Bytes;
 use futures::{StreamExt, TryStreamExt, stream::BoxStream};
 use std::ops::Range;
 
+use crate::multipart::{MultipartStore, PartId};
 use crate::path::Path;
 use crate::{
-    CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
-    PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions, Result,
+    CopyOptions, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta,
+    ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions, Result,
 };
 
 /// Store wrapper that applies a constant prefix to all paths handled by the store.
 #[derive(Debug, Clone)]
-pub struct PrefixStore<T: ObjectStore> {
+pub struct PrefixStore<T> {
     prefix: Path,
     inner: T,
 }
 
-impl<T: ObjectStore> std::fmt::Display for PrefixStore<T> {
+impl<T> std::fmt::Display for PrefixStore<T> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(f, "PrefixObjectStore({})", self.prefix.as_ref())
     }
 }
 
-impl<T: ObjectStore> PrefixStore<T> {
+impl<T> PrefixStore<T> {
     /// Create a new instance of [`PrefixStore`]
     pub fn new(store: T, prefix: impl Into<Path>) -> Self {
         Self {
@@ -190,6 +191,40 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
     }
 }
 
+#[async_trait::async_trait]
+impl<T: MultipartStore> MultipartStore for PrefixStore<T> {
+    async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
+        let full_path = self.full_path(path);
+        self.inner.create_multipart(&full_path).await
+    }
+
+    async fn put_part(
+        &self,
+        path: &Path,
+        id: &MultipartId,
+        part_idx: usize,
+        data: PutPayload,
+    ) -> Result<PartId> {
+        let full_path = self.full_path(path);
+        self.inner.put_part(&full_path, id, part_idx, data).await
+    }
+
+    async fn complete_multipart(
+        &self,
+        path: &Path,
+        id: &MultipartId,
+        parts: Vec<PartId>,
+    ) -> Result<PutResult> {
+        let full_path = self.full_path(path);
+        self.inner.complete_multipart(&full_path, id, parts).await
+    }
+
+    async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
+        let full_path = self.full_path(path);
+        self.inner.abort_multipart(&full_path, id).await
+    }
+}
+
 #[cfg(not(target_arch = "wasm32"))]
 #[cfg(test)]
 mod tests {
@@ -197,6 +232,7 @@ mod tests {
 
     use super::*;
     use crate::local::LocalFileSystem;
+    use crate::memory::InMemory;
     use crate::{ObjectStoreExt, integration::*};
 
     use tempfile::TempDir;
@@ -262,4 +298,13 @@ mod tests {
         let read_data = local.get(&location).await.unwrap().bytes().await.unwrap();
         assert_eq!(&*read_data, data)
     }
+
+    #[tokio::test]
+    async fn prefix_multipart() {
+        let store = PrefixStore::new(InMemory::new(), "prefix");
+
+        multipart(&store, &store).await;
+        multipart_out_of_order(&store).await;
+        multipart_race_condition(&store, true).await;
+    }
 }

Reply via email to