amenck opened a new issue, #16510:
URL: https://github.com/apache/iceberg/issues/16510
### Apache Iceberg version
1.11.0 (latest release)
### Query engine
Spark
### Please describe the bug 🐞
Spark resolves filter predicate column names against the **current** table
schema instead of the snapshot's schema during time-travel reads. This causes
`ValidationException: Cannot find field '<old_name>'` when filtering on a
column that was renamed after the target snapshot.
In the repro below, `SELECT` uses the snapshot's schema and succeeds. However,
once a filter is added, the filter column is resolved against the current
schema and the query fails.
```
import os
import urllib.request
from pyspark.sql import SparkSession, functions as F
ICEBERG_VERSION = "1.11.0"
JAR_NAME = f"iceberg-spark-runtime-3.5_2.12-{ICEBERG_VERSION}.jar"
JAR_PATH = os.path.join("/tmp", JAR_NAME)
# Download the Iceberg Spark runtime jar if it is not already cached
if not os.path.exists(JAR_PATH):
    urllib.request.urlretrieve(
        f"https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/{ICEBERG_VERSION}/{JAR_NAME}",
        JAR_PATH,
    )
ICEBERG_FORMAT = "org.apache.iceberg.spark.source.IcebergSource"
spark = (
    SparkSession.builder
    .master("local[1]")
    .config("spark.jars", JAR_PATH)
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse")
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .getOrCreate()
)
TABLE = "local.db.repro"
spark.sql("CREATE NAMESPACE IF NOT EXISTS local.db")
spark.sql(f"DROP TABLE IF EXISTS {TABLE}")
# Create table with column "col" and insert data
spark.sql(f"CREATE TABLE {TABLE} (id BIGINT, col DOUBLE) USING iceberg")
spark.sql(f"INSERT INTO {TABLE} VALUES (1, 100.0), (2, 200.0), (3, 0.0)")
# Capture snapshot
snapshot_v1 = spark.sql(f"SELECT snapshot_id FROM {TABLE}.snapshots").collect()[-1].snapshot_id
# Rename column and insert more data
spark.sql(f"ALTER TABLE {TABLE} RENAME COLUMN col TO value")
spark.sql(f"INSERT INTO {TABLE} VALUES (4, 400.0)")
# Time-travel read via IcebergSource
df = (
    spark.read.format(ICEBERG_FORMAT)
    .option("snapshot-id", snapshot_v1)
    .load(TABLE)
)
df.columns # => ['id', 'col'] -- correct, uses snapshot schema
df.select("col") # => works fine
df.count() # => 3, works fine
# BUG: filter resolves "col" against current schema (which has "value", not "col")
df.filter(F.col("col") > 0).count()
# => ValidationException: Cannot find field 'col' in struct:
# struct<1: id: optional long, 2: value: optional double>
```
## Expected behavior
`df.filter(F.col("col") > 0)` should resolve `col` against the snapshot's
schema (where the column is named `col`) and succeed, consistent with how
`df.select("col")` and `df.columns` already work.
## Actual behavior
```
org.apache.iceberg.exceptions.ValidationException: Cannot find field 'col' in struct:
struct<1: id: optional long, 2: value: optional double>
```
The error message shows the **current** schema (with `value`), not the
snapshot's schema (with `col`).
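
To illustrate the mismatch, here is a toy model in plain Python. It is not Iceberg code and makes no claim about where the lookup happens internally. The dictionaries and the `resolve_field` helper are hypothetical names used only for illustration. The field IDs are taken from the struct printed in the error, and the sketch assumes the rename kept field ID 2 and changed only its name.

```python
# Toy model only: field IDs map to names, as in the struct printed in the error.
SNAPSHOT_SCHEMA = {1: "id", 2: "col"}    # schema at the time-travel snapshot
CURRENT_SCHEMA = {1: "id", 2: "value"}   # schema after RENAME COLUMN col TO value


def resolve_field(name, schema):
    """Return the field ID for `name`, or raise if the schema has no such name."""
    for field_id, field_name in schema.items():
        if field_name == name:
            return field_id
    raise LookupError(f"Cannot find field '{name}' in {schema}")


# Resolving the filter column against the snapshot schema succeeds.
snapshot_field_id = resolve_field("col", SNAPSHOT_SCHEMA)

# Resolving the same name against the current schema fails, as in the report.
try:
    resolve_field("col", CURRENT_SCHEMA)
    current_lookup_failed = False
except LookupError:
    current_lookup_failed = True
```

In this model the same name lookup succeeds or fails depending only on which schema it is given, which matches the difference seen between `df.select("col")` and `df.filter(F.col("col") > 0)`.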
## Environment
- Iceberg: 1.11.0
- Spark: 3.5.5
- Catalog: Hadoop
I initially saw this on EMR with an AWS Glue catalog, and was able to
reproduce it with the minimal Hadoop-catalog example above.
### Willingness to contribute
- [ ] I can contribute a fix for this bug independently
- [ ] I would be willing to contribute a fix for this bug with guidance from
the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]