aokolnychyi commented on code in PR #6012:
URL: https://github.com/apache/iceberg/pull/6012#discussion_r1034279260


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/GenerateChangesProcedure.java:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+
+public class GenerateChangesProcedure extends BaseProcedure {

Review Comment:
   Can we add some Javadoc on what this procedure does?
   May make sense to add a few details about the algorithm too.
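   For illustration only, roughly the kind of class-level Javadoc that could work here (wording is just a sketch; details depend on the final algorithm):
   ```
   /**
    * A procedure that builds a view of changes between two snapshots of an Iceberg table.
    *
    * <p>It reads the table's changelog and, when identifier columns are available,
    * repartitions the rows by those columns and the change ordinal so that matching
    * DELETE/INSERT pairs can be rewritten as update pre/post images. The result is
    * registered as a temporary view whose name is returned to the caller.
    */
   ```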



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/GenerateChangesProcedure.java:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+
+public class GenerateChangesProcedure extends BaseProcedure {
+
+  public static SparkProcedures.ProcedureBuilder builder() {

Review Comment:
   nit: The definition order in other procedures is: static vars, static methods, constructor, instance methods.
   Could you move this method below the static constants?
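   A structure-only sketch of that ordering (illustrative, not compilable as the real class):
   ```
   class ProcedureLayoutExample {
     // 1. static vars / constants, e.g. PARAMETERS, OUTPUT_TYPE
     // 2. static methods, e.g. builder()
     // 3. constructor
     // 4. instance methods, e.g. parameters(), outputType(), call(), private helpers
   }
   ```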



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/GenerateChangesProcedure.java:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+
+public class GenerateChangesProcedure extends BaseProcedure {
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<GenerateChangesProcedure>() {
+      @Override
+      protected GenerateChangesProcedure doBuild() {
+        return new GenerateChangesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {

Review Comment:
   I'd consider supporting these parameters (names TBD); a rough sketch follows the list.
   - Table name
   - Changelog view name
   - A map of options that would also contain boundaries
   - A boolean to indicate whether we should compute pre/post images
   - A boolean to indicate whether we should remove carried over records
   - Default identifier columns to use if not defined in snapshot's schema
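   
   For illustration only, the suggested set could look something like this (parameter names and types are placeholders, not a proposal for the final API):
   ```
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {
         ProcedureParameter.required("table", DataTypes.StringType),
         ProcedureParameter.optional("changelog_view", DataTypes.StringType),
         ProcedureParameter.optional(
             "options", DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)),
         ProcedureParameter.optional("compute_updates", DataTypes.BooleanType),
         ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType),
         ProcedureParameter.optional(
             "identifier_columns", DataTypes.createArrayType(DataTypes.StringType)),
       };
   ```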



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/GenerateChangesProcedure.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.UUID;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.expressions.Window;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+
+public class GenerateChangesProcedure extends BaseProcedure {
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<GenerateChangesProcedure>() {
+      @Override
+      protected GenerateChangesProcedure doBuild() {
+        return new GenerateChangesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        // the snapshot ids input are ignored when the start/end timestamps are provided
+        ProcedureParameter.optional("start_snapshot_id_exclusive", DataTypes.LongType),
+        ProcedureParameter.optional("end_snapshot_id_inclusive", DataTypes.LongType),
+        ProcedureParameter.optional("table_change_view", DataTypes.StringType),
+        ProcedureParameter.optional("identifier_columns", DataTypes.StringType),

Review Comment:
   Each snapshot's schema has a list of identifier columns. I feel asking users for identifier columns when those are already defined is not necessary.
   Let me think about how we can handle this without too much complication.
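   For example, something along these lines would make the parameter a pure override (sketch only; the method name and argument ordinal are illustrative):
   ```
   // Fall back to the identifier fields defined on the table schema when the
   // user does not pass identifier_columns explicitly.
   private String[] identifierColumns(InternalRow args, Table table) {
     if (!args.isNullAt(4)) {
       return args.getString(4).split(",");
     }
     return table.schema().identifierFieldNames().toArray(new String[0]);
   }
   ```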



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/GenerateChangesProcedure.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.UUID;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.expressions.Window;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+
+public class GenerateChangesProcedure extends BaseProcedure {
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<GenerateChangesProcedure>() {
+      @Override
+      protected GenerateChangesProcedure doBuild() {
+        return new GenerateChangesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        // the snapshot ids input are ignored when the start/end timestamps are provided
+        ProcedureParameter.optional("start_snapshot_id_exclusive", DataTypes.LongType),
+        ProcedureParameter.optional("end_snapshot_id_inclusive", DataTypes.LongType),
+        ProcedureParameter.optional("table_change_view", DataTypes.StringType),
+        ProcedureParameter.optional("identifier_columns", DataTypes.StringType),

Review Comment:
   I don't think operation conditions should matter.
   
   Suppose I have the following table with a primary key column `pk`.
   
   ```
   -----------------
   pk | dep | salary
   -----------------
   1, hr, 100
   2, software, 80
   3, hardware, 120
   4, software, 110
   5, hr, 95
   6, software, 100
   ```
   
   If I issue `UPDATE t SET salary = 110 WHERE salary = 100`, it will generate the following changes.
   
   ```
   --------------------------------------------------------------------------
   _change_type | _change_ordinal | _commit_snapshot_id | pk | dep | salary
   --------------------------------------------------------------------------
   DELETE, 0, s1, 1, hr, 100
   INSERT, 0, s1, 1, hr, 110
   DELETE, 0, s1, 6, software, 100
   INSERT, 0, s1, 6, software, 110
   ```
   
   Even though I had a condition on `salary`, we should use `pk` for computing pre/post images.
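   In other words, using `pk` the pairs above would presumably be folded into pre/post images along these lines (the exact labels depend on the final implementation):
   ```
   --------------------------------------------------------------------------
   _change_type | _change_ordinal | _commit_snapshot_id | pk | dep | salary
   --------------------------------------------------------------------------
   UPDATE_BEFORE, 0, s1, 1, hr, 100
   UPDATE_AFTER,  0, s1, 1, hr, 110
   UPDATE_BEFORE, 0, s1, 6, software, 100
   UPDATE_AFTER,  0, s1, 6, software, 110
   ```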



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java:
##########
@@ -53,6 +53,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
     mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder);
     mapBuilder.put("register_table", RegisterTableProcedure::builder);
     mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
+    mapBuilder.put("generate_changes", GenerateChangesProcedure::builder);

Review Comment:
   Are there any alternative names? I am not sure the procedure actually generates changes.
   Let's think a bit. The name is not bad, but I wonder whether we can be a bit more specific.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/GenerateChangesProcedure.java:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+
+public class GenerateChangesProcedure extends BaseProcedure {
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<GenerateChangesProcedure>() {
+      @Override
+      protected GenerateChangesProcedure doBuild() {
+        return new GenerateChangesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        // the snapshot ids input are ignored when the start/end timestamps are provided
+        ProcedureParameter.optional("start_snapshot_id_exclusive", DataTypes.LongType),
+        ProcedureParameter.optional("end_snapshot_id_inclusive", DataTypes.LongType),
+        ProcedureParameter.optional("table_change_view", DataTypes.StringType),
+        ProcedureParameter.optional("identifier_columns", DataTypes.StringType),
+        ProcedureParameter.optional("start_timestamp", DataTypes.TimestampType),
+        ProcedureParameter.optional("end_timestamp", DataTypes.TimestampType),
+      };
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("view_name", DataTypes.StringType, false, Metadata.empty())
+          });
+
+  private GenerateChangesProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    String tableName = args.getString(0);
+
+    // Read data from the table.changes
+    Dataset<Row> df = changelogRecords(tableName, args);
+
+    // Compute the pre-images and post-images if the identifier columns are provided.
+    if (!args.isNullAt(4)) {
+      String[] identifierColumns = args.getString(4).split(",");
+      if (identifierColumns.length > 0) {
+        df = withUpdate(df, identifierColumns);
+      }
+    }
+
+    String viewName = viewName(args, tableName);
+
+    // Create a view for users to query
+    df.createOrReplaceTempView(viewName);
+
+    return toOutputRows(viewName);
+  }
+
+  private Dataset<Row> changelogRecords(String tableName, InternalRow args) {
+    Long[] snapshotIds = getSnapshotIds(tableName, args);
+
+    // we don't have to validate the snapshot ids here because the reader will do it for us.
+    DataFrameReader reader = spark().read();
+    if (snapshotIds[0] != null) {
+      reader = reader.option(SparkReadOptions.START_SNAPSHOT_ID, snapshotIds[0]);
+    }
+
+    if (snapshotIds[1] != null) {
+      reader = reader.option(SparkReadOptions.END_SNAPSHOT_ID, snapshotIds[1]);
+    }
+
+    return reader.table(tableName + "." + SparkChangelogTable.TABLE_NAME);
+  }
+
+  @NotNull
+  private Long[] getSnapshotIds(String tableName, InternalRow args) {
+    Long[] snapshotIds = new Long[] {null, null};
+
+    Long startTimestamp = args.isNullAt(5) ? null : DateTimeUtil.microsToMillis(args.getLong(5));
+    Long endTimestamp = args.isNullAt(6) ? null : DateTimeUtil.microsToMillis(args.getLong(6));
+
+    if (startTimestamp == null && endTimestamp == null) {
+      snapshotIds[0] = args.isNullAt(1) ? null : args.getLong(1);
+      snapshotIds[1] = args.isNullAt(2) ? null : args.getLong(2);
+    } else {
+      Identifier tableIdent = toIdentifier(tableName, PARAMETERS[0].name());
+      Table table = loadSparkTable(tableIdent).table();
+      Snapshot[] snapshots = snapshotsFromTimestamp(startTimestamp, endTimestamp, table);
+
+      if (snapshots != null) {
+        snapshotIds[0] = snapshots[0].parentId();
+        snapshotIds[1] = snapshots[1].snapshotId();
+      }
+    }
+    return snapshotIds;
+  }
+
+  private Snapshot[] snapshotsFromTimestamp(Long startTimestamp, Long endTimestamp, Table table) {
+    Snapshot[] snapshots = new Snapshot[] {null, null};
+
+    if (startTimestamp != null && endTimestamp != null && startTimestamp > endTimestamp) {
+      throw new IllegalArgumentException(
+          "Start timestamp must be less than or equal to end timestamp");
+    }
+
+    if (startTimestamp == null) {
+      snapshots[0] = SnapshotUtil.oldestAncestor(table);
+    } else {
+      snapshots[0] = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);
+    }
+
+    if (endTimestamp == null) {
+      snapshots[1] = table.currentSnapshot();
+    } else {
+      snapshots[1] = table.snapshot(SnapshotUtil.snapshotIdAsOfTime(table, endTimestamp));
+    }
+
+    if (snapshots[0] == null || snapshots[1] == null) {
+      return null;
+    }
+
+    return snapshots;
+  }
+
+  @NotNull
+  private static String viewName(InternalRow args, String tableName) {
+    String viewName = args.isNullAt(3) ? null : args.getString(3);
+    if (viewName == null) {
+      String shortTableName =
+          tableName.contains(".") ? tableName.substring(tableName.lastIndexOf(".") + 1) : tableName;
+      viewName = shortTableName + "_changes";
+    }
+    return viewName;
+  }
+
+  private Dataset<Row> withUpdate(Dataset<Row> df, String[] identifiers) {
+    Column[] partitionSpec = getPartitionSpec(df, identifiers);
+    Column[] sortSpec = sortSpec(df, partitionSpec);
+
+    int changeTypeIdx = df.schema().fieldIndex(MetadataColumns.CHANGE_TYPE.name());
+    List<Integer> partitionIdx =
+        Arrays.stream(partitionSpec)
+            .map(column -> df.schema().fieldIndex(column.toString()))
+            .collect(Collectors.toList());
+
+    return df.repartition(partitionSpec)
+        .sortWithinPartitions(sortSpec)
+        .mapPartitions(
+            processRowsWithinTask(changeTypeIdx, partitionIdx), RowEncoder.apply(df.schema()));
+  }
+
+  @NotNull
+  private static Column[] getPartitionSpec(Dataset<Row> df, String[] identifiers) {
+    Column[] partitionSpec = new Column[identifiers.length + 1];
+    for (int i = 0; i < identifiers.length; i++) {
+      try {
+        partitionSpec[i] = df.col(identifiers[i]);
+      } catch (Exception e) {
+        throw new IllegalArgumentException(
+            String.format("Identifier column '%s' does not exist in the table", identifiers[i]), e);
+      }
+    }
+    partitionSpec[partitionSpec.length - 1] = df.col(MetadataColumns.CHANGE_ORDINAL.name());
+    return partitionSpec;
+  }
+
+  @NotNull
+  private static Column[] sortSpec(Dataset<Row> df, Column[] partitionSpec) {
+    Column[] sortSpec = new Column[partitionSpec.length + 1];
+    System.arraycopy(partitionSpec, 0, sortSpec, 0, partitionSpec.length);
+    sortSpec[sortSpec.length - 1] = df.col(MetadataColumns.CHANGE_TYPE.name());
+    return sortSpec;
+  }
+
+  private static MapPartitionsFunction<Row, Row> processRowsWithinTask(

Review Comment:
   I'll take a look at this with fresh eyes tomorrow.
   It would be nice to decouple this into a class so that we can unit test it by explicitly passing an iterator of rows.
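   Roughly something like this, just to illustrate the shape (class name and constructor are made up, not the PR's design):
   ```
   import java.util.Iterator;
   import java.util.List;
   import org.apache.spark.api.java.function.MapPartitionsFunction;
   import org.apache.spark.sql.Row;

   // A standalone function that can be unit tested by passing a hand-built
   // iterator of rows, without running a Spark job.
   class ChangelogRowProcessor implements MapPartitionsFunction<Row, Row> {
     private final int changeTypeIdx;
     private final List<Integer> identifierIdx;

     ChangelogRowProcessor(int changeTypeIdx, List<Integer> identifierIdx) {
       this.changeTypeIdx = changeTypeIdx;
       this.identifierIdx = identifierIdx;
     }

     @Override
     public Iterator<Row> call(Iterator<Row> rows) {
       // the pairing/relabeling logic from processRowsWithinTask would move here
       return rows;
     }
   }
   ```
   Then `withUpdate` would pass `new ChangelogRowProcessor(changeTypeIdx, partitionIdx)` to `mapPartitions`, and tests could invoke `call(...)` directly.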



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/GenerateChangesProcedure.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.UUID;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.expressions.Window;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+
+public class GenerateChangesProcedure extends BaseProcedure {
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<GenerateChangesProcedure>() {
+      @Override
+      protected GenerateChangesProcedure doBuild() {
+        return new GenerateChangesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        // the snapshot ids input are ignored when the start/end timestamps are provided
+        ProcedureParameter.optional("start_snapshot_id_exclusive", DataTypes.LongType),
+        ProcedureParameter.optional("end_snapshot_id_inclusive", DataTypes.LongType),
+        ProcedureParameter.optional("table_change_view", DataTypes.StringType),
+        ProcedureParameter.optional("identifier_columns", DataTypes.StringType),
+        ProcedureParameter.optional("start_timestamp", DataTypes.TimestampType),
+        ProcedureParameter.optional("end_timestamp", DataTypes.TimestampType),

Review Comment:
   I am not sure. I'd consider having `read_options` or `options` as a map that would be passed while loading deletes and inserts as a `DataFrame`.
   Then users can specify boundaries directly in the map.
   
   We already respect these options from `SparkReadOptions` in the `changes` table:
   
   ```
   // Start snapshot ID used in incremental scans (exclusive)
   public static final String START_SNAPSHOT_ID = "start-snapshot-id";
   
   // End snapshot ID used in incremental scans (inclusive)
   public static final String END_SNAPSHOT_ID = "end-snapshot-id";
   ```
   
   We could add `start-timestamp`, `end-timestamp`, and `start-snapshot-id-inclusive`.
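   For illustration, with a map parameter the read could simply forward the options (sketch only; it assumes an `options` parameter turned into a map, reuses `spark()` and `tableName` from the procedure, and builds the map with relocated Guava's `ImmutableMap`):
   ```
   Map<String, String> readOptions =
       ImmutableMap.of(
           SparkReadOptions.START_SNAPSHOT_ID, "123",
           SparkReadOptions.END_SNAPSHOT_ID, "456");

   Dataset<Row> df =
       spark()
           .read()
           .options(readOptions)
           .table(tableName + "." + SparkChangelogTable.TABLE_NAME);
   ```
   That would keep the procedure signature stable if we later add `start-timestamp`/`end-timestamp` read options.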



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

