chengchengpei commented on issue #1305:
URL: https://github.com/apache/iceberg-python/issues/1305#issuecomment-2465678139
I tried to run the following code (multiple processes `append`ing to the same Iceberg table):

```python
import os
import time
from multiprocessing import Pool

import pyarrow as pa
import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, BinaryType
import base64
import boto3

from utils import list_files


def process_batch(batch):
    print('start processing batch')
    ids = []
    image_data = []
    for image_path, image_name in batch:
        with open(image_path, "rb") as f:
            image_data.append(base64.b64encode(f.read()))
        ids.append(image_name)

    table_data = pa.Table.from_pydict({"id": ids, "image_data": image_data})

    start = time.time()
    catalog = load_catalog("glue", **{
        "type": "glue",
        "region": "us-east-1",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.access-key-id": os.getenv("AWS_KEY_ID"),
        "s3.secret-access-key": os.getenv("AWS_SECRET_ACCESS_KEY"),
        "max-workers": 8
    })
    catalog.load_table("test.imagenet-object-localization-challenge-10000").append(table_data)
    end = time.time()

    print('uploaded {} in {} seconds'.format(len(ids), end - start))
    return len(ids), end - start


if __name__ == "__main__":
    # Create a schema for the Iceberg table
    schema = Schema(
        NestedField(1, "id", StringType()),
        NestedField(2, "image_data", BinaryType())
    )

    # Load the Iceberg catalog
    catalog = load_catalog("glue", **{
        "type": "glue",
        "region": "us-east-1",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.access-key-id": os.getenv("AWS_KEY_ID"),
        "s3.secret-access-key": os.getenv("AWS_SECRET_ACCESS_KEY"),
        "max-workers": 8
        # "write.parquet.compression-codec": "snappy"
    })

    catalog.create_namespace_if_not_exists("test")

    # Create an Iceberg table
    table = catalog.create_table_if_not_exists(
        identifier="test.imagenet-object-localization-challenge-10000",
        schema=schema,
        location="s3://test/iceberg-data/")

    # Load images and convert to base64
    images_list = list_files("/Users/ILSVRC/data/CLS-LOC/test/", extension=".JPEG")

    batch_size = 10000
    total_batches = 10
    processes = []
    batches = [images_list[i:i + batch_size]
               for i in range(0, min(len(images_list), total_batches * batch_size), batch_size)]

    with Pool(4) as pool:
        results = pool.map(process_batch, batches)

    for result in results:
        print('uploaded {} in {} seconds'.format(result[0], result[1]))
```

but got:

```
Traceback (most recent call last):
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 48, in mapstar
    return list(map(*args))
  File "/Users/test/write_images_to_iceberg.py", line 51, in process_batch
    catalog.load_table("test.imagenet-object-localization-challenge-10000").append(table_data)
  File "/Users/test/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py", line 1578, in append
    tx.append(df=df, snapshot_properties=snapshot_properties)
  File "/Users/tst/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py", line 289, in __exit__
    self.commit_transaction()
  File "/Users/test/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py", line 712, in commit_transaction
    self._table._do_commit(  # pylint: disable=W0212
  File "/Users/test/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py", line 1638, in _do_commit
    response = self.catalog._commit_table(  # pylint: disable=W0212
"/Users/test/venv/lib/python3.9/site-packages/pyiceberg/catalog/glue.py", line 484, in _commit_table updated_staged_table = self._update_and_stage_table(current_table, table_request) File "/Users/test/venv/lib/python3.9/site-packages/pyiceberg/catalog/__init__.py", line 835, in _update_and_stage_table requirement.validate(current_table.metadata if current_table else None) File "/Users/tests/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py", line 1262, in validate raise CommitFailedException( pyiceberg.exceptions.CommitFailedException: Requirement failed: branch main has changed: expected id 2633742078255924117, found 3998254648540280684 """ The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/Users/test/write_images_to_iceberg.py", line 92, in <module> results = pool.map(process_batch, batches) File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 364, in map return self._map_async(func, iterable, mapstar, chunksize).get() File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 771, in get raise self._value pyiceberg.exceptions.CommitFailedException: Requirement failed: branch main has changed: expected id 2633742078255924117, found 3998254648540280684 ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org