bennychow commented on code in PR #9830: URL: https://github.com/apache/iceberg/pull/9830#discussion_r1537072217
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java: ########## @@ -557,6 +568,88 @@ public View loadView(Identifier ident) throws NoSuchViewException { throw new NoSuchViewException(ident); } + // Candidate to be moved to org.apache.iceberg.view.View + private boolean isMaterializedView(org.apache.iceberg.view.View view) { + return Optional.of(view.properties().get(MaterializedViewUtil.MATERIALIZED_VIEW_PROPERTY_KEY)) + .orElse("false") + .equals("true"); + } + + // Candidate to be moved to org.apache.iceberg.view.View + private String getStorageTableIdentifier(org.apache.iceberg.view.View view) { + String identifier = + view.properties().get(MaterializedViewUtil.MATERIALIZED_VIEW_STORAGE_TABLE_PROPERTY_KEY); + Preconditions.checkState( + identifier != null, "Storage table identifier is not set for materialized view."); + return identifier; + } + + // Candidate to be moved to org.apache.iceberg.view.View but requires loadTable + private Table loadStorageTable(org.apache.iceberg.view.View view) { + String storageTableIdentifier = getStorageTableIdentifier(view); + try { + SparkSession session = SparkSession.active(); + Table storageTable = + loadTable(Spark3Util.catalogAndIdentifier(session, storageTableIdentifier).identifier()); + return storageTable; + } catch (ParseException | NoSuchTableException e) { + throw new IllegalStateException("Unable to load storage table for materialized view.", e); + } + } + + // Candidate to be moved to org.apache.iceberg.view.View but requires loadTable + // Second option is to move to SparkMaterializedView + private boolean isFresh(org.apache.iceberg.view.View view) { + Table storageTable = loadStorageTable(view); + Map<String, String> storageTableProperties = storageTable.properties(); + + // Get the parent view version id from the storage table properties + String storageTableViewVersionIdPropertyValue = + storageTableProperties.get( + MaterializedViewUtil.MATERIALIZED_VIEW_VERSION_PROPERTY_KEY); 
+ if (storageTableViewVersionIdPropertyValue == null) { + throw new IllegalStateException( + "Storage table properties do not contain the virtual view version id property."); + } + int storageTableViewVersionId = Integer.parseInt(storageTableViewVersionIdPropertyValue); + + // If the storage table view version id is different from the current version id, the + // materialized view is not fresh + if (storageTableViewVersionId != view.currentVersion().versionId()) { + return false; + } + + // Get the base table snapshot ids from the storage table properties + Map<String, String> baseTableSnapshotsProperties = + storageTableProperties.entrySet().stream() + .filter( + entry -> + entry + .getKey() + .startsWith( + MaterializedViewUtil + .MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + List<Table> baseTables = MaterializedViewUtil.extractBaseTables(view.sqlFor("spark").sql()); + + for (Table baseTable : baseTables) { + org.apache.iceberg.Table icebergBaseTable = ((SparkTable) baseTable).table(); + String snapshotId = + String.valueOf( + icebergBaseTable.currentSnapshot() == null + ? 0 + : icebergBaseTable.currentSnapshot().snapshotId()); + if (!baseTableSnapshotsProperties Review Comment: This could also be optimized slightly to ignore base-table snapshot changes whose operation is `DataOperations.REPLACE`. Those snapshots only rewrite data files (e.g. compaction) without changing the table's logical contents, so they should not mark the materialized view as stale. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org