soumilshah1995 opened a new issue, #16782:
URL: https://github.com/apache/iceberg/issues/16782

   ### Apache Iceberg version
   
   1.11.0
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   # Iceberg VARIANT shredding fails on wide mixed-schema tables (EMR + S3 Tables)
   
   **Suggested GitHub title:** `SparkParquetWriterWithVariantShredding` fails on wide Iceberg v3 tables with `shred-variants=true`
   
   **Target repo:** [apache/iceberg](https://github.com/apache/iceberg/issues)
   
   ---
   
   ## Summary
   
   With `spark.sql.iceberg.shred-variants=true`, an INSERT into an Iceberg **v3** table that mixes scalar, ARRAY, and many VARIANT columns fails:
   
   ```
   ClassCastException: optional binary element (STRING) = N is not a group
     at SparkParquetWriterWithVariantShredding.initialize(...)
   ```
   
   With the same JSON, the same `parse_json()` expressions, and only **2 rows**, a **3-column** table (`id` + 2 VARIANT) **passes**. This points to the writer or the schema layout, not to bad JSON.
   
   ---
   
   ## Environment
   
   | Component | Version |
   |-----------|---------|
   | Spark | 4.0.2-amzn-0 (EMR 8.x) |
   | Iceberg | 1.11.0 (`1.10.1-amzn-0` also on EMR classpath) |
   | Catalog | S3 Tables (`s3-tables-catalog-for-iceberg-runtime:0.1.5`) |
   | Table | format-version **3**, VARIANT via `parse_json()` |
   
   **Note:** The wide write **passes** on local Apache Spark with a Hadoop warehouse and Iceberg 1.11. The failure is seen on **EMR + S3 Tables**.
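Because two Iceberg builds (`1.11.0` and `1.10.1-amzn-0`) are on the EMR classpath, and that is one visible difference from the passing local setup, it may be worth confirming which Iceberg jars a job can actually load. The sketch below is a hypothetical helper (`find_iceberg_jars` is not an Iceberg or Spark API). It only inspects jar file names in a classpath-style string and assumes versions appear as a `-<x.y.z>[-qualifier].jar` suffix, so jars named without a version map to `None`.

```python
from __future__ import annotations

import re

# Trailing "-<major>.<minor>.<patch>[-qualifier].jar", e.g. "-1.10.1-amzn-0.jar".
_VERSION_SUFFIX = re.compile(r"-(\d+\.\d+\.\d+(?:-[A-Za-z0-9.-]+)?)\.jar$")


def find_iceberg_jars(classpath: str) -> dict[str, str | None]:
    """Map each Iceberg jar file name in a classpath-like string to its version.

    Entries may be separated by ':' (JVM classpath) or ',' (spark.jars style).
    Only the file name is inspected; a name without a version maps to None.
    """
    jars: dict[str, str | None] = {}
    for entry in re.split(r"[:,]", classpath):
        name = entry.strip().rsplit("/", 1)[-1]
        if name.startswith("iceberg") and name.endswith(".jar"):
            match = _VERSION_SUFFIX.search(name)
            jars[name] = match.group(1) if match else None
    return jars


# Made-up paths for illustration: two runtimes with different versions show up as two entries.
sample = (
    "/opt/jars/iceberg-spark-runtime-4.0_2.13-1.11.0.jar:"
    "/opt/emr/iceberg-spark-runtime-4.0_2.13-1.10.1-amzn-0.jar"
)
for jar, version in find_iceberg_jars(sample).items():
    print(f"{jar}: {version}")
```

The input could come from, for example, `spark.sparkContext.getConf().get("spark.driver.extraClassPath", "")` or the `spark.jars` setting; which property actually lists the Iceberg jars on EMR is an assumption to verify.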
   
   ---
   
   ## Reproduce
   
   Repro script: `emr_s3tables_shred_repro.py` (mock data, no external files).
   
   ```bash
   spark-submit --deploy-mode client \
     --driver-memory 2g --executor-memory 2g \
     --executor-cores 2 --num-executors 2 \
     --conf spark.sql.iceberg.shred-variants=true \
     emr_s3tables_shred_repro.py
   ```
   
   | Case | Table shape | Result |
   |------|-------------|--------|
   | minimal | `id` + 2 VARIANT | PASS |
   | wide | 20 cols (7 scalars + 11 VARIANT + 2 ARRAY) | **FAIL** |
   
   Wide DDL (all 20 columns):
   
   ```sql
   CREATE TABLE t (
     id STRING, region STRING, user_key STRING, email STRING,
     payload_a VARIANT, payload_b VARIANT, updated_at TIMESTAMP,
     status STRING, hash_key STRING,
     extra_a VARIANT, extra_b VARIANT, extra_c VARIANT, extra_d VARIANT,
     extra_e VARIANT, extra_f VARIANT, extra_g VARIANT,
     id_list ARRAY<STRING>, extra_h VARIANT, extra_i VARIANT,
     tag_list ARRAY<STRING>
   ) USING iceberg
   PARTITIONED BY (bucket(16, id))
   TBLPROPERTIES ('format-version' = '3');
   ```
   
   Insert (abbreviated; the two rows differ only in `id`):
   
   ```sql
   INSERT OVERWRITE t (...)
   SELECT
     id, region, user_key, email,
     parse_json(payload_a), parse_json(payload_b),
     COALESCE(updated_at, current_timestamp()), status, hash_key,
     parse_json(extra_a), /* ... other VARIANT cols ... */
     CASE WHEN id_list = '[]' THEN array() ELSE from_json(id_list, 'ARRAY<STRING>') END,
     /* ... */
   FROM mock_source;
   ```
   
   Mock JSON (same on every row): `{"channel":{"type":"email"}}`, `{"segment":"alpha","score":42}`, etc.
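Since the report depends on the JSON being consistent, that claim can be checked mechanically: collect the type seen at every JSON path across the source rows and flag any path that carries more than one type. A standard-library sketch; `path_types` and `mixed_type_paths` are hypothetical helpers, and the `$.key` / `[*]` path notation is only for display, not Spark's or Iceberg's.

```python
from __future__ import annotations

import json
from collections import defaultdict
from typing import Any, Iterator


def path_types(value: Any, path: str = "$") -> Iterator[tuple[str, str]]:
    """Yield (path, Python type name) for every leaf of a parsed JSON value."""
    if isinstance(value, dict):
        for key, child in value.items():
            yield from path_types(child, f"{path}.{key}")
    elif isinstance(value, list):
        for child in value:  # all list elements share one "[*]" path
            yield from path_types(child, f"{path}[*]")
    else:
        yield path, type(value).__name__  # e.g. "str", "int", "bool", "NoneType"


def mixed_type_paths(docs: list[str]) -> dict[str, set[str]]:
    """Return the paths that carry more than one type across the given JSON strings."""
    seen: dict[str, set[str]] = defaultdict(set)
    for doc in docs:
        for path, type_name in path_types(json.loads(doc)):
            seen[path].add(type_name)
    return {path: types for path, types in seen.items() if len(types) > 1}


# Two identical mock rows, as in the repro: no path should be flagged.
rows = ['{"segment": "alpha", "score": 42}', '{"enabled": true}'] * 2
print(mixed_type_paths(rows))  # {}
```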
   
   ---
   
   ## Expected vs actual
   
   - **Expected:** The wide table is written with shredding enabled, since the JSON is valid and consistent.
   - **Actual:** `ClassCastException` during the Parquet write. The `N` in the message is an internal Parquet field ID, not a JSON key.
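To map `N` back to a column, one can replay how Iceberg numbers a new table's fields. This sketch assumes the IDs were assigned the way Iceberg core does for a newly created table (all top-level columns first, here 1 to 20, then nested fields such as list elements in column order) and that the S3 Tables catalog keeps them; `assign_field_ids` is a hypothetical helper, and the schema in the table's `metadata.json` is the authoritative source. Under those assumptions, the `21` in the stack trace below would be `id_list.element`, which would also fit the `binary element (STRING)` shape of an `ARRAY<STRING>` element.

```python
from __future__ import annotations

# Column list copied from the wide DDL (types contain no commas, so a plain split works).
WIDE_COLUMNS_SQL = """
  id STRING, region STRING, user_key STRING, email STRING,
  payload_a VARIANT, payload_b VARIANT, updated_at TIMESTAMP,
  status STRING, hash_key STRING,
  extra_a VARIANT, extra_b VARIANT, extra_c VARIANT, extra_d VARIANT,
  extra_e VARIANT, extra_f VARIANT, extra_g VARIANT,
  id_list ARRAY<STRING>, extra_h VARIANT, extra_i VARIANT,
  tag_list ARRAY<STRING>
"""


def assign_field_ids(columns_sql: str) -> dict[int, str]:
    """Approximate Iceberg's fresh field-ID order for a flat schema (hypothetical helper).

    Assumptions: top-level columns get IDs 1..n in declaration order, then each
    ARRAY<primitive> column gets one more ID for its list element, in column order.
    VARIANT and primitive columns add no nested IDs; structs and maps are not handled.
    """
    columns = [part.split(maxsplit=1) for part in columns_sql.split(",")]
    ids: dict[int, str] = {}
    for name, _ in columns:
        ids[len(ids) + 1] = name
    for name, col_type in columns:
        if col_type.strip().startswith("ARRAY<"):
            ids[len(ids) + 1] = f"{name}.element"
    return ids


print(assign_field_ids(WIDE_COLUMNS_SQL)[21])  # id_list.element (under the assumptions above)
```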
   
   ---
   
   ## Ruled out
   
   - Mixed JSON types at the same path → **not** the cause; the same JSON passes in the minimal 3-column table
   - Row volume → fails with only **2 rows**
   - INSERT SQL → the same SQL works in the minimal table
   - **`shred-variants=false`** → the full schema **works** (workaround)
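The same elimination can be pushed one step further with control shapes that sit between the passing and failing cases. The sketch below only generates `CREATE TABLE` statements in the repro's style; the shape names (`wide_no_arrays`, `minimal_plus_array`) and helpers are hypothetical, the matching `INSERT` would need the same column subset, and no result is claimed for either shape.

```python
from __future__ import annotations

# Column list copied from the wide DDL (types contain no commas, so a plain split works).
WIDE_COLUMNS_SQL = """
  id STRING, region STRING, user_key STRING, email STRING,
  payload_a VARIANT, payload_b VARIANT, updated_at TIMESTAMP,
  status STRING, hash_key STRING,
  extra_a VARIANT, extra_b VARIANT, extra_c VARIANT, extra_d VARIANT,
  extra_e VARIANT, extra_f VARIANT, extra_g VARIANT,
  id_list ARRAY<STRING>, extra_h VARIANT, extra_i VARIANT,
  tag_list ARRAY<STRING>
"""


def parse_columns(columns_sql: str) -> list[tuple[str, str]]:
    """Split 'name TYPE, name TYPE, ...' into (name, TYPE) pairs."""
    pairs = []
    for part in columns_sql.split(","):
        name, col_type = part.split(maxsplit=1)
        pairs.append((name, col_type.strip()))
    return pairs


def create_table_sql(table: str, columns: list[tuple[str, str]]) -> str:
    """CREATE TABLE shaped like the repro's tables: bucketed on id, format-version 3."""
    body = ",\n  ".join(f"{name} {col_type}" for name, col_type in columns)
    return (
        f"CREATE TABLE {table} (\n  {body}\n) USING iceberg\n"
        "PARTITIONED BY (bucket(16, id))\n"
        "TBLPROPERTIES ('format-version' = '3')"
    )


wide = parse_columns(WIDE_COLUMNS_SQL)
# Hypothetical control shapes; both keep `id`, the bucket partition source column.
candidates = {
    "wide_no_arrays": [c for c in wide if not c[1].startswith("ARRAY<")],
    "minimal_plus_array": [c for c in wide if c[0] in ("id", "payload_a", "payload_b", "id_list")],
}
for shape, cols in candidates.items():
    print(create_table_sql(f"shred_bug_{shape}", cols) + ";\n")
```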
   
   ---
   
   ## Workaround
   
   ```
   spark.sql.iceberg.shred-variants=false
   ```
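If disabling shredding for the whole session is too broad, the setting can be flipped only around the affected write. A minimal sketch, assuming the property is read from the session config at write time (the repro's own `spark.conf.set(...)` call relies on the same behavior); `shredding_disabled` is a hypothetical helper, not an Iceberg or Spark API. It restores the previous value afterwards, or unsets the key if it was not set before.

```python
from __future__ import annotations

from contextlib import contextmanager
from typing import Iterator

SHRED_KEY = "spark.sql.iceberg.shred-variants"


@contextmanager
def shredding_disabled(spark) -> Iterator[None]:
    """Temporarily set spark.sql.iceberg.shred-variants=false on a SparkSession."""
    previous = spark.conf.get(SHRED_KEY, None)  # None if the key was never set
    spark.conf.set(SHRED_KEY, "false")
    try:
        yield
    finally:
        # Put the session back the way it was, even if the write failed.
        if previous is None:
            spark.conf.unset(SHRED_KEY)
        else:
            spark.conf.set(SHRED_KEY, previous)
```

Usage: wrap only the wide write, e.g. `with shredding_disabled(spark): spark.sql(sql_insert_wide(catalog, namespace, table))`, so other tables in the same job keep shredding enabled.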
   
   ---
   
   ## Stack trace (abbreviated)
   
   ```
   ClassCastException: optional binary element (STRING) = 21 is not a group
     at Type.asGroupType(Type.java:247)
     at ParquetWithSparkSchemaVisitor.visit(...)
     at SparkParquetWriters.buildWriter(...)
     at SparkParquetWriterWithVariantShredding.initialize(...)
     at SparkWrite$PartitionedDataWriter.write(...)
   ```
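The repro's `_is_shred_bug` only checks for `ClassCastException` and `is not a group`. When scanning executor logs, it can help to also pull out the offending Parquet field. A sketch assuming the message keeps the shape shown above (a Parquet primitive type description followed by `is not a group`); `parse_not_a_group` is a hypothetical helper.

```python
from __future__ import annotations

import re

# e.g. "optional binary element (STRING) = 21 is not a group"
_NOT_A_GROUP = re.compile(
    r"(?P<repetition>optional|required|repeated) (?P<primitive>\S+) (?P<name>\S+)"
    r"(?: \((?P<logical>[^)]*)\))?(?: = (?P<field_id>\d+))? is not a group"
)


def parse_not_a_group(message: str) -> dict[str, str | None] | None:
    """Extract the Parquet field from an 'is not a group' ClassCastException message."""
    match = _NOT_A_GROUP.search(message)
    return match.groupdict() if match else None


print(parse_not_a_group(
    "java.lang.ClassCastException: optional binary element (STRING) = 21 is not a group"
))
```

The returned `field_id` can then be looked up in the table's schema to find the column.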
   
   ---
   
   ## Where else to file if Iceberg deems this catalog-specific
   
   - [awslabs/s3-tables-catalog](https://github.com/awslabs/s3-tables-catalog)
   - AWS EMR (Spark `4.0.2-amzn-0`)
   
   ---
   
   ## Attachments
   
   ```python
   #!/usr/bin/env python3
   """
   
   spark-submit --deploy-mode client \
     --driver-memory 2g \
     --executor-memory 2g \
     --executor-cores 2 \
     --num-executors 2 \
     --conf spark.sql.iceberg.shred-variants=true \
     emr_s3tables_shred_repro.py
   """
   from __future__ import annotations
   
   import argparse
   import time
   import traceback
   
   import boto3
   from botocore.exceptions import ClientError
   from pyspark.sql import SparkSession
   from pyspark.sql.types import StringType, StructField, StructType
   
   CATALOG = "crm_user_props"
   NAMESPACE = "test"
   SOURCE_VIEW = "mock_source"
   TABLE_BUCKET_ARN = (
       "arn:aws:s3tables:us-east-1:XX:bucket/XX"
   )
   AWS_REGION = "us-east-1"
   DELETE_POLL_SEC = 5
   DELETE_TIMEOUT_SEC = 90
   
   JSON_A = '{"channel": {"type": "email", "value": "[email protected]"}}'
   JSON_B = '{"segment": "alpha", "score": 42}'
   JSON_SIMPLE = '{"enabled": true}'
   
   WIDE_SOURCE_SCHEMA = StructType(
       [
           StructField("id", StringType()),
           StructField("region", StringType()),
           StructField("user_key", StringType()),
           StructField("email", StringType()),
           StructField("payload_a", StringType()),
           StructField("payload_b", StringType()),
           StructField("updated_at", StringType()),
           StructField("status", StringType()),
           StructField("hash_key", StringType()),
           StructField("extra_a", StringType()),
           StructField("extra_b", StringType()),
           StructField("extra_c", StringType()),
           StructField("extra_d", StringType()),
           StructField("extra_e", StringType()),
           StructField("extra_f", StringType()),
           StructField("extra_g", StringType()),
           StructField("id_list", StringType()),
           StructField("extra_h", StringType()),
           StructField("extra_i", StringType()),
           StructField("tag_list", StringType()),
       ]
   )
   
   
   def parse_args() -> argparse.Namespace:
       p = argparse.ArgumentParser(description="EMR S3 Tables VARIANT shred bug repro.")
       p.add_argument(
           "--case",
           choices=("all", "minimal", "wide"),
           default="all",
           help="minimal=PASS control, wide=FAIL repro (default: all)",
       )
       p.add_argument(
           "--no-shred",
           action="store_true",
           help="Set spark.sql.iceberg.shred-variants=false (wide should pass)",
       )
       p.add_argument("--skip-delete", action="store_true")
       p.add_argument("--catalog", default=CATALOG)
       p.add_argument("--namespace", default=NAMESPACE)
       p.add_argument("--bucket-arn", default=TABLE_BUCKET_ARN)
       p.add_argument("--region", default=AWS_REGION)
       return p.parse_args()
   
   
   def _table_exists(client, bucket_arn: str, namespace: str, name: str) -> bool:
       try:
           client.get_table(
               tableBucketARN=bucket_arn, namespace=namespace, name=name
           )
           return True
       except ClientError as exc:
           code = exc.response.get("Error", {}).get("Code", "")
           if code in {"NotFoundException", "ResourceNotFoundException"}:
               return False
           raise
   
   
   def delete_table_if_exists(
       name: str, bucket_arn: str, namespace: str, region: str
   ) -> None:
       client = boto3.client("s3tables", region_name=region)
       if not _table_exists(client, bucket_arn, namespace, name):
           return
       client.delete_table(
           tableBucketARN=bucket_arn, namespace=namespace, name=name
       )
       print(f"Delete requested: {namespace}.{name}")
       deadline = time.time() + DELETE_TIMEOUT_SEC
       while time.time() < deadline:
           if not _table_exists(client, bucket_arn, namespace, name):
               print(f"Confirmed gone: {namespace}.{name}")
               return
           time.sleep(DELETE_POLL_SEC)
       raise TimeoutError(f"Table {namespace}.{name} still exists after delete")
   
   
   def sql_create_minimal(catalog: str, namespace: str, table: str) -> str:
       return f"""
       CREATE TABLE `{catalog}`.`{namespace}`.`{table}` (
           id STRING,
           payload_a VARIANT,
           payload_b VARIANT
       ) USING iceberg
       PARTITIONED BY (bucket(16, id))
       TBLPROPERTIES ('format-version' = '3')
       """
   
   
   def sql_create_wide(catalog: str, namespace: str, table: str) -> str:
       return f"""
       CREATE TABLE `{catalog}`.`{namespace}`.`{table}` (
           id STRING,
           region STRING,
           user_key STRING,
           email STRING,
           payload_a VARIANT,
           payload_b VARIANT,
           updated_at TIMESTAMP,
           status STRING,
           hash_key STRING,
           extra_a VARIANT,
           extra_b VARIANT,
           extra_c VARIANT,
           extra_d VARIANT,
           extra_e VARIANT,
           extra_f VARIANT,
           extra_g VARIANT,
           id_list ARRAY<STRING>,
           extra_h VARIANT,
           extra_i VARIANT,
           tag_list ARRAY<STRING>
       ) USING iceberg
       PARTITIONED BY (bucket(16, id))
       TBLPROPERTIES ('format-version' = '3')
       """
   
   
   def sql_insert_minimal(catalog: str, namespace: str, table: str) -> str:
       return f"""
       INSERT OVERWRITE `{catalog}`.`{namespace}`.`{table}` (id, payload_a, payload_b)
       SELECT id, parse_json(payload_a), parse_json(payload_b)
       FROM {SOURCE_VIEW}
       """
   
   
   def sql_insert_wide(catalog: str, namespace: str, table: str) -> str:
       return f"""
       INSERT OVERWRITE `{catalog}`.`{namespace}`.`{table}` (
           id, region, user_key, email, payload_a, payload_b, updated_at,
           status, hash_key, extra_a, extra_b, extra_c, extra_d, extra_e,
           extra_f, extra_g, id_list, extra_h, extra_i, tag_list
       )
       SELECT
           id,
           region,
           user_key,
           email,
           parse_json(payload_a),
           parse_json(payload_b),
           COALESCE(updated_at, CURRENT_TIMESTAMP()),
           status,
           hash_key,
           parse_json(extra_a),
           parse_json(extra_b),
           parse_json(extra_c),
           parse_json(extra_d),
           parse_json(extra_e),
           parse_json(extra_f),
           parse_json(extra_g),
           CASE
               WHEN id_list = '[]' OR id_list IS NULL THEN array()
               ELSE from_json(id_list, 'ARRAY<STRING>')
           END,
           parse_json(extra_h),
           parse_json(extra_i),
           CASE
               WHEN tag_list = '[]' OR tag_list IS NULL THEN array()
               ELSE from_json(tag_list, 'ARRAY<STRING>')
           END
       FROM {SOURCE_VIEW}
       """
   
   
   def _wide_row(row_id: str) -> tuple[str, ...]:
       return (
           row_id,
           "us-east-1",
           "user-100",
           "[email protected]",
           JSON_A,
           JSON_B,
           None,
           "active",
           "deadbeef",
           JSON_SIMPLE,
           JSON_SIMPLE,
           JSON_SIMPLE,
           JSON_SIMPLE,
           JSON_SIMPLE,
           JSON_SIMPLE,
           JSON_SIMPLE,
           "[]",
           JSON_SIMPLE,
           JSON_SIMPLE,
           "[]",
       )
   
   
   def _is_shred_bug(exc: BaseException) -> bool:
       msg = str(exc)
       return "ClassCastException" in msg and "is not a group" in msg
   
   
   def _prepare_table(
       spark: SparkSession,
       args: argparse.Namespace,
       table: str,
       create_sql: str,
   ) -> None:
       if not args.skip_delete:
           delete_table_if_exists(
               table, args.bucket_arn, args.namespace, args.region
           )
       spark.catalog.clearCache()
       spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{args.catalog}`.`{args.namespace}`")
       spark.sql(create_sql)
   
   
   def run_minimal(spark: SparkSession, args: argparse.Namespace) -> None:
       table = "shred_bug_minimal"
       print(f"\n=== minimal (id + 2 VARIANT) -> table {args.namespace}.{table} ===")
       _prepare_table(
           spark, args, table, sql_create_minimal(args.catalog, args.namespace, table)
       )
       rows = [("row-1", JSON_A, JSON_B), ("row-2", JSON_A, JSON_B)]
       spark.createDataFrame(rows, ["id", "payload_a", "payload_b"]).createOrReplaceTempView(
           SOURCE_VIEW
       )
       spark.sql(sql_insert_minimal(args.catalog, args.namespace, table))
       n = spark.table(f"{args.catalog}.{args.namespace}.{table}").count()
       print(f"OK: inserted {n} rows (expected PASS)")
   
   
   def run_wide(spark: SparkSession, args: argparse.Namespace) -> None:
       table = "shred_bug_wide"
       shred = not args.no_shred
       print(
           f"\n=== wide mixed schema -> table {args.namespace}.{table} "
           f"(shred={shred}, expected {'FAIL' if shred else 'PASS'}) ==="
       )
       _prepare_table(
           spark, args, table, sql_create_wide(args.catalog, args.namespace, table)
       )
       rows = [_wide_row("row-1"), _wide_row("row-2")]
       spark.createDataFrame(rows, WIDE_SOURCE_SCHEMA).createOrReplaceTempView(
           SOURCE_VIEW
       )
       spark.sql(sql_insert_wide(args.catalog, args.namespace, table))
       n = spark.table(f"{args.catalog}.{args.namespace}.{table}").count()
       print(f"OK: inserted {n} rows")
   
   
   def main() -> int:
       args = parse_args()
       shred = not args.no_shred
   
       print("EMR S3 Tables VARIANT shred bug repro")
       print(f"  catalog={args.catalog}  namespace={args.namespace}")
       print(f"  bucket={args.bucket_arn}")
       print(f"  spark.sql.iceberg.shred-variants={shred}")
   
       spark = SparkSession.builder.appName("emr_s3tables_shred_repro").getOrCreate()
       spark.sparkContext.setLogLevel("WARN")
       spark.conf.set("spark.sql.iceberg.shred-variants", str(shred).lower())
       print(f"  Spark version: {spark.version}")
   
       wide_result = "PASS"
       try:
           if args.case in ("all", "minimal"):
               run_minimal(spark, args)
   
           if args.case in ("all", "wide"):
               try:
                   run_wide(spark, args)
               except Exception as exc:
                   if _is_shred_bug(exc):
                       wide_result = "FAIL"
                       print(f"BUG REPRODUCED: {str(exc).split(chr(10))[0]}")
                   else:
                       wide_result = "ERROR"
                       print(f"ERROR: {exc}")
                       traceback.print_exc(limit=10)
                       return 1
       finally:
           spark.catalog.dropTempView(SOURCE_VIEW)
           spark.stop()
   
       print("\n=== SUMMARY ===")
       if args.case in ("all", "minimal"):
           print("  minimal: PASS (expected PASS)")
       if args.case in ("all", "wide"):
           expected = "FAIL" if shred else "PASS"
           ok = wide_result == expected
           print(f"  wide:    {wide_result} (expected {expected})  {'OK' if ok else 'UNEXPECTED'}")
           if wide_result == "FAIL" and shred:
               print("\nBug reproduced on S3 Tables + EMR.")
           if not ok:
               return 1
       return 0
   
   
   if __name__ == "__main__":
       raise SystemExit(main())
   
   ```
   
   
   ### Willingness to contribute
   
   - [ ] I can contribute a fix for this bug independently
   - [ ] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
   - [x] I cannot contribute a fix for this bug at this time

