aokolnychyi commented on code in PR #6012: URL: https://github.com/apache/iceberg/pull/6012#discussion_r1034279260
########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/GenerateChangesProcedure.java: ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.procedures; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.iceberg.ChangelogOperation; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.Iterators; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.source.SparkChangelogTable; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.spark.api.java.function.MapPartitionsFunction; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.DataFrameReader; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.encoders.RowEncoder; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; +import org.jetbrains.annotations.NotNull; + +public class GenerateChangesProcedure extends BaseProcedure { Review Comment: Can we add some Javadoc on what this procedure does? May make sense to add a few details about the algorithm too. ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/GenerateChangesProcedure.java: ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.procedures; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.iceberg.ChangelogOperation; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.Iterators; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.source.SparkChangelogTable; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.spark.api.java.function.MapPartitionsFunction; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.DataFrameReader; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.encoders.RowEncoder; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; +import org.jetbrains.annotations.NotNull; + +public class GenerateChangesProcedure extends BaseProcedure { + + public static SparkProcedures.ProcedureBuilder builder() { Review Comment: nit: The definition order in other procedures: static vars, static methods, constructor, instance methods. Could you move this method below the static constants? ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/GenerateChangesProcedure.java: ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.procedures; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.iceberg.ChangelogOperation; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.Iterators; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.source.SparkChangelogTable; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.spark.api.java.function.MapPartitionsFunction; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.DataFrameReader; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.encoders.RowEncoder; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; +import org.jetbrains.annotations.NotNull; + +public class GenerateChangesProcedure extends BaseProcedure { + + public static SparkProcedures.ProcedureBuilder builder() { + return new BaseProcedure.Builder<GenerateChangesProcedure>() { + @Override + protected GenerateChangesProcedure doBuild() { + return new GenerateChangesProcedure(tableCatalog()); + } + }; + } + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] { Review Comment: I'd consider supporting these parameters (names TBD). - Table name - Changelog view name - A map of options that would also contain boundaries - A boolean to indicate whether we should compute pre/post images - A boolean to indicate whether we should remove carried over records - Default identifier columns to use if not defined in snapshot's schema ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/GenerateChangesProcedure.java: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.procedures; + +import java.util.Arrays; +import java.util.UUID; +import org.apache.iceberg.ChangelogOperation; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.source.SparkChangelogTable; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.DataFrameReader; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter; +import org.apache.spark.sql.expressions.Window; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; +import org.jetbrains.annotations.NotNull; + +public class GenerateChangesProcedure extends BaseProcedure { + + public static SparkProcedures.ProcedureBuilder builder() { + return new BaseProcedure.Builder<GenerateChangesProcedure>() { + @Override + protected GenerateChangesProcedure doBuild() { + return new GenerateChangesProcedure(tableCatalog()); + } + }; + } + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] { + ProcedureParameter.required("table", DataTypes.StringType), + // the snapshot ids input are ignored when the start/end timestamps are provided + ProcedureParameter.optional("start_snapshot_id_exclusive", DataTypes.LongType), + ProcedureParameter.optional("end_snapshot_id_inclusive", DataTypes.LongType), + ProcedureParameter.optional("table_change_view", DataTypes.StringType), + ProcedureParameter.optional("identifier_columns", DataTypes.StringType), Review Comment: Each snapshot's schema has a list of identity columns. I feel asking users for identity columns when those are already defined is not necessary. Let me think how we can handle this without too much complication. ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/GenerateChangesProcedure.java: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.procedures; + +import java.util.Arrays; +import java.util.UUID; +import org.apache.iceberg.ChangelogOperation; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.source.SparkChangelogTable; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.DataFrameReader; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter; +import org.apache.spark.sql.expressions.Window; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; +import org.jetbrains.annotations.NotNull; + +public class GenerateChangesProcedure extends BaseProcedure { + + public static SparkProcedures.ProcedureBuilder builder() { + return new BaseProcedure.Builder<GenerateChangesProcedure>() { + @Override + protected GenerateChangesProcedure doBuild() { + return new GenerateChangesProcedure(tableCatalog()); + } + }; + } + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] { + ProcedureParameter.required("table", DataTypes.StringType), + // the snapshot ids input are ignored when the start/end timestamps are provided + ProcedureParameter.optional("start_snapshot_id_exclusive", DataTypes.LongType), + ProcedureParameter.optional("end_snapshot_id_inclusive", DataTypes.LongType), + ProcedureParameter.optional("table_change_view", DataTypes.StringType), + ProcedureParameter.optional("identifier_columns", DataTypes.StringType), Review Comment: I don't think operation conditions should matter. Suppose I have the following table with a primary key column `pk`. ``` ----------------- pk | dep | salary ----------------- 1, hr, 100 2, software, 80 3, hardware, 120 4, software, 110 5, hr, 95 6, software, 100 ``` If I issue `UPDATE t SET salary = 110 WHERE salary = 100`, it will generate the following changes. ``` -------------------------------------------------------------------------- _change_type | _change_ordinal | _commit_snapshot_id | pk | dep | salary -------------------------------------------------------------------------- DELETE, 0, s1, 1, hr, 100 INSERT, 0, s1, 1, hr, 110 DELETE, 0, s1, 6, software, 100 INSERT, 0, s1, 6, software, 110 ``` Even though I had a condition on `salary`, we should use `pk` for computing pre/post images. ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java: ########## @@ -53,6 +53,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() { mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder); mapBuilder.put("register_table", RegisterTableProcedure::builder); mapBuilder.put("publish_changes", PublishChangesProcedure::builder); + mapBuilder.put("generate_changes", GenerateChangesProcedure::builder); Review Comment: Are there any alternative names? I am not sure the procedure actually generates changes. Let's think a bit. 
It is not bad but I wonder whether we can be a bit more specific. ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/GenerateChangesProcedure.java: ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.procedures; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.iceberg.ChangelogOperation; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.Iterators; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.source.SparkChangelogTable; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.spark.api.java.function.MapPartitionsFunction; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.DataFrameReader; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.encoders.RowEncoder; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; +import org.jetbrains.annotations.NotNull; + +public class GenerateChangesProcedure extends BaseProcedure { + + public static SparkProcedures.ProcedureBuilder builder() { + return new BaseProcedure.Builder<GenerateChangesProcedure>() { + @Override + protected GenerateChangesProcedure doBuild() { + return new GenerateChangesProcedure(tableCatalog()); + } + }; + } + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] { + ProcedureParameter.required("table", DataTypes.StringType), + // the snapshot ids input are ignored when the start/end timestamps are provided + ProcedureParameter.optional("start_snapshot_id_exclusive", DataTypes.LongType), + ProcedureParameter.optional("end_snapshot_id_inclusive", DataTypes.LongType), + ProcedureParameter.optional("table_change_view", DataTypes.StringType), + ProcedureParameter.optional("identifier_columns", DataTypes.StringType), + 
ProcedureParameter.optional("start_timestamp", DataTypes.TimestampType), + ProcedureParameter.optional("end_timestamp", DataTypes.TimestampType), + }; + + private static final StructType OUTPUT_TYPE = + new StructType( + new StructField[] { + new StructField("view_name", DataTypes.StringType, false, Metadata.empty()) + }); + + private GenerateChangesProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + String tableName = args.getString(0); + + // Read data from the table.changes + Dataset<Row> df = changelogRecords(tableName, args); + + // Compute the pre-image and post-images if the identifier columns are provided. + if (!args.isNullAt(4)) { + String[] identifierColumns = args.getString(4).split(","); + if (identifierColumns.length > 0) { + df = withUpdate(df, identifierColumns); + } + } + + String viewName = viewName(args, tableName); + + // Create a view for users to query + df.createOrReplaceTempView(viewName); + + return toOutputRows(viewName); + } + + private Dataset<Row> changelogRecords(String tableName, InternalRow args) { + Long[] snapshotIds = getSnapshotIds(tableName, args); + + // we don't have to validate the snapshot ids here because the reader will do it for us. + DataFrameReader reader = spark().read(); + if (snapshotIds[0] != null) { + reader = reader.option(SparkReadOptions.START_SNAPSHOT_ID, snapshotIds[0]); + } + + if (snapshotIds[1] != null) { + reader = reader.option(SparkReadOptions.END_SNAPSHOT_ID, snapshotIds[1]); + } + + return reader.table(tableName + "." + SparkChangelogTable.TABLE_NAME); + } + + @NotNull + private Long[] getSnapshotIds(String tableName, InternalRow args) { + Long[] snapshotIds = new Long[] {null, null}; + + Long startTimestamp = args.isNullAt(5) ? null : DateTimeUtil.microsToMillis(args.getLong(5)); + Long endTimestamp = args.isNullAt(6) ? null : DateTimeUtil.microsToMillis(args.getLong(6)); + + if (startTimestamp == null && endTimestamp == null) { + snapshotIds[0] = args.isNullAt(1) ? null : args.getLong(1); + snapshotIds[1] = args.isNullAt(2) ? 
null : args.getLong(2); + } else { + Identifier tableIdent = toIdentifier(tableName, PARAMETERS[0].name()); + Table table = loadSparkTable(tableIdent).table(); + Snapshot[] snapshots = snapshotsFromTimestamp(startTimestamp, endTimestamp, table); + + if (snapshots != null) { + snapshotIds[0] = snapshots[0].parentId(); + snapshotIds[1] = snapshots[1].snapshotId(); + } + } + return snapshotIds; + } + + private Snapshot[] snapshotsFromTimestamp(Long startTimestamp, Long endTimestamp, Table table) { + Snapshot[] snapshots = new Snapshot[] {null, null}; + + if (startTimestamp != null && endTimestamp != null && startTimestamp > endTimestamp) { + throw new IllegalArgumentException( + "Start timestamp must be less than or equal to end timestamp"); + } + + if (startTimestamp == null) { + snapshots[0] = SnapshotUtil.oldestAncestor(table); + } else { + snapshots[0] = SnapshotUtil.oldestAncestorAfter(table, startTimestamp); + } + + if (endTimestamp == null) { + snapshots[1] = table.currentSnapshot(); + } else { + snapshots[1] = table.snapshot(SnapshotUtil.snapshotIdAsOfTime(table, endTimestamp)); + } + + if (snapshots[0] == null || snapshots[1] == null) { + return null; + } + + return snapshots; + } + + @NotNull + private static String viewName(InternalRow args, String tableName) { + String viewName = args.isNullAt(3) ? null : args.getString(3); + if (viewName == null) { + String shortTableName = + tableName.contains(".") ? tableName.substring(tableName.lastIndexOf(".") + 1) : tableName; + viewName = shortTableName + "_changes"; + } + return viewName; + } + + private Dataset<Row> withUpdate(Dataset<Row> df, String[] identifiers) { + Column[] partitionSpec = getPartitionSpec(df, identifiers); + Column[] sortSpec = sortSpec(df, partitionSpec); + + int changeTypeIdx = df.schema().fieldIndex(MetadataColumns.CHANGE_TYPE.name()); + List<Integer> partitionIdx = + Arrays.stream(partitionSpec) + .map(column -> df.schema().fieldIndex(column.toString())) + .collect(Collectors.toList()); + + return df.repartition(partitionSpec) + .sortWithinPartitions(sortSpec) + .mapPartitions( + processRowsWithinTask(changeTypeIdx, partitionIdx), RowEncoder.apply(df.schema())); + } + + @NotNull + private static Column[] getPartitionSpec(Dataset<Row> df, String[] identifiers) { + Column[] partitionSpec = new Column[identifiers.length + 1]; + for (int i = 0; i < identifiers.length; i++) { + try { + partitionSpec[i] = df.col(identifiers[i]); + } catch (Exception e) { + throw new IllegalArgumentException( + String.format("Identifier column '%s' does not exist in the table", identifiers[i]), e); + } + } + partitionSpec[partitionSpec.length - 1] = df.col(MetadataColumns.CHANGE_ORDINAL.name()); + return partitionSpec; + } + + @NotNull + private static Column[] sortSpec(Dataset<Row> df, Column[] partitionSpec) { + Column[] sortSpec = new Column[partitionSpec.length + 1]; + System.arraycopy(partitionSpec, 0, sortSpec, 0, partitionSpec.length); + sortSpec[sortSpec.length - 1] = df.col(MetadataColumns.CHANGE_TYPE.name()); + return sortSpec; + } + + private static MapPartitionsFunction<Row, Row> processRowsWithinTask( Review Comment: I'll take a look at this with fresh eyes tomorrow. It would be nice to decouple this into a class so that we can unit test by explicitly passing an iterator of rows. ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/GenerateChangesProcedure.java: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.procedures; + +import java.util.Arrays; +import java.util.UUID; +import org.apache.iceberg.ChangelogOperation; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.source.SparkChangelogTable; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.DataFrameReader; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter; +import org.apache.spark.sql.expressions.Window; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; +import org.jetbrains.annotations.NotNull; + +public class GenerateChangesProcedure extends BaseProcedure { + + public static SparkProcedures.ProcedureBuilder builder() { + return new BaseProcedure.Builder<GenerateChangesProcedure>() { + @Override + protected GenerateChangesProcedure doBuild() { + return new GenerateChangesProcedure(tableCatalog()); + } + }; + } + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] { + ProcedureParameter.required("table", DataTypes.StringType), + // the snapshot ids input are ignored when the start/end timestamps are provided + ProcedureParameter.optional("start_snapshot_id_exclusive", DataTypes.LongType), + ProcedureParameter.optional("end_snapshot_id_inclusive", DataTypes.LongType), + ProcedureParameter.optional("table_change_view", DataTypes.StringType), + ProcedureParameter.optional("identifier_columns", DataTypes.StringType), + ProcedureParameter.optional("start_timestamp", DataTypes.TimestampType), + ProcedureParameter.optional("end_timestamp", DataTypes.TimestampType), Review Comment: I am not sure. I'd consider having `read_options` or `options` as a map that would be passed while loading deletes and inserts as `DataFrame`. Then users can specify boundaries directly in the map. 
We already respect these options from `SparkReadOptions` in the `changes` table:
```
// Start snapshot ID used in incremental scans (exclusive)
public static final String START_SNAPSHOT_ID = "start-snapshot-id";

// End snapshot ID used in incremental scans (inclusive)
public static final String END_SNAPSHOT_ID = "end-snapshot-id";
```
We could add `start-timestamp`, `end-timestamp`, and `start-snapshot-id-inclusive`.
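To make the options-map idea concrete, here is a minimal sketch (not the actual implementation): it declares a hypothetical map-typed `options` parameter and forwards the map to the `DataFrameReader` that scans the changes table. The class, method, and parameter names are made up for illustration, and the `start-timestamp` / `end-timestamp` keys are only a proposal at this point.
```
// Sketch only: a string-to-string map parameter plus a helper that forwards it
// to the reader for <table>.changes. Names here are hypothetical.
import java.util.Map;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
import org.apache.spark.sql.types.DataTypes;

class ChangelogReadOptionsSketch {

  // A map-typed parameter, similar to the "options" maps used by other procedures.
  static final ProcedureParameter OPTIONS =
      ProcedureParameter.optional(
          "options", DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType));

  static Dataset<Row> readChanges(
      SparkSession spark, String tableName, Map<String, String> options) {
    // Today the changes table honors "start-snapshot-id" (exclusive) and
    // "end-snapshot-id" (inclusive); "start-timestamp" / "end-timestamp" would be
    // new keys the reader has to learn to interpret.
    return spark.read().options(options).table(tableName + ".changes");
  }
}
```
Whether such a map should replace the dedicated snapshot-id/timestamp parameters or merely supplement them is the open question above.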
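Relatedly, for the identifier-columns discussion earlier in this thread, a rough sketch of a schema-based fallback, assuming the procedure keeps an optional `identifier_columns` argument (the helper and its parameters are hypothetical):
```
// Sketch only: prefer caller-supplied columns, otherwise fall back to the
// identifier fields recorded on the table schema.
import java.util.Set;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

class IdentifierColumnsSketch {
  static Set<String> identifierColumns(Table table, String[] userProvided) {
    if (userProvided != null && userProvided.length > 0) {
      return Sets.newHashSet(userProvided);
    }
    // An empty result means neither the caller nor the schema declared identifier fields.
    return table.schema().identifierFieldNames();
  }
}
```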