This is an automated email from the ASF dual-hosted git repository.
mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new 92b1378 refactor!: move `delete` to `ObjectStoreExt` (#549)
92b1378 is described below
commit 92b13782bda33805c6c0cc0e8d95656f8aa61cd0
Author: Marco Neumann <[email protected]>
AuthorDate: Fri Dec 12 15:05:01 2025 +0100
refactor!: move `delete` to `ObjectStoreExt` (#549)
* refactor!: move `delete` to `ObjectStoreExt`
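A single-object delete is really just a bulk delete with one entry, so `delete` is now provided by `ObjectStoreExt` on top of `delete_stream` instead of being a required `ObjectStore` method.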
Part of #385.
* refactor: improve error messages
Co-authored-by: Andrew Lamb <[email protected]>
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
src/aws/client.rs | 7 ++++++-
src/aws/mod.rs | 5 -----
src/azure/client.rs | 42 +++---------------------------------------
src/azure/credential.rs | 1 -
src/azure/mod.rs | 5 +----
src/chunked.rs | 4 ----
src/gcp/mod.rs | 4 ----
src/http/mod.rs | 4 ----
src/lib.rs | 32 +++++++++++++++++++++-----------
src/limit.rs | 14 ++++++++------
src/local.rs | 10 +---------
src/memory.rs | 5 -----
src/prefix.rs | 5 -----
src/throttle.rs | 11 ++++-------
tests/get_range_file.rs | 4 ----
15 files changed, 44 insertions(+), 109 deletions(-)
diff --git a/src/aws/client.rs b/src/aws/client.rs
index 150a47c..bd9618e 100644
--- a/src/aws/client.rs
+++ b/src/aws/client.rs
@@ -69,6 +69,7 @@ pub(crate) enum Error {
#[error("Error performing DeleteObjects request: {}", source)]
DeleteObjectsRequest {
source: crate::client::retry::RetryError,
+ paths: Vec<String>,
},
#[error(
@@ -127,6 +128,7 @@ impl From<Error> for crate::Error {
fn from(err: Error) -> Self {
match err {
Error::CompleteMultipartRequest { source, path } =>
source.error(STORE, path),
+ Error::DeleteObjectsRequest { source, paths } =>
source.error(STORE, paths.join(",")),
_ => Self::Generic {
store: STORE,
source: Box::new(err),
@@ -551,7 +553,10 @@ impl S3Client {
.with_aws_sigv4(credential.authorizer(), Some(digest.as_ref()))
.send_retry(&self.config.retry_config)
.await
- .map_err(|source| Error::DeleteObjectsRequest { source })?
+ .map_err(|source| Error::DeleteObjectsRequest {
+ source,
+ paths: paths.iter().map(|p| p.to_string()).collect(),
+ })?
.into_body()
.bytes()
.await
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index cb66f9d..dd2cf6f 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -250,11 +250,6 @@ impl ObjectStore for AmazonS3 {
self.client.get_opts(location, options).await
}
- async fn delete(&self, location: &Path) -> Result<()> {
- self.client.request(Method::DELETE, location).send().await?;
- Ok(())
- }
-
fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
diff --git a/src/azure/client.rs b/src/azure/client.rs
index bce2c24..54ab307 100644
--- a/src/azure/client.rs
+++ b/src/azure/client.rs
@@ -74,12 +74,6 @@ pub(crate) enum Error {
path: String,
},
- #[error("Error performing delete request {}: {}", path, source)]
- DeleteRequest {
- source: crate::client::retry::RetryError,
- path: String,
- },
-
#[error("Error performing bulk delete request: {}", source)]
BulkDeleteRequest {
source: crate::client::retry::RetryError,
@@ -150,9 +144,9 @@ pub(crate) enum Error {
impl From<Error> for crate::Error {
fn from(err: Error) -> Self {
match err {
- Error::GetRequest { source, path }
- | Error::DeleteRequest { source, path }
- | Error::PutRequest { source, path } => source.error(STORE, path),
+ Error::GetRequest { source, path } | Error::PutRequest { source,
path } => {
+ source.error(STORE, path)
+ }
_ => Self::Generic {
store: STORE,
source: Box::new(err),
@@ -627,36 +621,6 @@ impl AzureClient {
.map_err(|source| Error::Metadata { source })?)
}
- /// Make an Azure Delete request
<https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob>
- pub(crate) async fn delete_request<T: Serialize + ?Sized + Sync>(
- &self,
- path: &Path,
- query: &T,
- ) -> Result<()> {
- let credential = self.get_credential().await?;
- let url = self.config.path_url(path);
-
- let sensitive = credential
- .as_deref()
- .map(|c| c.sensitive_request())
- .unwrap_or_default();
- self.client
- .delete(url.as_str())
- .query(query)
- .header(&DELETE_SNAPSHOTS, "include")
- .with_azure_authorization(&credential, &self.config.account)
- .retryable(&self.config.retry_config)
- .sensitive(sensitive)
- .send()
- .await
- .map_err(|source| {
- let path = path.as_ref().into();
- Error::DeleteRequest { source, path }
- })?;
-
- Ok(())
- }
-
fn build_bulk_delete_body(
&self,
boundary: &str,
diff --git a/src/azure/credential.rs b/src/azure/credential.rs
index ae630a6..dcc6cdd 100644
--- a/src/azure/credential.rs
+++ b/src/azure/credential.rs
@@ -47,7 +47,6 @@ use url::Url;
static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2023-11-03");
static VERSION: HeaderName = HeaderName::from_static("x-ms-version");
pub(crate) static BLOB_TYPE: HeaderName =
HeaderName::from_static("x-ms-blob-type");
-pub(crate) static DELETE_SNAPSHOTS: HeaderName =
HeaderName::from_static("x-ms-delete-snapshots");
pub(crate) static COPY_SOURCE: HeaderName =
HeaderName::from_static("x-ms-copy-source");
static CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5");
static PARTNER_TOKEN: HeaderName =
HeaderName::from_static("x-ms-partner-token");
diff --git a/src/azure/mod.rs b/src/azure/mod.rs
index d22ffcf..04c8f31 100644
--- a/src/azure/mod.rs
+++ b/src/azure/mod.rs
@@ -117,13 +117,10 @@ impl ObjectStore for MicrosoftAzure {
self.client.get_opts(location, options).await
}
- async fn delete(&self, location: &Path) -> Result<()> {
- self.client.delete_request(location, &()).await
- }
-
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static,
Result<ObjectMeta>> {
self.client.list(prefix)
}
+
fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
diff --git a/src/chunked.rs b/src/chunked.rs
index 49632ed..b362366 100644
--- a/src/chunked.rs
+++ b/src/chunked.rs
@@ -139,10 +139,6 @@ impl ObjectStore for ChunkedStore {
self.inner.get_ranges(location, ranges).await
}
- async fn delete(&self, location: &Path) -> Result<()> {
- self.inner.delete(location).await
- }
-
fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs
index 270b89a..2fb74b4 100644
--- a/src/gcp/mod.rs
+++ b/src/gcp/mod.rs
@@ -181,10 +181,6 @@ impl ObjectStore for GoogleCloudStorage {
self.client.get_opts(location, options).await
}
- async fn delete(&self, location: &Path) -> Result<()> {
- self.client.delete_request(location).await
- }
-
fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
diff --git a/src/http/mod.rs b/src/http/mod.rs
index 673419c..e241002 100644
--- a/src/http/mod.rs
+++ b/src/http/mod.rs
@@ -132,10 +132,6 @@ impl ObjectStore for HttpStore {
self.client.get_opts(location, options).await
}
- async fn delete(&self, location: &Path) -> Result<()> {
- self.client.delete(location).await
- }
-
fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
diff --git a/src/lib.rs b/src/lib.rs
index b0cf542..6ffdf6d 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -853,9 +853,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
.await
}
- /// Delete the object at the specified location.
- async fn delete(&self, location: &Path) -> Result<()>;
-
/// Delete all the objects at the specified locations
///
/// When supported, this method will use bulk operations that delete more
@@ -958,10 +955,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
/// # todo!()
/// # }
/// #
- /// # async fn delete(&self, _: &Path) -> Result<()> {
- /// # todo!()
- /// # }
- /// #
/// fn delete_stream(
/// &self,
/// locations: BoxStream<'static, Result<Path>>,
@@ -1107,10 +1100,6 @@ macro_rules! as_ref_impl {
self.as_ref().get_ranges(location, ranges).await
}
- async fn delete(&self, location: &Path) -> Result<()> {
- self.as_ref().delete(location).await
- }
-
fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
@@ -1250,6 +1239,9 @@ pub trait ObjectStoreExt: ObjectStore {
/// Return the metadata for the specified location
fn head(&self, location: &Path) -> impl Future<Output =
Result<ObjectMeta>>;
+ /// Delete the object at the specified location.
+ fn delete(&self, location: &Path) -> impl Future<Output = Result<()>>;
+
/// Copy an object from one path to another in the same object store.
///
/// If there exists an object at the destination, it will be overwritten.
@@ -1306,6 +1298,24 @@ where
Ok(self.get_opts(location, options).await?.meta)
}
+ async fn delete(&self, location: &Path) -> Result<()> {
+ let location = location.clone();
+ let mut stream =
+ self.delete_stream(futures::stream::once(async move { Ok(location)
}).boxed());
+ let _path = stream.try_next().await?.ok_or_else(|| Error::Generic {
+ store: "ext",
+ source: "`delete_stream` with one location should yield once but
didn't".into(),
+ })?;
+ if stream.next().await.is_some() {
+ Err(Error::Generic {
+ store: "ext",
+ source: "`delete_stream` with one location expected to yield
exactly once, but yielded more than once".into(),
+ })
+ } else {
+ Ok(())
+ }
+ }
+
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
let options = CopyOptions::new().with_mode(CopyMode::Overwrite);
self.copy_opts(from, to, options).await
diff --git a/src/limit.rs b/src/limit.rs
index 7fddd63..30fe2b6 100644
--- a/src/limit.rs
+++ b/src/limit.rs
@@ -105,16 +105,18 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
self.inner.get_ranges(location, ranges).await
}
- async fn delete(&self, location: &Path) -> Result<()> {
- let _permit = self.semaphore.acquire().await.unwrap();
- self.inner.delete(location).await
- }
-
fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
) -> BoxStream<'static, Result<Path>> {
- self.inner.delete_stream(locations)
+ let inner = Arc::clone(&self.inner);
+ let fut = Arc::clone(&self.semaphore)
+ .acquire_owned()
+ .map(move |permit| {
+ let s = inner.delete_stream(locations);
+ PermitWrapper::new(s, permit.unwrap())
+ });
+ fut.into_stream().flatten().boxed()
}
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static,
Result<ObjectMeta>> {
diff --git a/src/local.rs b/src/local.rs
index 5b46a7d..003e7d7 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -40,7 +40,7 @@ use crate::{
path::{Path, absolute_path_to_url},
util::InvalidGetRange,
};
-use crate::{CopyMode, CopyOptions, RenameOptions, RenameTargetMode};
+use crate::{CopyMode, CopyOptions, ObjectStoreExt, RenameOptions,
RenameTargetMode};
/// A specialized `Error` for filesystem object store-related errors
#[derive(Debug, thiserror::Error)]
@@ -444,14 +444,6 @@ impl ObjectStore for LocalFileSystem {
.await
}
- async fn delete(&self, location: &Path) -> Result<()> {
- let config = Arc::clone(&self.config);
- let automatic_cleanup = self.automatic_cleanup;
- let location = location.clone();
- maybe_spawn_blocking(move || Self::delete_location(config,
automatic_cleanup, &location))
- .await
- }
-
fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
diff --git a/src/memory.rs b/src/memory.rs
index 08e41c2..f026907 100644
--- a/src/memory.rs
+++ b/src/memory.rs
@@ -294,11 +294,6 @@ impl ObjectStore for InMemory {
.collect()
}
- async fn delete(&self, location: &Path) -> Result<()> {
- self.storage.write().map.remove(location);
- Ok(())
- }
-
fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
diff --git a/src/prefix.rs b/src/prefix.rs
index 52173dd..cecf03f 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -124,11 +124,6 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
self.inner.get_ranges(&full_path, ranges).await
}
- async fn delete(&self, location: &Path) -> Result<()> {
- let full_path = self.full_path(location);
- self.inner.delete(&full_path).await
- }
-
fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
diff --git a/src/throttle.rs b/src/throttle.rs
index 3820608..1fc90d7 100644
--- a/src/throttle.rs
+++ b/src/throttle.rs
@@ -34,10 +34,13 @@ use std::time::Duration;
/// Configuration settings for throttled store
#[derive(Debug, Default, Clone, Copy)]
pub struct ThrottleConfig {
- /// Sleep duration for every call to [`delete`](ThrottledStore::delete).
+ /// Sleep duration for every call to [`delete`], or every element in
[`delete_stream`].
///
/// Sleeping is done before the underlying store is called and
independently of the success of
/// the operation.
+ ///
+ /// [`delete`]: crate::ObjectStoreExt::delete
+ /// [`delete_stream`]: ThrottledStore::delete_stream
pub wait_delete_per_call: Duration,
/// Sleep duration for every byte received during
[`get_opts`](ThrottledStore::get_opts).
@@ -193,12 +196,6 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
self.inner.get_ranges(location, ranges).await
}
- async fn delete(&self, location: &Path) -> Result<()> {
- sleep(self.config().wait_delete_per_call).await;
-
- self.inner.delete(location).await
- }
-
fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
diff --git a/tests/get_range_file.rs b/tests/get_range_file.rs
index df0a414..95027eb 100644
--- a/tests/get_range_file.rs
+++ b/tests/get_range_file.rs
@@ -58,10 +58,6 @@ impl ObjectStore for MyStore {
self.0.get_opts(location, options).await
}
- async fn delete(&self, _: &Path) -> Result<()> {
- todo!()
- }
-
fn delete_stream(
&self,
_: BoxStream<'static, Result<Path>>,