RussellSpitzer commented on code in PR #7388:
URL: https://github.com/apache/iceberg/pull/7388#discussion_r1186493392


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java:
##########
@@ -18,193 +18,78 @@
  */
 package org.apache.iceberg.spark;
 
-import java.util.Arrays;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Objects;
-import java.util.stream.Collectors;
 import org.apache.iceberg.ChangelogOperation;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.catalyst.expressions.GenericRow;
 import org.apache.spark.sql.types.StructType;
 
-/**
- * An iterator that transforms rows from changelog tables within a single 
Spark task. It assumes
- * that rows are sorted by identifier columns and change type.
- *
- * <p>It removes the carry-over rows. Carry-over rows are the result of a 
removal and insertion of
- * the same row within an operation because of the copy-on-write mechanism. 
For example, given a
- * file which contains row1 (id=1, data='a') and row2 (id=2, data='b'). A 
copy-on-write delete of
- * row2 would require erasing this file and preserving row1 in a new file. The 
change-log table
- * would report this as (id=1, data='a', op='DELETE') and (id=1, data='a', 
op='INSERT'), despite it
- * not being an actual change to the table. The iterator finds the carry-over 
rows and removes them
- * from the result.
- *
- * <p>This iterator also finds delete/insert rows which represent an update, 
and converts them into
- * update records. For example, these two rows
- *
- * <ul>
- *   <li>(id=1, data='a', op='DELETE')
- *   <li>(id=1, data='b', op='INSERT')
- * </ul>
- *
- * <p>will be marked as update-rows:
- *
- * <ul>
- *   <li>(id=1, data='a', op='UPDATE_BEFORE')
- *   <li>(id=1, data='b', op='UPDATE_AFTER')
- * </ul>
- */
-public class ChangelogIterator implements Iterator<Row> {
-  private static final String DELETE = ChangelogOperation.DELETE.name();
-  private static final String INSERT = ChangelogOperation.INSERT.name();
-  private static final String UPDATE_BEFORE = 
ChangelogOperation.UPDATE_BEFORE.name();
-  private static final String UPDATE_AFTER = 
ChangelogOperation.UPDATE_AFTER.name();
+/** An iterator that transforms rows from changelog tables within a single 
Spark task. */
+public abstract class ChangelogIterator implements Iterator<Row> {
+  protected static final String DELETE = ChangelogOperation.DELETE.name();
+  protected static final String INSERT = ChangelogOperation.INSERT.name();
+  protected static final String UPDATE_BEFORE = 
ChangelogOperation.UPDATE_BEFORE.name();
+  protected static final String UPDATE_AFTER = 
ChangelogOperation.UPDATE_AFTER.name();
 
   private final Iterator<Row> rowIterator;
   private final int changeTypeIndex;
-  private final List<Integer> identifierFieldIdx;
-  private final int[] indicesForIdentifySameRow;
 
-  private Row cachedRow = null;
-
-  private ChangelogIterator(
-      Iterator<Row> rowIterator, StructType rowType, String[] 
identifierFields) {
+  protected ChangelogIterator(Iterator<Row> rowIterator, StructType rowType) {
     this.rowIterator = rowIterator;
     this.changeTypeIndex = 
rowType.fieldIndex(MetadataColumns.CHANGE_TYPE.name());
-    this.identifierFieldIdx =
-        Arrays.stream(identifierFields)
-            .map(column -> rowType.fieldIndex(column.toString()))
-            .collect(Collectors.toList());
-    this.indicesForIdentifySameRow = 
generateIndicesForIdentifySameRow(rowType.size());
+  }
+
+  protected int changeTypeIndex() {
+    return changeTypeIndex;
+  }
+
+  protected Iterator<Row> rowIterator() {
+    return rowIterator;
   }
 
   /**
-   * Creates an iterator for records of a changelog table.
+   * Creates an iterator combine with {@link RemoveCarryoverIterator} and 
{@link

Review Comment:
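   Suggested wording: "Creates an iterator composing {@link RemoveCarryoverIterator} and {@link ...}" (i.e. replace the ungrammatical "combine with" with "composing").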



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to