This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new 01a1c51 feat: impl MultipartStore for PrefixStore (#587)
01a1c51 is described below
commit 01a1c518372d78f56d3768fa5cc46e108b1b1758
Author: Xin Sun <[email protected]>
AuthorDate: Sat Jan 10 02:35:50 2026 +0800
feat: impl MultipartStore for PrefixStore (#587)
Change-Id: I2d8ae7970cc77cfde3d1085dabafbb30332b2779
---
src/prefix.rs | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++-----
1 file changed, 50 insertions(+), 5 deletions(-)
diff --git a/src/prefix.rs b/src/prefix.rs
index cecf03f..67456fd 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -20,26 +20,27 @@ use bytes::Bytes;
use futures::{StreamExt, TryStreamExt, stream::BoxStream};
use std::ops::Range;
+use crate::multipart::{MultipartStore, PartId};
use crate::path::Path;
use crate::{
- CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
- PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions, Result,
+ CopyOptions, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta,
+ ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions, Result,
};
/// Store wrapper that applies a constant prefix to all paths handled by the store.
#[derive(Debug, Clone)]
-pub struct PrefixStore<T: ObjectStore> {
+pub struct PrefixStore<T> {
prefix: Path,
inner: T,
}
-impl<T: ObjectStore> std::fmt::Display for PrefixStore<T> {
+impl<T> std::fmt::Display for PrefixStore<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "PrefixObjectStore({})", self.prefix.as_ref())
}
}
-impl<T: ObjectStore> PrefixStore<T> {
+impl<T> PrefixStore<T> {
/// Create a new instance of [`PrefixStore`]
pub fn new(store: T, prefix: impl Into<Path>) -> Self {
Self {
@@ -190,6 +191,40 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
}
}
+#[async_trait::async_trait]
+impl<T: MultipartStore> MultipartStore for PrefixStore<T> {
+ async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
+ let full_path = self.full_path(path);
+ self.inner.create_multipart(&full_path).await
+ }
+
+ async fn put_part(
+ &self,
+ path: &Path,
+ id: &MultipartId,
+ part_idx: usize,
+ data: PutPayload,
+ ) -> Result<PartId> {
+ let full_path = self.full_path(path);
+ self.inner.put_part(&full_path, id, part_idx, data).await
+ }
+
+ async fn complete_multipart(
+ &self,
+ path: &Path,
+ id: &MultipartId,
+ parts: Vec<PartId>,
+ ) -> Result<PutResult> {
+ let full_path = self.full_path(path);
+ self.inner.complete_multipart(&full_path, id, parts).await
+ }
+
+ async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
+ let full_path = self.full_path(path);
+ self.inner.abort_multipart(&full_path, id).await
+ }
+}
+
#[cfg(not(target_arch = "wasm32"))]
#[cfg(test)]
mod tests {
@@ -197,6 +232,7 @@ mod tests {
use super::*;
use crate::local::LocalFileSystem;
+ use crate::memory::InMemory;
use crate::{ObjectStoreExt, integration::*};
use tempfile::TempDir;
@@ -262,4 +298,13 @@ mod tests {
let read_data = local.get(&location).await.unwrap().bytes().await.unwrap();
assert_eq!(&*read_data, data)
}
+
+ #[tokio::test]
+ async fn prefix_multipart() {
+ let store = PrefixStore::new(InMemory::new(), "prefix");
+
+ multipart(&store, &store).await;
+ multipart_out_of_order(&store).await;
+ multipart_race_condition(&store, true).await;
+ }
}