arul-cc opened a new issue, #2120: URL: https://github.com/apache/iceberg-python/issues/2120
### Apache Iceberg version

0.9.1 (latest release)

### Please describe the bug 🐞

## Problem Description

While performing parallel upsert operations on an Iceberg table, the table metadata appears to have been corrupted, leaving the table inaccessible even though the physical data files still exist. The issue occurs when multiple concurrent upsert operations are attempted.

## Expected Behavior

Parallel upsert operations should either:
- Succeed with proper ACID transaction handling, or
- Fail gracefully without corrupting table metadata (a minimal retry sketch follows the reproduction steps)

## Actual Behavior

1. The first upsert operation succeeds
2. Subsequent operations fail with `CommitFailedException` (expected due to ACID constraints)
3. With more parallel calls, the table becomes inaccessible with "table not found" errors
4. Physical data files remain intact, suggesting metadata corruption

## Reproduction Steps

1. Initialize an Iceberg table with sample data using the provided `upload_file` code
2. Attempt parallel upsert operations using the `perform_upsert` function with multiple workers
3. Observe that the table becomes inaccessible after several attempts
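To illustrate what I mean by "fail gracefully": this is a minimal sketch of the retry wrapper I expected to need around concurrent commits, assuming `CommitFailedException` is the right exception to catch and that `Table.refresh()` reloads the latest metadata. The helper name, retry count, and backoff are illustrative only, not something from my actual code.

```python
import time

from pyiceberg.exceptions import CommitFailedException


def upsert_with_retry(table, df, max_retries: int = 5, backoff_s: float = 0.5):
    """Sketch: retry an upsert when another writer wins the commit race."""
    for attempt in range(max_retries):
        try:
            return table.upsert(df)  # PyIceberg >= 0.9 upsert API
        except CommitFailedException:
            # Another commit landed first: pick up the new metadata and try again.
            table = table.refresh()
            time.sleep(backoff_s * (attempt + 1))
    raise RuntimeError(f"upsert still failing after {max_retries} retries")
```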
## Code Samples

### Table Initialization Code

```python
import os
import uuid
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.compute as pc
from minio import Minio
from minio.error import S3Error
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (
    StringType, IntegerType, LongType, FloatType, DoubleType, BooleanType,
    DateType, TimestampType, TimeType, BinaryType, DecimalType, FixedType,
    ListType, MapType, StructType, NestedField
)
from pyiceberg.expressions import EqualTo
from pyiceberg.table import Table
from io import BytesIO
from typing import List, Dict, Union, Optional
from datetime import datetime, date  # date is used by _infer_pyarrow_type
import logging

# logging.basicConfig(level=logging.DEBUG)


class IcebergMinIOUtils:
    def __init__(self):
        """Initialize MinIO and Iceberg clients."""
        # MinIO client
        self.minio_client = Minio(
            endpoint="minio:9000",
            access_key="admin",
            secret_key="password",
            secure=False
        )
        self.bucket_name = "warehouse"

        # Iceberg catalog
        self.catalog = load_catalog(
            "default",
            **{
                "uri": "http://localhost:8181",
                "s3.endpoint": "http://localhost:9000",
                "s3.access-key-id": "admin",
                "s3.secret-access-key": "password",
                "s3.path-style-access": "true",
            }
        )

        # Size cap from environment variable (in MB)
        self.max_download_size_mb = float(os.getenv("MAX_DOWNLOAD_SIZE_MB", 100))

        # Metadata columns definition
        self.metadata_columns = [
            NestedField(field_id=1000, name="created_at", field_type=TimestampType(), required=False),
            NestedField(field_id=1001, name="source", field_type=StringType(), required=False)
        ]

    def _is_iceberg_format(self, data: Union[str, pd.DataFrame, pa.Table, bytes]) -> bool:
        """Check if data is in a format suitable for Iceberg (JSON, Parquet, CSV, DataFrame)."""
        if isinstance(data, pd.DataFrame) or isinstance(data, pa.Table):
            return True
        if isinstance(data, str):
            ext = os.path.splitext(data)[1].lower()
            return ext in [".json", ".parquet", ".csv"]
        return False

    def _infer_schema_from_data(self, data: Union[pd.DataFrame, pa.Table]) -> List[NestedField]:
        """Infer Iceberg schema fields from Pandas DataFrame or PyArrow Table, including JSON/dict types."""
        if isinstance(data, pd.DataFrame):
            # Convert object columns containing dicts or lists to PyArrow StructType or ListType
            for col in data.columns:
                if data[col].dtype == "object":
                    sample = next((x for x in data[col].dropna() if x is not None), None)
                    if isinstance(sample, dict):
                        # Infer struct fields from dict
                        struct_fields = [
                            pa.field(k, self._infer_pyarrow_type(v)) for k, v in sample.items()
                        ]
                        data[col] = data[col].apply(lambda x: x if isinstance(x, dict) else None)
                        data[col] = pa.array(data[col], pa.struct(struct_fields))
                    elif isinstance(sample, list):
                        # Infer list element type from first non-empty list
                        element_sample = next((x for l in data[col].dropna() for x in l if l), None)
                        element_type = self._infer_pyarrow_type(element_sample) if element_sample else pa.string()
                        data[col] = data[col].apply(lambda x: x if isinstance(x, list) else None)
                        data[col] = pa.array(data[col], pa.list_(element_type))
            data = pa.Table.from_pandas(data, preserve_index=False)
        fields = []
        for i, field in enumerate(data.schema):
            field_type = self._map_pyarrow_to_iceberg(field.type)
            required = False
            if field.name == "id":
                required = True
            fields.append(NestedField(field_id=i + 1, name=field.name, field_type=field_type, required=required))
        return fields

    def _convert_data_into_iceberg_supported_format(self, data: Union[pd.DataFrame, pa.Table]):
        if isinstance(data, pa.Table):
            data = data.to_pandas()
        for col_name in data.columns:
            col_type = data[col_name].dtype
            if pd.api.types.is_datetime64_ns_dtype(col_type):
                print(f"Converting column '{col_name}' from nanosecond timestamp to microsecond timestamp.")
                data[col_name] = data[col_name].astype('datetime64[us]')
        return pa.Table.from_pandas(data)

    def _infer_pyarrow_type(self, value):
        """Infer PyArrow type from a Python value."""
        if isinstance(value, str):
            return pa.string()
        elif isinstance(value, int):
            return pa.int64()
        elif isinstance(value, float):
            return pa.float64()
        elif isinstance(value, bool):
            return pa.bool_()
        elif isinstance(value, datetime):
            return pa.timestamp("ns")
        elif isinstance(value, date):
            return pa.date32()
        elif isinstance(value, dict):
            struct_fields = [pa.field(k, self._infer_pyarrow_type(v)) for k, v in value.items()]
            return pa.struct(struct_fields)
        elif isinstance(value, list):
            element_sample = next((x for x in value if x is not None), None)
            element_type = self._infer_pyarrow_type(element_sample) if element_sample else pa.string()
            return pa.list_(element_type)
        else:
            return pa.string()  # Fallback for unsupported types

    def _map_pyarrow_to_iceberg(self, pa_type: pa.DataType) -> "IcebergType":
        """Map PyArrow types to Iceberg types, supporting all types from PyIceberg documentation."""
        if pa.types.is_string(pa_type) or pa.types.is_large_string(pa_type):
            return StringType()
        elif pa.types.is_int8(pa_type) or pa.types.is_int16(pa_type) or pa.types.is_int32(pa_type):
            return IntegerType()
        elif pa.types.is_int64(pa_type):
            return LongType()
        elif pa.types.is_float16(pa_type) or pa.types.is_float32(pa_type):
            return FloatType()
        elif pa.types.is_float64(pa_type):
            return DoubleType()
        elif pa.types.is_boolean(pa_type):
            return BooleanType()
        elif pa.types.is_date32(pa_type) or pa.types.is_date64(pa_type):
            return DateType()
        elif pa.types.is_timestamp(pa_type):
            return TimestampType()
        elif pa.types.is_time32(pa_type) or pa.types.is_time64(pa_type):
            return TimeType()
        elif pa.types.is_binary(pa_type) or pa.types.is_large_binary(pa_type):
            return BinaryType()
        elif pa.types.is_fixed_size_binary(pa_type):
            return FixedType(length=pa_type.byte_width)
        elif pa.types.is_decimal(pa_type):
            return DecimalType(precision=pa_type.precision, scale=pa_type.scale)
        elif pa.types.is_list(pa_type):
            element_type = self._map_pyarrow_to_iceberg(pa_type.value_type)
            return ListType(element_id=0, element_type=element_type, element_required=False)
        elif pa.types.is_struct(pa_type):
            struct_fields = [
                NestedField(
                    field_id=i + 1,
                    name=field.name,
                    field_type=self._map_pyarrow_to_iceberg(field.type),
                    required=False
                )
                for i, field in enumerate(pa_type)
            ]
            return StructType(fields=struct_fields)
        elif pa.types.is_map(pa_type):
            key_type = self._map_pyarrow_to_iceberg(pa_type.key_type)
            value_type = self._map_pyarrow_to_iceberg(pa_type.item_type)
            return MapType(key_id=0, key_type=key_type, value_id=1, value_type=value_type, value_required=False)
        else:
            # raise ValueError(f"Unsupported PyArrow type: {pa_type}")
            return StringType()

    def _create_table_if_not_exists(
        self,
        table_name: str,
        namespace: str = "default",
        data: Optional[Union[pd.DataFrame, pa.Table]] = None,
        autogenerate_id: bool = False,
        generate_meta_columns: bool = False,
        index_columns: Optional[List[str]] = None
    ) -> Table:
        """Create Iceberg table if it doesn't exist with optional id and metadata columns."""
        identifier = f"{namespace}.{table_name}"
        try:
            return self.catalog.load_table(identifier)
        except Exception:
            # Infer schema from data if provided
            schema_fields = []
            data = self._convert_data_into_iceberg_supported_format(data)
            if data is not None:
                schema_fields.extend(self._infer_schema_from_data(data))
            field_id_max = len(schema_fields)
            meta_columns = []
            if generate_meta_columns:
                meta_column_schemas = [
                    NestedField(field_id=field_id_max + 1, name="recordguid__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 2, name="recordstatus__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 3, name="created_by__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 4, name="owner__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 5, name="last_updated_by__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 6, name="created_at__", field_type=TimestampType(), required=False),
                    NestedField(field_id=field_id_max + 7, name="last_updated_at__", field_type=TimestampType(), required=False),
                    NestedField(field_id=field_id_max + 8, name="tags__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 9, name="user_actions__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 10, name="proposals__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 11, name="link_data__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 12, name="related_data__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 13, name="signal__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 14, name="exceptions__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 15, name="remediation__", field_type=StringType(), required=False),
                ]
                schema_fields.extend(meta_column_schemas)
                data_columns = set(data.schema.names)
                print('data_columns :', data_columns)
                num_rows = data.num_rows
                new_columns = []
                new_column_names = []
                for meta_column_schema in meta_column_schemas:
                    if meta_column_schema.name not in data_columns:
                        default_array = pa.array([""] * num_rows, type=pa.string())
                        if meta_column_schema.name in ['created_at__', 'last_updated_at__']:
                            epoch = datetime(1970, 1, 1)
                            default_array = pa.array([epoch] * num_rows, type=pa.timestamp('ms'))
                        data = data.append_column(meta_column_schema.name, default_array)
            old_schema = data.schema
            # Make the 'id' field non-nullable
            new_fields = []
            for field in old_schema:
                if field.name == 'id':
                    new_fields.append(pa.field(field.name, field.type, nullable=False))
                else:
                    new_fields.append(field)
            new_schema = pa.schema(new_fields)
            # Cast table to new schema (this enforces the schema but does not change data)
            data = data.cast(new_schema)
            id_column_field_id = 1
            for schema_field in schema_fields:
                if schema_field.name == 'id':
                    id_column_field_id = schema_field.field_id
            for schema_field in schema_fields:
                if schema_field.name == 'id':
                    # schema_field.required = True
                    id_column_field_id = schema_field.field_id
            fields_schema = Schema(
                *schema_fields,
                identifier_field_ids=[id_column_field_id]
            )
            table_properties = {
                "write.target-file-size-bytes": 67108864,
            }
            table = self.catalog.create_table(
                identifier=identifier,
                schema=fields_schema,
                properties=table_properties,
            )
            return self.catalog.load_table(table.name()), data

    def _update_table_schema(self, table: Table, schema_details: list[NestedField]):
        existing_columns = {f.name for f in table.schema().fields}
        missing_meta_columns = [col for col in schema_details if col.name not in existing_columns]
        if not missing_meta_columns:
            return table
        # Start a transaction
        with table.transaction() as transaction:
            # Update schema to add missing metadata columns
            update_schema = transaction.update_schema(allow_incompatible_changes=True)
            for col in missing_meta_columns:
                update_schema = update_schema.add_column(path=col.name, field_type=col.field_type, required=col.required)
            update_schema.commit()
            print('successfully schema updated')
        return self.catalog.load_table(table.name())

    def _update_schema_if_needed(self, table: Table, generate_meta_columns: bool) -> Table:
        """Update table schema transactionally to add missing metadata columns."""
        if not generate_meta_columns:
            return table
        existing_columns = {f.name for f in table.schema().fields}
        missing_meta_columns = [col for col in self.metadata_columns if col.name not in existing_columns]
        if not missing_meta_columns:
            return table
        # Start a transaction
        with table.transaction() as transaction:
            # Update schema to add missing metadata columns
            for col in missing_meta_columns:
                transaction.update_schema().add_column(path=col.name, field_type=col.field_type, required=col.required)
            transaction.commit_transaction()
        return self.catalog.load_table(table.name())

    def _convert_data_into_iceberg_supported_format(self, data: Union[pd.DataFrame, pa.Table]):
        if isinstance(data, pa.Table):
            data = data.to_pandas()
        for col_name in data.columns:
            col_type = data[col_name].dtype
            if pd.api.types.is_datetime64_ns_dtype(col_type):
                print(f"Converting column '{col_name}' from nanosecond timestamp to microsecond timestamp.")
                data[col_name] = data[col_name].astype('datetime64[us]')
        return pa.Table.from_pandas(data)

    def upload_file(
        self,
        data: Union[str, pd.DataFrame, pa.Table, bytes] = None,
        key: str = None,
        table_name: str = None,
        namespace: str = "default",
        autogenerate_id: bool = False,
        generate_meta_columns: bool = False,
        index_columns: Optional[List[str]] = None
    ) -> str:
        """Upload file to MinIO or Iceberg transactionally, returning the file URL."""
        logging.debug(f"Uploading to table {namespace}.{table_name} with key {key}")
        if (self._is_iceberg_format(data) or self._is_iceberg_format(key)) and table_name:
            # Iceberg upload
            if isinstance(key, str):
                ext = os.path.splitext(key)[1].lower()
                if ext == ".csv":
                    data = pd.read_csv(key)
                elif ext == ".json":
                    data = pd.read_json(key)
                elif ext == ".parquet":
                    data = pq.read_table(key)
                else:
                    raise ValueError("Unsupported file format for Iceberg")
            # Convert to PyArrow Table
            if isinstance(data, pd.DataFrame):
                data = pa.Table.from_pandas(data, preserve_index=False)
            # Add metadata columns if requested
            if generate_meta_columns:
                data = data.append_column("created_at", pa.array([datetime.now()] * data.num_rows, pa.timestamp("ns")))
                data = data.append_column("source", pa.array(["upload"] * data.num_rows, pa.string()))
            # Add autogenerated id if requested
            if autogenerate_id:
                data = data.append_column("id", pa.array([str(uuid.uuid4()) for _ in range(data.num_rows)], pa.string()))
            data = self._convert_data_into_iceberg_supported_format(data)
            # Create or load table
            table, data = self._create_table_if_not_exists(
                table_name, namespace, data, autogenerate_id, generate_meta_columns, index_columns
            )
            # Update schema if needed
            table = self._update_schema_if_needed(table, generate_meta_columns)
            # cols_to_null = ['B', 'C']
            # df.loc[:, cols_to_null] = None
            # Perform transactional upload
            with table.transaction() as transaction:
                logging.debug("Starting transaction")
                if autogenerate_id or "id" in [f.name for f in table.schema().fields]:
                    existing_data = table.scan().to_arrow()
                    if "id" in existing_data.column_names:
                        existing_ids = set(existing_data["id"].to_pylist())
                        if existing_ids:
                            new_data = data.filter(pc.invert(pc.is_in(data["id"], pa.array(existing_ids))))
                            update_data = data.filter(pc.is_in(data["id"], pa.array(existing_ids)))
                            # Append new records
                            if new_data.num_rows > 0:
                                transaction.append(new_data)
                            # Update existing records
                            if update_data.num_rows > 0:
                                transaction.overwrite(update_data)
                        else:
                            transaction.append(data)
                    else:
                        transaction.append(data)
                else:
                    transaction.append(data)
                logging.debug("Transaction completed")
            # Verify metadata exists
            try:
                table = self.catalog.load_table(f"{namespace}.{table_name}")
                if not table.metadata.snapshots:
                    raise ValueError(f"No snapshots found for table {namespace}.{table_name}")
            except Exception as e:
                logging.error(f"Metadata verification failed: {e}")
                raise
            # Set index properties: indexing not supported till now
            # if index_columns:
            #     for col in index_columns:
            #         table.update_properties({f"index_{col}": "true"})
            logging.debug(f"Upload successful: s3://{self.bucket_name}/{namespace}/{table_name}")
            return f"s3://{self.bucket_name}/{namespace}/{table_name}"
        else:
            # Direct MinIO upload
            if isinstance(data, str):
                with open(data, "rb") as f:
                    data = f.read()
            elif isinstance(data, pa.Table):
                buffer = BytesIO()
                pq.write_table(data, buffer)
                data = buffer.getvalue()
            elif isinstance(data, pd.DataFrame):
                data = pa.Table.from_pandas(data)
                buffer = BytesIO()
                pq.write_table(data, buffer)
                data = buffer.getvalue()
            self.minio_client.put_object(
                bucket_name=self.bucket_name,
                object_name=key,
                data=BytesIO(data),
                length=len(data)
            )
            return f"s3://{self.bucket_name}/{key}"


ice_berg_minio_utils = IcebergMinIOUtils()
ice_berg_minio_utils.upload_file(
    key='file1.parquet',
    table_name='arul.sample',
    autogenerate_id=True,
    generate_meta_columns=True,
    index_columns=['id']
)
```
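Between the two scripts I also run a quick sanity check to confirm the freshly created table loads and has a committed snapshot before the parallel test starts. This is a small sketch, assuming `metadata_location` and `current_snapshot()` expose what I think they do:

```python
from pyiceberg.catalog import load_catalog

# Sanity check after the initial upload, before any parallel upserts.
# Assumption: "default.arul.sample" is the identifier created above.
catalog = load_catalog(
    "default",
    **{
        "uri": "http://localhost:8181",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
        "s3.path-style-access": "true",
    },
)

table = catalog.load_table("default.arul.sample")
print("metadata location:", table.metadata_location)   # current metadata JSON file
print("current snapshot :", table.current_snapshot())  # last committed snapshot
print("row count        :", table.scan().to_arrow().num_rows)
```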
### Parallel Upsert Code

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo
from pyiceberg.table import Table
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
from pyiceberg.exceptions import CommitFailedException
import pyarrow as pa


# Helper: Convert Iceberg type to PyArrow type
def iceberg_type_to_pyarrow_type(iceberg_type):
    from pyiceberg.types import (
        IntegerType, LongType, FloatType, DoubleType,
        StringType, BooleanType, TimestampType, DateType,
    )
    if isinstance(iceberg_type, IntegerType):
        return pa.int32()
    elif isinstance(iceberg_type, LongType):
        return pa.int64()
    elif isinstance(iceberg_type, FloatType):
        return pa.float32()
    elif isinstance(iceberg_type, DoubleType):
        return pa.float64()
    elif isinstance(iceberg_type, StringType):
        return pa.string()
    elif isinstance(iceberg_type, BooleanType):
        return pa.bool_()
    elif isinstance(iceberg_type, TimestampType):
        return pa.timestamp("us")
    elif isinstance(iceberg_type, DateType):
        return pa.date32()
    else:
        raise NotImplementedError(f"Unsupported type: {iceberg_type}")


# Initialize the catalog
catalog = load_catalog(
    "default",
    **{
        "uri": "http://localhost:8181",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
        "s3.path-style-access": "true",
    }
)

# Load your table
table: Table = catalog.load_table("default.arul.sample")
iceberg_schema = table.schema()

# Build PyArrow schema
arrow_fields = [
    pa.field(field.name, iceberg_type_to_pyarrow_type(field.field_type), nullable=not field.required)
    for field in iceberg_schema.fields
]
arrow_schema = pa.schema(arrow_fields)


def perform_upsert(partition_data):
    """Function to perform upsert on a partition of data"""
    try:
        df = pa.Table.from_pandas(pd.DataFrame(partition_data), schema=arrow_schema)
        upstr = table.upsert(df)
        print(f"Successfully upserted partition with {len(df)} records")
        print("upstr.rows_inserted :", upstr.rows_inserted)
        print("upstr.rows_updated :", upstr.rows_updated)
    # except CommitFailedException:
    #     print("==== Retrying the update for records ====")
    #     perform_upsert(partition_data)
    except Exception as e:
        print(f"Error in upsert: {str(e)}")


all_data = [
    [
        {
            "guid": "b1004c82-6b45-48dd-b657-22044b6717d3",
            "file1_int_a": 0, "file1_float_b": 0.4099746151, "file1_string_c": "string1_0",
            "file1_int_1": 700708, "file1_float_1": -0.00013162700000000002,
            "file1_string_1": "unique_str1_feae5ebe-20af-446e-b7cd-0d239cee4f0a",
            "file1_bool_1": True, "file1_date_1": 1775537823, "file1_cat_1": "P",
            "file1_int_2": 377626, "file1_float_2": 2.8005186429,
            "file1_string_2": "changed_by_parallel_calls",
            "file1_bool_2": True, "file1_date_2": 1795124582, "file1_cat_2": "V",
            "file1_int_3": 650704, "file1_float_3": 0.0011361511,
            "file1_string_3": "final_unique_str1_7e8f1a71-faae-4071-a6d6-fa45d17f5645",
            "file1_bool_3": True,
            "created_at": "2025-06-11 16:20:58", "source": "upload",
            "id": "ec99ffef-d990-4f01-beea-3f4ac19bd502",
            "recordguid__": "", "recordstatus__": "", "created_by__": "", "owner__": "",
            "last_updated_by__": "", "created_at__": 0, "last_updated_at__": 0, "tags__": "",
            "user_actions__": "", "proposals__": "", "link_data__": "", "related_data__": "",
            "signal__": "", "exceptions__": "", "remediation__": ""
        }
    ],
    [
        {
            "guid": "7d59bd17-b754-4474-a0d6-5c662edf49bd",
            "file1_int_a": 165230, "file1_float_b": 0.1493543918, "file1_string_c": "string1_165230",
            "file1_int_1": 649565, "file1_float_1": 0.1653516706,
            "file1_string_1": "unique_str1_e46814a7-1ce0-4284-ae17-3596f8462639",
            "file1_bool_1": True, "file1_date_1": 1852705711, "file1_cat_1": "S",
            "file1_int_2": 169692, "file1_float_2": 0.8480360042,
            "file1_string_2": "changed_by_parallel_calls",
            "file1_bool_2": False, "file1_date_2": 1815319797, "file1_cat_2": "Y",
            "file1_int_3": 1512764, "file1_float_3": 1.9787666482000001,
            "file1_string_3": "final_unique_str1_462118c4-5470-4b74-bce1-62183fa0bee1",
            "file1_bool_3": False,
            "created_at": "2025-06-11 16:20:58", "source": "upload",
            "id": "e0d8b00e-d538-4159-a843-7e44d9744c0c",
            "recordguid__": "", "recordstatus__": "", "created_by__": "", "owner__": "",
            "last_updated_by__": "", "created_at__": 0, "last_updated_at__": 0, "tags__": "",
            "user_actions__": "", "proposals__": "", "link_data__": "", "related_data__": "",
            "signal__": "", "exceptions__": "", "remediation__": ""
        }
    ],
    [
        {
            "guid": "df31072c-b838-474f-a883-a9797fcf5524",
            "file1_int_a": 330460, "file1_float_b": 0.7084970022, "file1_string_c": "string1_330460",
            "file1_int_1": 731568, "file1_float_1": 0.33039105500000004,
            "file1_string_1": "unique_str1_7bd9fa0e-d7f5-4e1c-9c6d-b9cd9b544a64",
            "file1_bool_1": False, "file1_date_1": 1833308442, "file1_cat_1": "T",
            "file1_int_2": -10388, "file1_float_2": 2.3386936906,
            "file1_string_2": "changed_by_parallel_calls",
            "file1_bool_2": False, "file1_date_2": 1822519343, "file1_cat_2": "V",
            "file1_int_3": 94375, "file1_float_3": 0.6904563792,
            "file1_string_3": "final_unique_str1_a8ec3b67-031a-4ec4-b80e-1cca33c7d09c",
            "file1_bool_3": False,
            "created_at": "2025-06-11 16:20:58", "source": "upload",
            "id": "00da629e-0790-462f-ba9a-44fd3db3ecc6",
            "recordguid__": "", "recordstatus__": "", "created_by__": "", "owner__": "",
            "last_updated_by__": "", "created_at__": 0, "last_updated_at__": 0, "tags__": "",
            "user_actions__": "", "proposals__": "", "link_data__": "", "related_data__": "",
            "signal__": "", "exceptions__": "", "remediation__": ""
        }
    ]
]

# Perform parallel upserts
with ThreadPoolExecutor(max_workers=4) as executor:  # Adjust max_workers as needed
    futures = [executor.submit(perform_upsert, partition) for partition in all_data]
    # Wait for all futures to complete
    for future in futures:
        future.result()  # This will raise exceptions if any occurred
```

## Environment Details

- apache/iceberg-rest-fixture: latest (a month ago)
- tabulario/spark-iceberg: latest (3 months ago)
- PyIceberg version: 0.9.1
- MinIO version: RELEASE.2025-04-22T22-12-26Z
- Storage backend: MinIO

## Additional Observations

- The issue seems related to concurrent metadata file updates
- Physical data files remain intact, suggesting only the metadata is affected
- The problem becomes more likely with higher concurrency

## Request

1. Help identify the root cause of the metadata corruption
2. Suggestions for safe parallel upsert patterns in Iceberg
3. Any known issues or best practices around concurrent writes in Iceberg
4. Recovery options for the corrupted table metadata (a listing sketch of what is still present in MinIO is attached after the checklist below)

### Willingness to contribute

- [ ] I can contribute a fix for this bug independently
- [ ] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time