arul-cc opened a new issue, #2120:
URL: https://github.com/apache/iceberg-python/issues/2120

   ### Apache Iceberg version
   
   0.9.1 (latest release)
   
   ### Please describe the bug 🐞
   
   ## Problem Description
   While performing parallel upsert operations on an Iceberg table, the table 
metadata appears to have been corrupted, causing the table to become 
inaccessible (though the physical data files still exist). The issue occurs 
when multiple concurrent upsert operations are attempted.
   
   ## Expected Behavior
   - Parallel upsert operations should either:
     - Succeed with proper ACID transaction handling, or
     - Fail gracefully without corrupting table metadata
   
   ## Actual Behavior
   1. First upsert operation succeeds
   2. Subsequent operations fail with `CommitFailedException` (expected due to 
ACID constraints)
   3. With more parallel calls, the table becomes inaccessible with "table not 
found" errors
   4. Physical data files remain intact, suggesting metadata corruption
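
The conflict in step 2 is roughly what optimistic concurrency produces: each writer prepares a commit against the table version it read, and the commit is accepted only if that version is still current. The sketch below is a toy model in plain Python, not PyIceberg's implementation; `ToyCatalog`, `CommitConflict`, and `write_with_retry` are hypothetical names invented for illustration. It shows that a writer which loses the race can recover by re-reading the current version and retrying.

```python
import threading


class CommitConflict(Exception):
    """Toy stand-in for a failed commit: the table changed after it was read."""


class ToyCatalog:
    """Holds one current version number and swaps it atomically."""

    def __init__(self):
        self._lock = threading.Lock()
        self.version = 0
        self.rows = []

    def commit(self, expected_version, new_rows):
        # Compare-and-swap: accept the commit only if nobody else has
        # committed since this writer read `expected_version`.
        with self._lock:
            if self.version != expected_version:
                raise CommitConflict(
                    f"expected v{expected_version}, found v{self.version}"
                )
            self.rows = self.rows + new_rows
            self.version += 1


def write_with_retry(catalog, new_rows, max_attempts=50):
    """Commit new_rows, re-reading the version after every conflict."""
    for attempt in range(1, max_attempts + 1):
        base = catalog.version  # re-read the latest state
        try:
            catalog.commit(base, new_rows)
            return attempt
        except CommitConflict:
            continue  # lost the race: refresh and try again
    raise CommitConflict("gave up after max_attempts")


catalog = ToyCatalog()
threads = [
    threading.Thread(target=write_with_retry, args=(catalog, [i]))
    for i in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(catalog.version, sorted(catalog.rows))
```

In this model every writer eventually commits, because each conflict means some other writer succeeded. A writer that kept reusing a stale version would fail on every attempt.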
   
   ## Reproduction Steps
   1. Initialize Iceberg table with sample data using the provided 
`upload_file` code
   2. Attempt parallel upsert operations using the `perform_upsert` function 
with multiple workers
   3. Observe the table becomes inaccessible after several attempts
   
   ## Code Samples
   
   ### Table Initialization Code
   ```python
   import os
   import uuid
   import pandas as pd
   import pyarrow as pa
   import pyarrow.parquet as pq
   import pyarrow.compute as pc
   from minio import Minio
   from minio.error import S3Error
   from pyiceberg.catalog import load_catalog
   from pyiceberg.schema import Schema
   from pyiceberg.types import (
       StringType, IntegerType, LongType, FloatType, DoubleType, BooleanType,
       DateType, TimestampType, TimeType, BinaryType, DecimalType, FixedType,
       ListType, MapType, StructType, NestedField
   )
   from pyiceberg.expressions import EqualTo
   from pyiceberg.table import Table
   from io import BytesIO
   from typing import List, Dict, Union, Optional
   from datetime import date, datetime
   import logging
   
   
   
   # logging.basicConfig(level=logging.DEBUG)
   
   class IcebergMinIOUtils:
       def __init__(self):
           """Initialize MinIO and Iceberg clients."""
           # MinIO client
           self.minio_client = Minio(
               endpoint="minio:9000",
               access_key="admin",
               secret_key="password",
               secure=False
           )
           self.bucket_name = "warehouse"
   
           # Iceberg catalog
           self.catalog = load_catalog(
                   "default",
                   **{
                       "uri": "http://localhost:8181",
                       "s3.endpoint": "http://localhost:9000",
                       "s3.access-key-id": "admin",
                       "s3.secret-access-key": "password",
                       "s3.path-style-access": "true",
                   }
               )
   
           # Size cap from environment variable (in MB)
           self.max_download_size_mb = float(os.getenv("MAX_DOWNLOAD_SIZE_MB", 
100))
   
           # Metadata columns definition
           self.metadata_columns = [
               NestedField(field_id=1000, name="created_at", 
field_type=TimestampType(), required=False),
               NestedField(field_id=1001, name="source", 
field_type=StringType(), required=False)
           ]
   
       def _is_iceberg_format(self, data: Union[str, pd.DataFrame, pa.Table, 
bytes]) -> bool:
           """Check if data is in a format suitable for Iceberg (JSON, Parquet, 
CSV, DataFrame)."""
           if isinstance(data, pd.DataFrame) or isinstance(data, pa.Table):
               return True
           if isinstance(data, str):
               ext = os.path.splitext(data)[1].lower()
               return ext in [".json", ".parquet", ".csv"]
           return False
   
       def _infer_schema_from_data(
           self, data: Union[pd.DataFrame, pa.Table]
       ) -> List[NestedField]:
           """Infer Iceberg schema fields from a Pandas DataFrame or PyArrow Table, including JSON/dict types."""
           if isinstance(data, pd.DataFrame):
               # Convert object columns containing dicts or lists to
               # PyArrow StructType or ListType
               for col in data.columns:
                   if data[col].dtype == "object":
                       sample = next((x for x in data[col].dropna() if x is not 
None), None)
                       if isinstance(sample, dict):
                           # Infer struct fields from dict
                           struct_fields = [
                               pa.field(k, self._infer_pyarrow_type(v)) for k, 
v in sample.items()
                           ]
                           data[col] = data[col].apply(lambda x: x if 
isinstance(x, dict) else None)
                           data[col] = pa.array(data[col], 
pa.struct(struct_fields))
                       elif isinstance(sample, list):
                           # Infer list element type from first non-empty list
                           element_sample = next((x for l in data[col].dropna() 
for x in l if l), None)
                           element_type = (
                               self._infer_pyarrow_type(element_sample)
                               if element_sample is not None
                               else pa.string()
                           )
                           data[col] = data[col].apply(lambda x: x if 
isinstance(x, list) else None)
                           data[col] = pa.array(data[col], 
pa.list_(element_type))
               data = pa.Table.from_pandas(data, preserve_index=False)
   
           fields = []
           for i, field in enumerate(data.schema):
               field_type = self._map_pyarrow_to_iceberg(field.type)
               required = False
               if field.name=="id":
                   required = True
               fields.append(NestedField(field_id=i+1, name=field.name, 
field_type=field_type, required=required))
           return fields
   
       def _convert_data_into_iceberg_supported_format(self, data: 
Union[pd.DataFrame, pa.Table]):
           if isinstance(data, pa.Table):
               data = data.to_pandas()
           
           for col_name in data.columns:
               col_type = data[col_name].dtype
   
               if pd.api.types.is_datetime64_ns_dtype(col_type):
                   print(f"Converting column '{col_name}' from nanosecond timestamp to microsecond timestamp.")
                   data[col_name] = data[col_name].astype('datetime64[us]')
   
           
   
           return pa.Table.from_pandas(data)
   
       def _infer_pyarrow_type(self, value):
           """Infer PyArrow type from a Python value."""
           # bool is a subclass of int, so it must be checked before int
           if isinstance(value, bool):
               return pa.bool_()
           elif isinstance(value, str):
               return pa.string()
           elif isinstance(value, int):
               return pa.int64()
           elif isinstance(value, float):
               return pa.float64()
           # datetime is a subclass of date, so it must be checked before date
           elif isinstance(value, datetime):
               return pa.timestamp("ns")
           elif isinstance(value, date):
               return pa.date32()
           elif isinstance(value, dict):
               struct_fields = [pa.field(k, self._infer_pyarrow_type(v)) for k, v in value.items()]
               return pa.struct(struct_fields)
           elif isinstance(value, list):
               element_sample = next((x for x in value if x is not None), None)
               element_type = (
                   self._infer_pyarrow_type(element_sample)
                   if element_sample is not None
                   else pa.string()
               )
               return pa.list_(element_type)
           else:
               return pa.string()  # Fallback for unsupported types
   
       def _map_pyarrow_to_iceberg(self, pa_type: pa.DataType) -> "IcebergType":
           """Map PyArrow types to Iceberg types, supporting all types from 
PyIceberg documentation."""
           if pa.types.is_string(pa_type) or pa.types.is_large_string(pa_type):
               return StringType()
           elif (
               pa.types.is_int8(pa_type)
               or pa.types.is_int16(pa_type)
               or pa.types.is_int32(pa_type)
           ):
               return IntegerType()
           elif pa.types.is_int64(pa_type):
               return LongType()
           elif pa.types.is_float16(pa_type) or pa.types.is_float32(pa_type):
               return FloatType()
           elif pa.types.is_float64(pa_type):
               return DoubleType()
           elif pa.types.is_boolean(pa_type):
               return BooleanType()
           elif pa.types.is_date32(pa_type) or pa.types.is_date64(pa_type):
               return DateType()
           elif pa.types.is_timestamp(pa_type):
               return TimestampType()
           elif pa.types.is_time32(pa_type) or pa.types.is_time64(pa_type):
               return TimeType()
           elif pa.types.is_binary(pa_type) or pa.types.is_large_binary(pa_type):
               return BinaryType()
           elif pa.types.is_fixed_size_binary(pa_type):
               return FixedType(length=pa_type.byte_width)
           elif pa.types.is_decimal(pa_type):
               return DecimalType(precision=pa_type.precision, 
scale=pa_type.scale)
           elif pa.types.is_list(pa_type):
               element_type = self._map_pyarrow_to_iceberg(pa_type.value_type)
               return ListType(element_id=0, element_type=element_type, 
element_required=False)
           elif pa.types.is_struct(pa_type):
               struct_fields = [
                   NestedField(
                       field_id=i+1,
                       name=field.name,
                       field_type=self._map_pyarrow_to_iceberg(field.type),
                       required=False
                   )
                   for i, field in enumerate(pa_type)
               ]
               return StructType(fields=struct_fields)
           elif pa.types.is_map(pa_type):
               key_type = self._map_pyarrow_to_iceberg(pa_type.key_type)
               value_type = self._map_pyarrow_to_iceberg(pa_type.item_type)
               return MapType(key_id=0, key_type=key_type, value_id=1, 
value_type=value_type, value_required=False)
           else:
               # raise ValueError(f"Unsupported PyArrow type: {pa_type}")
               return StringType()
   
       # 
   
       def _create_table_if_not_exists(
           self,
           table_name: str,
           namespace: str = "default",
           data: Optional[Union[pd.DataFrame, pa.Table]] = None,
           autogenerate_id: bool = False,
           generate_meta_columns: bool = False,
           index_columns: Optional[List[str]] = None
       ) -> tuple[Table, Optional[pa.Table]]:
           """Create Iceberg table if it doesn't exist with optional id and metadata columns."""
           identifier = f"{namespace}.{table_name}"
           try:
               # Return the same (table, data) pair as the create path below,
               # because the caller unpacks two values
               return self.catalog.load_table(identifier), data
           except Exception:
               # Infer schema from data if provided
               schema_fields = []
               if data is not None:
                   data = self._convert_data_into_iceberg_supported_format(data)
                   schema_fields.extend(self._infer_schema_from_data(data))
   
                   field_id_max = len(schema_fields)
   
                   meta_columns = []
   
                   if generate_meta_columns:
   
                       meta_column_schemas = [
                           NestedField(field_id=field_id_max+1, 
name="recordguid__", field_type=StringType(), required=False),
                           NestedField(field_id=field_id_max+2, 
name="recordstatus__", field_type=StringType(), required=False),
                           NestedField(field_id=field_id_max+3, 
name="created_by__", field_type=StringType(), required=False),
                           NestedField(field_id=field_id_max+4, name="owner__", 
field_type=StringType(), required=False),
                           NestedField(field_id=field_id_max+5, 
name="last_updated_by__", field_type=StringType(), required=False),
                           NestedField(field_id=field_id_max+6, 
name="created_at__", field_type=TimestampType(), required=False),
                           NestedField(field_id=field_id_max+7, 
name="last_updated_at__", field_type=TimestampType(), required=False),
                           NestedField(field_id=field_id_max+8, name="tags__", 
field_type=StringType(), required=False),
                           NestedField(field_id=field_id_max+9, 
name="user_actions__", field_type=StringType(), required=False),
                           NestedField(field_id=field_id_max+10, 
name="proposals__", field_type=StringType(), required=False),
                           NestedField(field_id=field_id_max+11, 
name="link_data__", field_type=StringType(), required=False),
                           NestedField(field_id=field_id_max+12, 
name="related_data__", field_type=StringType(), required=False),
                           NestedField(field_id=field_id_max+13, 
name="signal__", field_type=StringType(), required=False),
                           NestedField(field_id=field_id_max+14, 
name="exceptions__", field_type=StringType(), required=False),
                           NestedField(field_id=field_id_max+15, 
name="remediation__", field_type=StringType(), required=False),
                       ]
   
                       schema_fields.extend(meta_column_schemas)
   
                       data_columns = set(data.schema.names)
   
                       print('data_columns :',data_columns)
   
                       num_rows = data.num_rows
                       new_columns = []
                       new_column_names = []
   
                       for meta_column_schema in meta_column_schemas:
                           if meta_column_schema.name not in data_columns:
                               
                               default_array = pa.array([""] * num_rows, type=pa.string())
                               if meta_column_schema.name in ['created_at__', 'last_updated_at__']:
                                   epoch = datetime(1970, 1, 1)
                                   default_array = pa.array([epoch] * num_rows, type=pa.timestamp('ms'))
                               data = data.append_column(meta_column_schema.name, default_array)
   
                       old_schema = data.schema
           
                       # Make the 'id' field non-nullable
                       new_fields = []
                       for field in old_schema:
                           if field.name == 'id':
                               new_fields.append(pa.field(field.name, 
field.type, nullable=False))
                           else:
                               new_fields.append(field)
   
                       new_schema = pa.schema(new_fields)
                       
                       # Cast table to new schema (this enforces the schema
                       # but does not change data)
                       data = data.cast(new_schema)
   
               
   
                       
   
               # Use the 'id' column as the identifier field (field 1 if absent)
               id_column_field_id = 1
               for schema_field in schema_fields:
                   if schema_field.name == 'id':
                       id_column_field_id = schema_field.field_id
   
   
               fields_schema = Schema(
                   *schema_fields,
                   identifier_field_ids=[id_column_field_id]
               )
               
               table_properties = {
                   "write.target-file-size-bytes": 67108864,
               }
   
               table = self.catalog.create_table(
                   identifier=identifier,
                   schema=fields_schema,
                   properties=table_properties,
               )
               return self.catalog.load_table(table.name()), data
   
       def _update_table_schema(self, table:Table, 
schema_details:list[NestedField]):
           existing_columns = {f.name for f in table.schema().fields}
           missing_meta_columns = [col for col in schema_details if col.name 
not in existing_columns]
   
           if not missing_meta_columns:
               return table
               
   
           # Start a transaction
           with table.transaction() as transaction:
               # Update schema to add missing metadata columns
               update_schema = transaction.update_schema(allow_incompatible_changes=True)
               for col in missing_meta_columns:
                   update_schema = update_schema.add_column(path=col.name, 
field_type=col.field_type, required=col.required)
               update_schema.commit()
               print('successfully schema updated')
           return self.catalog.load_table(table.name())
           
           
   
       def _update_schema_if_needed(self, table: Table, generate_meta_columns: 
bool) -> Table:
           """Update table schema transactionally to add missing metadata 
columns."""
           if not generate_meta_columns:
               return table
   
           existing_columns = {f.name for f in table.schema().fields}
           missing_meta_columns = [col for col in self.metadata_columns if 
col.name not in existing_columns]
   
           if not missing_meta_columns:
               return table
   
           # Start a transaction; both context managers commit on exit
           with table.transaction() as transaction:
               # Update schema to add missing metadata columns
               with transaction.update_schema() as update_schema:
                   for col in missing_meta_columns:
                       update_schema.add_column(path=col.name, field_type=col.field_type, required=col.required)
           return self.catalog.load_table(table.name())
   
       def upload_file(
           self,
           data: Union[str, pd.DataFrame, pa.Table, bytes]=None,
           key: str = None,
           table_name: str = None,
           namespace: str = "default",
           autogenerate_id: bool = False,
           generate_meta_columns: bool = False,
           index_columns: Optional[List[str]] = None
       ) -> str:
           """Upload file to MinIO or Iceberg transactionally, returning the 
file URL."""
           logging.debug(f"Uploading to table {namespace}.{table_name} with key {key}")
   
           if (self._is_iceberg_format(data) or self._is_iceberg_format(key)) and table_name:
               # Iceberg upload
               if isinstance(key, str):
                   ext = os.path.splitext(key)[1].lower()
                   if ext == ".csv":
                       data = pd.read_csv(key)
                   elif ext == ".json":
                       data = pd.read_json(key)
                   elif ext == ".parquet":
                       data = pq.read_table(key)
                   else:
                       raise ValueError("Unsupported file format for Iceberg")
   
               # Convert to PyArrow Table
               if isinstance(data, pd.DataFrame):
                   data = pa.Table.from_pandas(data, preserve_index=False)
   
               
   
               # Add metadata columns if requested
               if generate_meta_columns:
                   data = data.append_column("created_at", 
pa.array([datetime.now()] * data.num_rows, pa.timestamp("ns")))
                   data = data.append_column("source", pa.array(["upload"] * 
data.num_rows, pa.string()))
   
               # Add autogenerated id if requested
               if autogenerate_id:
                   data = data.append_column("id", pa.array([str(uuid.uuid4()) 
for _ in range(data.num_rows)], pa.string()))
   
               data = self._convert_data_into_iceberg_supported_format(data)
               
               # Create or load table
               table, data = self._create_table_if_not_exists(
                   table_name, namespace, data, autogenerate_id, 
generate_meta_columns, index_columns
               )
   
               # Update schema if needed
               table = self._update_schema_if_needed(table, 
generate_meta_columns)
   
               # cols_to_null = ['B', 'C']
   
               # df.loc[:, cols_to_null] = None
   
               # Perform transactional upload
               with table.transaction() as transaction:
                   logging.debug("Starting transaction")
                   if autogenerate_id or "id" in [f.name for f in 
table.schema().fields]:
                       existing_data = table.scan().to_arrow()
                       if "id" in existing_data.column_names:
                           existing_ids = set(existing_data["id"].to_pylist())
                           if existing_ids:
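                               id_values = pa.array(list(existing_ids))
                               is_existing = pc.is_in(data["id"], value_set=id_values)
                               new_data = data.filter(pc.invert(is_existing))
                               update_data = data.filter(is_existing)
                               # Append new records
                               if new_data.num_rows > 0:
                                   transaction.append(new_data)
                               # Update existing records. Without an overwrite_filter,
                               # overwrite() replaces every row in the table, so
                               # restrict it to the ids being updated.
                               if update_data.num_rows > 0:
                                   from pyiceberg.expressions import In  # local import keeps this fix self-contained
                                   updated_ids = update_data["id"].to_pylist()
                                   transaction.overwrite(update_data, overwrite_filter=In("id", updated_ids))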
                           else:
                               transaction.append(data)
                       else:
                           transaction.append(data)
                   else:
                       transaction.append(data)
                   logging.debug("Transaction completed")
   
               # Verify metadata exists
               try:
                   table = self.catalog.load_table(f"{namespace}.{table_name}")
                   if not table.metadata.snapshots:
                       raise ValueError(f"No snapshots found for table {namespace}.{table_name}")
               except Exception as e:
                   logging.error(f"Metadata verification failed: {e}")
                   raise
   
               # Set index properties : indexing not supported till now
               # if index_columns:
               #     for col in index_columns:
               #         table.update_properties({f"index_{col}": "true"})
   
               logging.debug(f"Upload successful: s3://{self.bucket_name}/{namespace}/{table_name}")
               return f"s3://{self.bucket_name}/{namespace}/{table_name}"
   
           else:
               # Direct MinIO upload
               if isinstance(data, str):
                   with open(data, "rb") as f:
                       data = f.read()
               elif isinstance(data, pa.Table):
                   buffer = BytesIO()
                   pq.write_table(data, buffer)
                   data = buffer.getvalue()
               elif isinstance(data, pd.DataFrame):
                   data = pa.Table.from_pandas(data)
                   buffer = BytesIO()
                   pq.write_table(data, buffer)
                   data = buffer.getvalue()
   
               self.minio_client.put_object(
                   bucket_name=self.bucket_name,
                   object_name=key,
                   data=BytesIO(data),
                   length=len(data)
               )
               return f"s3://{self.bucket_name}/{key}"
   
   ice_berg_minio_utils = IcebergMinIOUtils()
   
   ice_berg_minio_utils.upload_file(key='file1.parquet', 
table_name='arul.sample',autogenerate_id=True, generate_meta_columns=True, 
index_columns=['id'])
   ```
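
One detail of value-based type inference, as done in `_infer_pyarrow_type`, is that the order of `isinstance` checks matters: in Python, `bool` is a subclass of `int` and `datetime` is a subclass of `date`, so the subclass has to be tested first. The following stand-alone sketch uses plain string labels instead of PyArrow types so that it runs without dependencies; `infer_type_name` is a hypothetical helper, not part of the code above.

```python
from datetime import date, datetime


def infer_type_name(value):
    """Return an illustrative type label for a Python value."""
    # bool must be tested before int, because bool is a subclass of int.
    if isinstance(value, bool):
        return "bool"
    if isinstance(value, int):
        return "int64"
    if isinstance(value, float):
        return "float64"
    # datetime must be tested before date, because datetime subclasses date.
    if isinstance(value, datetime):
        return "timestamp"
    if isinstance(value, date):
        return "date32"
    return "string"  # strings and anything unrecognised


samples = [True, 7, 1.5, datetime(2025, 6, 11, 16, 20, 58), date(2025, 6, 11), "x"]
labels = [infer_type_name(v) for v in samples]
print(labels)
```

If the `int` check came first, `True` would be labelled `int64` and the `bool` branch would never run.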
   
   ### Parallel Upsert Code
   ```python
   from pyiceberg.catalog import load_catalog
   from pyiceberg.expressions import EqualTo
   from pyiceberg.table import Table
   from concurrent.futures import ThreadPoolExecutor
   import pandas as pd
   from pyiceberg.exceptions import CommitFailedException
   
   import pyarrow as pa
   
   # Helper: Convert Iceberg type to PyArrow type
   def iceberg_type_to_pyarrow_type(iceberg_type):
       from pyiceberg.types import (
           IntegerType, LongType, FloatType, DoubleType,
           StringType, BooleanType, TimestampType, DateType,
       )
   
       if isinstance(iceberg_type, IntegerType):
           return pa.int32()
       elif isinstance(iceberg_type, LongType):
           return pa.int64()
       elif isinstance(iceberg_type, FloatType):
           return pa.float32()
       elif isinstance(iceberg_type, DoubleType):
           return pa.float64()
       elif isinstance(iceberg_type, StringType):
           return pa.string()
       elif isinstance(iceberg_type, BooleanType):
           return pa.bool_()
       elif isinstance(iceberg_type, TimestampType):
           return pa.timestamp("us")
       elif isinstance(iceberg_type, DateType):
           return pa.date32()
       else:
           raise NotImplementedError(f"Unsupported type: {iceberg_type}")
   
   
   
   
   # Initialize the catalog
   catalog = load_catalog(
               "default",
               **{
                   "uri": "http://localhost:8181",
                   "s3.endpoint": "http://localhost:9000",
                   "s3.access-key-id": "admin",
                   "s3.secret-access-key": "password",
                   "s3.path-style-access": "true",
               }
           )
   # Load your table
   table: Table = catalog.load_table("default.arul.sample")
   
   iceberg_schema = table.schema()
   # Build PyArrow schema
   arrow_fields = [
       pa.field(field.name, iceberg_type_to_pyarrow_type(field.field_type), 
nullable=not field.required)
       for field in iceberg_schema.fields
   ]
   arrow_schema = pa.schema(arrow_fields)
   
   
   
   def perform_upsert(partition_data):
       """Function to perform upsert on a partition of data"""
       try:
           df = pa.Table.from_pandas(pd.DataFrame(partition_data), schema=arrow_schema)
           upstr = table.upsert(df)
           print(f"Successfully upserted partition with {len(df)} records")
           print("upstr.rows_inserted :", upstr.rows_inserted)
           print("upstr.rows_updated :", upstr.rows_updated)
       except CommitFailedException:
           # Another writer committed first; a retry should refresh the table first
           print("==== Commit conflict; upsert not applied ====")
           # perform_upsert(partition_data)
       except Exception as e:
           print(f"Error in upsert: {str(e)}")
   
   
   all_data = [
     [
       {
         "guid": "b1004c82-6b45-48dd-b657-22044b6717d3",
         "file1_int_a": 0,
         "file1_float_b": 0.4099746151,
         "file1_string_c": "string1_0",
         "file1_int_1": 700708,
         "file1_float_1": -0.00013162700000000002,
         "file1_string_1": "unique_str1_feae5ebe-20af-446e-b7cd-0d239cee4f0a",
         "file1_bool_1": True,
         "file1_date_1": 1775537823,
         "file1_cat_1": "P",
         "file1_int_2": 377626,
         "file1_float_2": 2.8005186429,
         "file1_string_2": "changed_by_parallel_calls",
         "file1_bool_2": True,
         "file1_date_2": 1795124582,
         "file1_cat_2": "V",
         "file1_int_3": 650704,
         "file1_float_3": 0.0011361511,
         "file1_string_3": 
"final_unique_str1_7e8f1a71-faae-4071-a6d6-fa45d17f5645",
         "file1_bool_3": True,
         "created_at": "2025-06-11 16:20:58",
         "source": "upload",
         "id": "ec99ffef-d990-4f01-beea-3f4ac19bd502",
         "recordguid__": "",
         "recordstatus__": "",
         "created_by__": "",
         "owner__": "",
         "last_updated_by__": "",
         "created_at__": 0,
         "last_updated_at__": 0,
         "tags__": "",
         "user_actions__": "",
         "proposals__": "",
         "link_data__": "",
         "related_data__": "",
         "signal__": "",
         "exceptions__": "",
         "remediation__": ""
       }
     ],
     [
       {
         "guid": "7d59bd17-b754-4474-a0d6-5c662edf49bd",
         "file1_int_a": 165230,
         "file1_float_b": 0.1493543918,
         "file1_string_c": "string1_165230",
         "file1_int_1": 649565,
         "file1_float_1": 0.1653516706,
         "file1_string_1": "unique_str1_e46814a7-1ce0-4284-ae17-3596f8462639",
         "file1_bool_1": True,
         "file1_date_1": 1852705711,
         "file1_cat_1": "S",
         "file1_int_2": 169692,
         "file1_float_2": 0.8480360042,
         "file1_string_2": "changed_by_parallel_calls",
         "file1_bool_2": False,
         "file1_date_2": 1815319797,
         "file1_cat_2": "Y",
         "file1_int_3": 1512764,
         "file1_float_3": 1.9787666482000001,
         "file1_string_3": 
"final_unique_str1_462118c4-5470-4b74-bce1-62183fa0bee1",
         "file1_bool_3": False,
         "created_at": "2025-06-11 16:20:58",
         "source": "upload",
         "id": "e0d8b00e-d538-4159-a843-7e44d9744c0c",
         "recordguid__": "",
         "recordstatus__": "",
         "created_by__": "",
         "owner__": "",
         "last_updated_by__": "",
         "created_at__": 0,
         "last_updated_at__": 0,
         "tags__": "",
         "user_actions__": "",
         "proposals__": "",
         "link_data__": "",
         "related_data__": "",
         "signal__": "",
         "exceptions__": "",
         "remediation__": ""
       }
     ],
     [
       {
         "guid": "df31072c-b838-474f-a883-a9797fcf5524",
         "file1_int_a": 330460,
         "file1_float_b": 0.7084970022,
         "file1_string_c": "string1_330460",
         "file1_int_1": 731568,
         "file1_float_1": 0.33039105500000004,
         "file1_string_1": "unique_str1_7bd9fa0e-d7f5-4e1c-9c6d-b9cd9b544a64",
         "file1_bool_1": False,
         "file1_date_1": 1833308442,
         "file1_cat_1": "T",
         "file1_int_2": -10388,
         "file1_float_2": 2.3386936906,
         "file1_string_2": "changed_by_parallel_calls",
         "file1_bool_2": False,
         "file1_date_2": 1822519343,
         "file1_cat_2": "V",
         "file1_int_3": 94375,
         "file1_float_3": 0.6904563792,
         "file1_string_3": 
"final_unique_str1_a8ec3b67-031a-4ec4-b80e-1cca33c7d09c",
         "file1_bool_3": False,
         "created_at": "2025-06-11 16:20:58",
         "source": "upload",
         "id": "00da629e-0790-462f-ba9a-44fd3db3ecc6",
         "recordguid__": "",
         "recordstatus__": "",
         "created_by__": "",
         "owner__": "",
         "last_updated_by__": "",
         "created_at__": 0,
         "last_updated_at__": 0,
         "tags__": "",
         "user_actions__": "",
         "proposals__": "",
         "link_data__": "",
         "related_data__": "",
         "signal__": "",
         "exceptions__": "",
         "remediation__": ""
       }
     ]
   ]
   
   # Perform parallel upserts; adjust max_workers as needed
   with ThreadPoolExecutor(max_workers=4) as executor:
       futures = [executor.submit(perform_upsert, partition) for partition in 
all_data]
       
       # Wait for all futures to complete
       for future in futures:
           future.result()  # perform_upsert catches its own exceptions, so this only waits
   ```
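
A common way to handle `CommitFailedException` in concurrent writers is to reload the table and retry with backoff. The sketch below is an assumption about how that could be structured, not a confirmed fix for the inaccessible-table problem reported here. `upsert_with_retry`, `FakeConflict`, and `FlakyTable` are hypothetical names; the fake table stands in for a real one so the example runs without a catalog.

```python
import random
import time


def upsert_with_retry(load_table, rows, conflict_error, max_attempts=5, base_delay=0.05):
    """Reload the table and retry the upsert when a commit conflict occurs.

    load_table:     zero-argument callable returning a fresh table handle
    conflict_error: exception class that signals a lost commit race
    """
    for attempt in range(1, max_attempts + 1):
        table = load_table()  # fresh handle, so the upsert sees the latest state
        try:
            return table.upsert(rows)
        except conflict_error:
            if attempt == max_attempts:
                raise
            # Exponential backoff with jitter so writers do not collide in lockstep.
            delay = base_delay * (2 ** (attempt - 1)) * random.uniform(0.5, 1.5)
            time.sleep(delay)


# Stand-in objects so the sketch runs without a catalog.
class FakeConflict(Exception):
    pass


class FlakyTable:
    calls = 0  # shared across instances to count attempts

    def upsert(self, rows):
        FlakyTable.calls += 1
        if FlakyTable.calls < 3:  # fail the first two attempts
            raise FakeConflict()
        return len(rows)


result = upsert_with_retry(
    FlakyTable, [{"id": "a"}, {"id": "b"}], FakeConflict, base_delay=0.001
)
print(result, FlakyTable.calls)
```

Against a real catalog, `load_table` could be `lambda: catalog.load_table("default.arul.sample")` and `conflict_error` could be `CommitFailedException`, so each attempt works from freshly loaded metadata rather than a table object shared across threads.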
   
   ## Environment Details
   - apache/iceberg-rest-fixture: latest (a month ago)
   - tabulario/spark-iceberg: latest (3 months ago)
   - PyIceberg version: 0.9.1
   - MinIO version: RELEASE.2025-04-22T22-12-26Z
   - Storage backend: MinIO
   
   ## Additional Observations
   - The issue seems related to concurrent metadata file updates
   - Physical data files remain intact, suggesting only metadata is affected
   - The problem becomes more likely with higher concurrency
   
   ## Request
   1. Help identify the root cause of the metadata corruption
   2. Suggestions for safe parallel upsert patterns in Iceberg
   3. Any known issues or best practices around concurrent writes in Iceberg
   4. Recovery options for the corrupted table metadata
   
   
   ### Willingness to contribute
   
   - [ ] I can contribute a fix for this bug independently
   - [ ] I would be willing to contribute a fix for this bug with guidance from 
the Iceberg community
   - [ ] I cannot contribute a fix for this bug at this time


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

