amogh-jahagirdar commented on code in PR #12736: URL: https://github.com/apache/iceberg/pull/12736#discussion_r2045124799
########## spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala: ########## @@ -148,9 +151,29 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi case UnsetViewProperties(ResolvedV2View(catalog, ident), propertyKeys, ifExists) => AlterV2ViewUnsetPropertiesExec(catalog, ident, propertyKeys, ifExists) :: Nil + case ReplaceData(_: DataSourceV2Relation, _, query, r: DataSourceV2Relation, _, Some(write)) => + ReplaceDataExec(planLater(query), refreshCache(r), write) :: Nil + + case WriteDelta(_: DataSourceV2Relation, _, query, r: DataSourceV2Relation, projections, + Some(write)) => + WriteDeltaExec(planLater(query), refreshCache(r), projections, write) :: Nil + case _ => Nil } + /** + * In case of a table enabled with row lineage, the Dsv2 relation output for the write will have + * row lineage in the outputs. However, the cached Dsv2 relation won't have these fields + * so refresh after a write should be performed with an updated plan with row lineage outputs removed. + * @param r + */ + private def refreshCache(r: DataSourceV2Relation)(): Unit = { + val planToCacheBy = r.copy(output = r.output.filterNot( + attr => attr.name == MetadataColumns.ROW_ID.name() || + attr.name == MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name())) + spark.sharedState.cacheManager.recacheByPlan(spark, planToCacheBy) + } + Review Comment: The only uncertainty is on adding new optimizer rules, but again adding a single optimizer rule for this case seems a lot less intrusive than manipulating the physical execution strategy ########## spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala: ########## @@ -148,9 +151,29 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi case UnsetViewProperties(ResolvedV2View(catalog, ident), propertyKeys, ifExists) => AlterV2ViewUnsetPropertiesExec(catalog, ident, propertyKeys, ifExists) :: Nil + case ReplaceData(_: DataSourceV2Relation, _, query, r: DataSourceV2Relation, _, Some(write)) => + ReplaceDataExec(planLater(query), refreshCache(r), write) :: Nil + + case WriteDelta(_: DataSourceV2Relation, _, query, r: DataSourceV2Relation, projections, + Some(write)) => + WriteDeltaExec(planLater(query), refreshCache(r), projections, write) :: Nil + case _ => Nil } + /** + * In case of a table enabled with row lineage, the Dsv2 relation output for the write will have + * row lineage in the outputs. However, the cached Dsv2 relation won't have these fields + * so refresh after a write should be performed with an updated plan with row lineage outputs removed. + * @param r + */ + private def refreshCache(r: DataSourceV2Relation)(): Unit = { + val planToCacheBy = r.copy(output = r.output.filterNot( + attr => attr.name == MetadataColumns.ROW_ID.name() || + attr.name == MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name())) + spark.sharedState.cacheManager.recacheByPlan(spark, planToCacheBy) + } + Review Comment: The only uncertainty is on adding new optimizer rules, but again adding a single optimizer rule for this case seems a lot less intrusive than manipulating the physical execution strategy like what's being done here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org