amogh-jahagirdar commented on code in PR #15240:
URL: https://github.com/apache/iceberg/pull/15240#discussion_r2879433992
##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogTable.java:
##########
@@ -74,24 +73,7 @@ public Set<TableCapability> capabilities() {
@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
- if (refreshEagerly) {
- icebergTable.refresh();
- }
-
- return new SparkScanBuilder(spark(), icebergTable, changelogSchema(),
options) {
- @Override
- public Scan build() {
- return buildChangelogScan();
- }
- };
- }
-
- private Schema changelogSchema() {
- if (lazyChangelogSchema == null) {
- this.lazyChangelogSchema =
ChangelogUtil.changelogSchema(icebergTable.schema());
- }
-
- return lazyChangelogSchema;
+ return new SparkChangelogScanBuilder(spark(), table, schema, options);
Review Comment:
Yeah I think the separate SparkChangelogScanBuilder makes sense and is a
clean change for the current model.
##########
spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCachedTableRefresh.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestCachedTableRefresh extends ExtensionsTestBase {
+
+ @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+ protected static Object[][] parameters() {
+ return new Object[][] {
+ {
+ SparkCatalogConfig.HIVE.catalogName(),
+ SparkCatalogConfig.HIVE.implementation(),
+ ImmutableMap.<String, String>builder()
+ .putAll(SparkCatalogConfig.HIVE.properties())
+ .put("cache-enabled", "false")
+ .build()
+ },
+ {
+ SparkCatalogConfig.SPARK_SESSION.catalogName(),
+ SparkCatalogConfig.SPARK_SESSION.implementation(),
+ ImmutableMap.<String, String>builder()
+ .putAll(SparkCatalogConfig.SPARK_SESSION.properties())
+ .put("cache-enabled", "true")
+ .put("cache.expiration-interval-ms", "-1") // indefinite cache
+ .buildKeepingLast()
+ }
+ };
+ }
+
+ @AfterEach
+ public void removeTables() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ sql("UNCACHE TABLE IF EXISTS %s", tableName);
+ }
+
+ @TestTemplate
+ public void testCachedTableWithExternalWrite() throws IOException {
+ // create table and insert initial data
+ sql("CREATE TABLE %s (id INT, salary INT) USING iceberg", tableName);
+ sql("INSERT INTO %s VALUES (1, 100)", tableName);
+
+ // cache table
+ sql("CACHE TABLE %s", tableName);
+
+ // query table to populate cache
+ List<Object[]> result1 = sql("SELECT * FROM %s ORDER BY id", tableName);
+ assertThat(result1).hasSize(1).containsExactly(row(1, 100));
+
+ // external writer adds (2, 200)
+ Table table = validationCatalog.loadTable(tableIdent);
+ Record record = GenericRecord.create(table.schema());
+ record.setField("id", 2);
+ record.setField("salary", 200);
+ DataFile dataFile = writeData(table, ImmutableList.of(record));
+ table.newAppend().appendFile(dataFile).commit();
+
+ // query table again - should return cached data, not new data
+ List<Object[]> result2 = sql("SELECT * FROM %s ORDER BY id", tableName);
+ assertThat(result2)
+ .as("Cached table should not reflect external writes")
+ .hasSize(1)
+ .containsExactly(row(1, 100));
+ }
+
+ @TestTemplate
+ public void testCachedTableWithSessionWrite() {
+ // create table and insert initial data
+ sql("CREATE TABLE %s (id INT, salary INT) USING iceberg", tableName);
+ sql("INSERT INTO %s VALUES (1, 100)", tableName);
+
+ // cache table
+ sql("CACHE TABLE %s", tableName);
+
+ // query table to populate cache
+ List<Object[]> result1 = sql("SELECT * FROM %s ORDER BY id", tableName);
+ assertThat(result1).hasSize(1).containsExactly(row(1, 100));
+
+ // session write adds (2, 200) - should invalidate cache
+ sql("INSERT INTO %s VALUES (2, 200)", tableName);
+
+ // query table again - should reflect session writes
+ List<Object[]> result2 = sql("SELECT * FROM %s ORDER BY id", tableName);
+ assertThat(result2)
+ .as("Cached table should reflect session writes")
+ .hasSize(2)
+ .containsExactly(row(1, 100), row(2, 200));
+ }
+
+ @TestTemplate
+ public void testCachedTableWithSessionWriteAndExternalWrite() throws
IOException {
+ // create table and insert initial data
+ sql("CREATE TABLE %s (id INT, salary INT) USING iceberg", tableName);
+ sql("INSERT INTO %s VALUES (1, 100)", tableName);
+
+ // cache table
+ sql("CACHE TABLE %s", tableName);
+
+ // query table to populate cache
+ List<Object[]> result1 = sql("SELECT * FROM %s ORDER BY id", tableName);
+ assertThat(result1).hasSize(1).containsExactly(row(1, 100));
+
+ // session write invalidates cache
+ sql("INSERT INTO %s VALUES (2, 200)", tableName);
+
+ // external writer adds (3, 300)
+ Table table = validationCatalog.loadTable(tableIdent);
+ Record record = GenericRecord.create(table.schema());
+ record.setField("id", 3);
+ record.setField("salary", 300);
+ DataFile dataFile = writeData(table, ImmutableList.of(record));
+ table.newAppend().appendFile(dataFile).commit();
+
+ // query table again - should see session write but not external write
+ List<Object[]> result2 = sql("SELECT * FROM %s ORDER BY id", tableName);
+ assertThat(result2)
+ .as("Cached table should reflect session writes but not external
writes after refresh")
+ .hasSize(2)
+ .containsExactly(row(1, 100), row(2, 200));
+ }
+
+ @TestTemplate
+ public void testCachedTableWithExternalSchemaChangeAddColumn() throws
IOException {
+ // create table and insert initial data
+ sql("CREATE TABLE %s (id INT, salary INT) USING iceberg", tableName);
+ sql("INSERT INTO %s VALUES (1, 100)", tableName);
+
+ // cache table
+ sql("CACHE TABLE %s", tableName);
+
+ // query table to populate cache
+ List<Object[]> result1 = sql("SELECT * FROM %s ORDER BY id", tableName);
+ assertThat(result1).hasSize(1).containsExactly(row(1, 100));
+
+ // external writer adds column and data
+ Table table = validationCatalog.loadTable(tableIdent);
+ table.updateSchema().addColumn("new_column",
Types.IntegerType.get()).commit();
+
+ Record record = GenericRecord.create(table.schema());
+ record.setField("id", 2);
+ record.setField("salary", 200);
+ record.setField("new_column", -1);
+ DataFile dataFile = writeData(table, ImmutableList.of(record));
+ table.newAppend().appendFile(dataFile).commit();
+
+ // query table again - should return cached data with original schema
+ List<Object[]> result2 = sql("SELECT * FROM %s ORDER BY id", tableName);
+ assertThat(result2)
+ .as("Cached table should pin metadata and not reflect external schema
changes")
+ .hasSize(1)
+ .containsExactly(row(1, 100));
+ }
+
+ @TestTemplate
+ public void testCachedTableWithSessionSchemaChangeAddColumn() {
+ // create table and insert initial data
+ sql("CREATE TABLE %s (id INT, salary INT) USING iceberg", tableName);
+ sql("INSERT INTO %s VALUES (1, 100)", tableName);
+
+ // cache table
+ sql("CACHE TABLE %s", tableName);
+
+ // query table to populate cache
+ List<Object[]> result1 = sql("SELECT * FROM %s ORDER BY id", tableName);
+ assertThat(result1).hasSize(1).containsExactly(row(1, 100));
+
+ // session DDL adds column - should invalidate cache
+ sql("ALTER TABLE %s ADD COLUMN new_column INT", tableName);
+ sql("INSERT INTO %s VALUES (2, 200, -1)", tableName);
+
+ // query table again - should reflect session schema changes
+ List<Object[]> result2 = sql("SELECT * FROM %s ORDER BY id", tableName);
+ assertThat(result2)
+ .as("Cached table should reflect session schema changes")
+ .hasSize(2)
+ .containsExactly(row(1, 100, null), row(2, 200, -1));
+ }
+
+ @TestTemplate
+ @Disabled("https://issues.apache.org/jira/browse/SPARK-55631")
Review Comment:
Looks like this is fixed in 4.2, shall we comment that inline?
##########
spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java:
##########
@@ -166,6 +167,7 @@ public void testSparkStreamingWriteFailsUnknownTransform()
throws IOException, T
.hasMessageContaining("Cannot write using unsupported transforms:
zero");
}
+ @Disabled("https://issues.apache.org/jira/browse/SPARK-55626")
Review Comment:
Same as the other one, worth commenting that this will be fixed in the 4.2.0
release?
##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java:
##########
@@ -23,8 +23,14 @@ public class SparkReadOptions {
private SparkReadOptions() {}
- // Snapshot ID of the table snapshot to read
- public static final String SNAPSHOT_ID = "snapshot-id";
+ // legacy time travel option that is no longer supported
+ public static final String LEGACY_SNAPSHOT_ID = "snapshot-id";
Review Comment:
I don't think we have the same compatibility guarantees in the Spark module that we have
in API/Core. I'd actually prefer the rename: calling it "legacy" makes it
clear from the name that the option is no longer supported, and the time travel
options being exposed are being simplified in the 4.1 model anyway.
##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java:
##########
@@ -23,8 +23,14 @@ public class SparkReadOptions {
private SparkReadOptions() {}
- // Snapshot ID of the table snapshot to read
- public static final String SNAPSHOT_ID = "snapshot-id";
+ // legacy time travel option that is no longer supported
+ public static final String LEGACY_SNAPSHOT_ID = "snapshot-id";
Review Comment:
I'd even go stronger on the naming, and say DEPRECATED_SNAPSHOT_ID.
##########
spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java:
##########
@@ -69,4 +81,31 @@ public static void startMetastoreAndSpark() {
CatalogUtil.loadCatalog(
HiveCatalog.class.getName(), "hive", ImmutableMap.of(),
hiveConf);
}
+
+ protected boolean cacheEnabled() {
Review Comment:
Agree with @manuzhang here — I think there's already a util in the base class
we can reuse, and this code doesn't appear to resolve the setting any differently.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]