bikeshedder opened a new issue, #1084:
URL: https://github.com/apache/iceberg-python/issues/1084

   ### Apache Iceberg version
   
   None
   
   ### Please describe the bug 🐞
   
   ## Summary
   
   I'm currently trying to migrate a couple of dataframes with a custom hive-like storage scheme to Iceberg. After a lot of fiddling I managed to load the dataframes from Azure storage, create the table in the Iceberg catalog (currently using SQLite + local fs), and append fragments from the Parquet dataset. But as soon as I add a thread pool, I always run into concurrency issues.
   
   ## Errors
   
   I get either of the following two error messages:
   
   ```
   CommitFailedException: Requirement failed: branch main has changed: expected id 7548527194257629329, found 8136001929437813453
   ```
   or
   ```
   CommitFailedException: Requirement failed: branch main was created concurrently
   ```
   
   ## Sources
   
   I use `Dataset.get_fragments` and insert the data into an Iceberg table with identical partitioning.
   
   I can work around this error by using a GIL (global iceberg lock, pun intended), which is just a `threading.Lock()` that ensures every `load_table()` + `table.append()` pair happens atomically. But that kills almost all of the performance gains a thread pool could provide. I also plan on using this in some Celery runners, and since those run in separate processes, a `threading.Lock()` is not an option in the future anyway. A minimal sketch of this workaround follows below.
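
   Roughly, the workaround looks like this (a minimal sketch; `ICEBERG_LOCK` and `append_fragment_serialized` are illustrative names, not part of the script below):

   ```python
   import threading

   # Global "iceberg lock": only one load_table() + append() pair may run
   # at a time, so commits never race against each other.
   ICEBERG_LOCK = threading.Lock()


   def append_fragment_serialized(catalog, table_identifier, fragment_table):
       with ICEBERG_LOCK:
           # Re-read the latest table metadata and commit while holding
           # the lock. This avoids the CommitFailedException, but it also
           # serializes every append, so the thread pool buys almost
           # nothing. And it only works within a single process.
           table = catalog.load_table(table_identifier)
           table.append(fragment_table)
   ```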
   
   <details> 
     <summary>azure_import.py</summary>
   
   ```python
   #!/bin/env -S poetry run python
   
   from concurrent.futures import ThreadPoolExecutor, as_completed
   
   import pyarrow as pa
   import pyarrow.dataset as pd
   from adlfs import AzureBlobFileSystem
   from azure.identity import DefaultAzureCredential
   from azure.storage.blob import BlobServiceClient
   from pyarrow.dataset import HivePartitioning
   from pyiceberg.catalog import Catalog
   from pyiceberg.catalog.sql import SqlCatalog
   from pyiceberg.io.pyarrow import pyarrow_to_schema
   from pyiceberg.partitioning import PartitionField, PartitionSpec
   from pyiceberg.table.name_mapping import MappedField, NameMapping
   from pyiceberg.transforms import IdentityTransform
   
   import settings
   
   
   class AzureStorage:
       def __init__(self):
           credential = DefaultAzureCredential()
           blob_service_client = BlobServiceClient(
               settings.AZURE_BLOB_URL, credential
           )
           self.container_client = blob_service_client.get_container_client(
               settings.AZURE_BLOB_CONTAINER
           )
           # The AzureBlobFileSystem doesn't cleanly shutdown and currently
           # always raises an exception at the end of this program. See:
           # https://github.com/fsspec/adlfs/issues/431
           self.abfs = AzureBlobFileSystem(
               account_name=settings.AZURE_BLOB_ACCOUNT_NAME,
               credential=credential,
           )
   
       def list_tables(self):
           for blob_prefix in self.container_client.walk_blobs(
               settings.AZURE_LIVE_PATH, delimiter="/"
           ):
               # walk_blobs() yields BlobPrefix objects for the virtual
               # directories; extract the bare table name from the prefix.
               yield blob_prefix.name.rstrip("/").rsplit("/", 1)[-1]
   
       def load_dataset(self, table_name) -> pd.Dataset:
           name = "/".join((settings.AZURE_LIVE_PATH.rstrip("/"), table_name))
           dataset = pd.dataset(
               "/".join([settings.AZURE_LIVE_CONTAINER, name]),
               format="parquet",
               filesystem=self.abfs,
               partitioning=HivePartitioning(
                   pa.schema(
                       [
                           ("dataset", pa.string()),
                           ("flavor", pa.string()),
                       ]
                   )
               ),
           )
           return dataset
   
   
   def create_iceberg_catalog():
       catalog = SqlCatalog(
           "default",
           **{
               "uri": settings.ICEBERG_DATABASE_URI,
               "warehouse": settings.ICEBERG_WAREHOUSE,
           },
       )
       return catalog
   
   
   def download_table(catalog: Catalog, table_name: str, ds: pd.Dataset):
       name_mapping = NameMapping(
           root=[
               MappedField(field_id=field_id, names=[field.name])
               for field_id, field in enumerate(ds.schema, 1)
           ]
       )
       schema = pyarrow_to_schema(ds.schema, name_mapping=name_mapping)
       assert isinstance(ds.partitioning, HivePartitioning), ds.partitioning
       partitioning_spec = PartitionSpec(
           *(
               PartitionField(
                   source_id=name_mapping.find(field.name).field_id,
                   field_id=-1,
                   transform=IdentityTransform(),
                   name=field.name,
               )
               for field in ds.partitioning.schema
           )
       )
       table = catalog.create_table(
           f"{settings.ICEBERG_NAMESPACE}.{table_name}",
           schema=schema,
           partition_spec=partitioning_spec,
       )
       fragments = list(ds.get_fragments())
       with ThreadPoolExecutor(8) as executor:
           futures = [
               executor.submit(
                   download_fragment,
                   table.identifier,
                   fragment,
               )
               for fragment in fragments
           ]
           for future in as_completed(futures):
               try:
                   future.result()
               except Exception as e:
                   executor.shutdown(wait=False, cancel_futures=True)
                   raise e from None
   
   
   def download_fragment(
       table_identifier: tuple[str, ...],
       fragment: pd.Fragment,
   ):
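       # Each task builds its own catalog (and database connection)
       # instead of sharing one instance across threads.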
       catalog = create_iceberg_catalog()
       partition_keys = pd.get_partition_keys(fragment.partition_expression)
       fragment_table = fragment.to_table()
       for k, v in partition_keys.items():
           fragment_table = fragment_table.append_column(
               pa.field(k, pa.string(), nullable=False),
               pa.repeat(pa.scalar(v), fragment_table.num_rows),
           )
       table = catalog.load_table(table_identifier)
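       # Each append() is an optimistic read-modify-write commit on the
       # table metadata; concurrent commits from other threads race here
       # and all but one fail with CommitFailedException.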
       table.append(fragment_table)
   
   
   def import_data(storage: AzureStorage, catalog, table_name):
       dataset = storage.load_dataset(table_name)
       download_table(catalog, table_name, dataset)
   
   
   def main():
       catalog = create_iceberg_catalog()
       catalog.create_namespace_if_not_exists(settings.ICEBERG_NAMESPACE)
       storage = AzureStorage()
       for table_name in storage.list_tables():
           import_data(storage, catalog, table_name)
   
   
   if __name__ == "__main__":
       main()
   
   ```
   
   </details>
   
   <details> 
     <summary>pyproject.toml</summary>
   
   ```toml
   [tool.poetry]
   name = "iceberg-azure-importer"
   version = "0.1.0"
   description = ""
   authors = ["Michael P. Jung <michael.j...@terreon.de>"]
   package-mode = false
   
   [tool.poetry.dependencies]
   python = "^3.12"
   pyiceberg = { extras = ["sql-postgres"], version = "^0.7.1" }
   azure-identity = "^1.17.1"
   adlfs = "^2024"
   psutil = "^6.0.0"
   pyarrow = "^17.0.0"
   fsspec = "^2024"
   ```
   
   </details>

