Plenitude-ai opened a new issue, #46517:
URL: https://github.com/apache/arrow/issues/46517

   ### Describe the enhancement requested
   
   **TLDR:**
   Writing a partitioned dataset with `existing_data_behavior="delete_matching"` leads to lots of directory read/creation collisions between writer threads. This can be worked around by bumping `max_attempts` on the retry strategy, e.g. `pyarrow.fs.AwsDefaultS3RetryStrategy(max_attempts=pyarrow.cpu_count())` as in my configuration below. The workaround might be improved by adding a new `retry_delay` parameter or other tenacity-like parameters to the [S3RetryStrategy](https://github.com/apache/arrow/blob/main/python/pyarrow/_s3fs.pyx#L110) class.
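   
   A minimal sketch of the workaround (credentials and `endpoint_override` omitted here; my full configuration is further down):
   ```python
   import pyarrow
   import pyarrow.fs

   # Workaround: raise max_attempts so that the concurrent partition-directory
   # creations eventually succeed despite the 409 OperationAborted conflicts.
   # (access_key / secret_key / endpoint_override omitted in this sketch.)
   s3 = pyarrow.fs.S3FileSystem(
       retry_strategy=pyarrow.fs.AwsDefaultS3RetryStrategy(
           max_attempts=pyarrow.cpu_count()
       ),
   )
   ```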
   
   **Context:**
   I am trying to push a dataset to S3, partitioned on several fields. I need the parameter `existing_data_behavior="delete_matching"` so that I can rerun the pipeline with different parameters on the same data and be sure it actually overwrites the existing data instead of appending to it.
   This combination leads to a lot of collisions, which I think come from multi-threaded writing:
   ```sh
   2025-05-20 14:00:51.527 | ERROR    | src.data_utils:_write_parquet_with_retry:93 -
   Write attempt (s3://<REDACTED>/limit=-1/date_end=2025-03-09/days_range=1) failed:
   When creating key '<REDACTED>/limit=-1/date_end=2025-03-09/' in bucket '<REDACTED>':
   AWS Error UNKNOWN (HTTP status 409) during PutObject operation: Unable to parse
   ExceptionName: OperationAborted Message: A conflicting conditional operation is currently in progress against this resource. Please try again.
   ```
   And here is my configuration:
   ```python
   import polars as pl
   import pyarrow
   import pyarrow.fs       # S3FileSystem and retry strategies
   import pyarrow.parquet  # write_to_dataset
   import tenacity
   from loguru import logger
   
   def write_dataset(
       df: pl.DataFrame,
       output_path: str,
       partition_cols: list[str] = [],
       existing_data_behavior: str = "delete_matching",
       *,
       max_retries: int = 5,
       retry_delay: int = 1,
   ):
       # AWS_* credentials, endpoint and region are defined elsewhere (redacted here)
       PA_FS_CONFIG_ = {
           "access_key": AWS_ACCESS_KEY_ID,
           "secret_key": AWS_SECRET_ACCESS_KEY,
           "endpoint_override": AWS_ENDPOINT_URL,
           "region": AWS_REGION,
           "check_directory_existence_before_creation": True,
           "connect_timeout": 1,
           "request_timeout": 3,
           "retry_strategy": pyarrow.fs.AwsDefaultS3RetryStrategy(
               max_attempts=max(pyarrow.cpu_count(), 10)
           ),
           "default_metadata": {"ACL": "private"},
       }
       pa_fs: pyarrow.fs.FileSystem = (
           pyarrow.fs.S3FileSystem(**PA_FS_CONFIG_)
           if output_path.startswith("s3://")
           else pyarrow.fs.LocalFileSystem()
       )
       logger.info(f"Writing dataset to {output_path}...")
   
       # Helper function to write with retries
       @tenacity.retry(
           stop=tenacity.stop_after_attempt(max_retries),
           wait=tenacity.wait_exponential(
               multiplier=1, min=1, max=retry_delay * 2),
           retry=tenacity.retry_if_exception_type((OSError, IOError)),
           reraise=True,
       )
       def _write_parquet_with_retry():
           try:
               pyarrow.parquet.write_to_dataset(
                   table=df.to_arrow(),
                   root_path=output_path.removeprefix("s3://"),
                   partitioning=partition_cols,
                   partitioning_flavor="hive",
                   existing_data_behavior=existing_data_behavior,
                   filesystem=pa_fs,
                   use_threads=True,
               )
           except Exception as e:
               logger.error(f"Write attempt ({output_path}) failed: {str(e)}")
               raise
   
       _write_parquet_with_retry()
       logger.info(f"Dataset written to {output_path} ({df.height} rows)")
   ```
   
   For the moment, there is only a `max_attempts` parameter in [S3RetryStrategy](https://github.com/apache/arrow/blob/main/python/pyarrow/_s3fs.pyx#L110). I imagine that adding tenacity-like parameters (a retry delay, a backoff factor) would allow for an easier retrying approach.
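   
   To make that concrete, here is a purely hypothetical sketch of what I have in mind; `retry_delay` and `backoff_multiplier` do not exist in pyarrow today, they are the parameters being proposed (shown commented out so the snippet still runs against the current API):
   ```python
   import pyarrow.fs

   # Hypothetical / proposed API, not current pyarrow: tenacity-like knobs on the
   # retry strategy so the filesystem itself backs off between attempts.
   retry = pyarrow.fs.AwsDefaultS3RetryStrategy(
       max_attempts=10,
       # retry_delay=1,         # proposed: initial delay between attempts, seconds
       # backoff_multiplier=2,  # proposed: exponential backoff factor
   )
   ```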
   
   I am not an expert in pyarrow, and there may very well be a better way to circumvent this problem than just spamming retries, even with native exponential backoff. But going through the documentation, this is all I could come up with, hence this feature proposal. Please feel free to advise a completely different approach!
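   
   For example, one alternative I considered (a rough sketch only; I am not sure it fully avoids the 409s on OpenIO) is to pre-create the hive partition directories on a single thread before the parallel write, so `write_to_dataset` never has to create them concurrently. It reuses `pa_fs`, `df`, `partition_cols` and `output_path` from the snippet above:
   ```python
   # Rough sketch: create each partition directory sequentially before the
   # threaded write, so the writer threads only upload files into existing keys.
   # Reuses pa_fs, df, partition_cols and output_path from the config above.
   base = output_path.removeprefix("s3://")
   for row in df.select(partition_cols).unique().iter_rows(named=True):
       partition_dir = "/".join([base] + [f"{key}={value}" for key, value in row.items()])
       pa_fs.create_dir(partition_dir, recursive=True)
   ```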
   
   For further information, I am running this (over)write workflow on OVH cloud, which provides an OpenIO S3-compatible backend, so this is not native AWS S3.
   
   ### Component(s)
   
   Python

