chengchengpei commented on issue #1305:
URL: 
https://github.com/apache/iceberg-python/issues/1305#issuecomment-2465678139

   I tried to run the following code (multiple processes calling `append` on the
same Iceberg table):
   
   ```
   import os
   import time
   from multiprocessing import Pool
   
   import pyarrow as pa
   import pyarrow.parquet as pq
   from pyiceberg.catalog import load_catalog
   from pyiceberg.schema import Schema
   from pyiceberg.types import NestedField, StringType, BinaryType
   import base64
   import boto3
   
   from utils import list_files
   
   
   def process_batch(batch):
       """Base64-encode one batch of image files and append them to the table.

       Args:
           batch: iterable of ``(image_path, image_name)`` pairs.

       Returns:
           A ``(row_count, seconds)`` tuple. ``seconds`` covers loading the
           catalog and the ``append`` call; reading the files is not timed.

       NOTE(review): nothing here retries a failed commit. When several
       workers run this concurrently against the same table, ``append`` can
       raise ``pyiceberg.exceptions.CommitFailedException`` ("branch main has
       changed"), which is the failure shown in the traceback that follows
       this script. Presumably the caller should reload the table and retry,
       or serialise the commits -- confirm against the PyIceberg docs.
       """
       print('start processing batch')
       ids = []
       image_data = []
       for image_path, image_name in batch:
           with open(image_path, "rb") as f:
               image_data.append(base64.b64encode(f.read()))
               ids.append(image_name)
       table_data = pa.Table.from_pydict({"id": ids, "image_data": image_data})
       start = time.time()
       # Each worker process builds its own catalog client.
       catalog = load_catalog("glue", **{
           "type": "glue",
           "region": "us-east-1",
           "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
           "s3.access-key-id": os.getenv("AWS_KEY_ID"),
           "s3.secret-access-key": os.getenv("AWS_SECRET_ACCESS_KEY"),
           "max-workers": 8
       })
       # Load the table immediately before appending; this is the statement
       # that the mail wrapper had split onto a column-0 line.
       table = catalog.load_table("test.imagenet-object-localization-challenge-10000")
       table.append(table_data)
       end = time.time()
       print('uploaded {} in {} seconds'.format(len(ids), end - start))
       return len(ids), end - start
   
   
   if __name__ == "__main__":
       # Schema for the Iceberg table: a string id and a binary image column.
       schema = Schema(
           NestedField(1, "id", StringType()),
           NestedField(2, "image_data", BinaryType())
       )

       # Load the Iceberg catalog (Glue, us-east-1) with credentials from the
       # environment.
       catalog = load_catalog("glue", **{
           "type": "glue",
           "region": "us-east-1",
           "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
           "s3.access-key-id": os.getenv("AWS_KEY_ID"),
           "s3.secret-access-key": os.getenv("AWS_SECRET_ACCESS_KEY"),
           "max-workers": 8
           # "write.parquet.compression-codec": "snappy"
       })

       catalog.create_namespace_if_not_exists("test")

       # Make sure the target table exists; the workers load it themselves,
       # so the returned handle is not kept.
       catalog.create_table_if_not_exists(
           identifier="test.imagenet-object-localization-challenge-10000",
           schema=schema,
           location="s3://test/iceberg-data/")

       # List the image files; reading and base64-encoding happen in the
       # worker function.
       images_list = list_files("/Users/ILSVRC/data/CLS-LOC/test/", extension=".JPEG")
       batch_size = 10000
       total_batches = 10
       # Cap the work at total_batches slices of batch_size entries each.
       limit = min(len(images_list), total_batches * batch_size)
       batches = [images_list[i:i + batch_size] for i in range(0, limit, batch_size)]

       # Four worker processes append to the same table concurrently.
       with Pool(4) as pool:
           results = pool.map(process_batch, batches)

       for result in results:
           print('uploaded {} in {} seconds'.format(result[0], result[1]))
   
   ```
   
   but got the following error:
   ```
   Traceback (most recent call last):
     File 
"/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py",
 line 125, in worker
       result = (True, func(*args, **kwds))
     File 
"/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py",
 line 48, in mapstar
       return list(map(*args))
     File "/Users/test/write_images_to_iceberg.py", line 51, in process_batch
       
catalog.load_table("test.imagenet-object-localization-challenge-10000").append(table_data)
     File 
"/Users/test/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py", 
line 1578, in append
       tx.append(df=df, snapshot_properties=snapshot_properties)
     File 
"/Users/tst/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py", line 
289, in __exit__
       self.commit_transaction()
     File 
"/Users/test/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py", 
line 712, in commit_transaction
       self._table._do_commit(  # pylint: disable=W0212
     File 
"/Users/test/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py", 
line 1638, in _do_commit
       response = self.catalog._commit_table(  # pylint: disable=W0212
     File 
"/Users/test/venv/lib/python3.9/site-packages/pyiceberg/catalog/glue.py", line 
484, in _commit_table
       updated_staged_table = self._update_and_stage_table(current_table, 
table_request)
     File 
"/Users/test/venv/lib/python3.9/site-packages/pyiceberg/catalog/__init__.py", 
line 835, in _update_and_stage_table
       requirement.validate(current_table.metadata if current_table else None)
     File 
"/Users/tests/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py", 
line 1262, in validate
       raise CommitFailedException(
   pyiceberg.exceptions.CommitFailedException: Requirement failed: branch main 
has changed: expected id 2633742078255924117, found 3998254648540280684
   """
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/Users/test/write_images_to_iceberg.py", line 92, in <module>
       results = pool.map(process_batch, batches)
     File 
"/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py",
 line 364, in map
       return self._map_async(func, iterable, mapstar, chunksize).get()
     File 
"/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py",
 line 771, in get
       raise self._value
   pyiceberg.exceptions.CommitFailedException: Requirement failed: branch main 
has changed: expected id 2633742078255924117, found 3998254648540280684
   ```
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to