RussellSpitzer commented on code in PR #7388: URL: https://github.com/apache/iceberg/pull/7388#discussion_r1186496475
########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java: ########## @@ -18,193 +18,78 @@ */ package org.apache.iceberg.spark; -import java.util.Arrays; import java.util.Iterator; -import java.util.List; import java.util.Objects; -import java.util.stream.Collectors; import org.apache.iceberg.ChangelogOperation; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.relocated.com.google.common.collect.Iterators; import org.apache.spark.sql.Row; -import org.apache.spark.sql.RowFactory; -import org.apache.spark.sql.catalyst.expressions.GenericRow; import org.apache.spark.sql.types.StructType; -/** - * An iterator that transforms rows from changelog tables within a single Spark task. It assumes - * that rows are sorted by identifier columns and change type. - * - * <p>It removes the carry-over rows. Carry-over rows are the result of a removal and insertion of - * the same row within an operation because of the copy-on-write mechanism. For example, given a - * file which contains row1 (id=1, data='a') and row2 (id=2, data='b'). A copy-on-write delete of - * row2 would require erasing this file and preserving row1 in a new file. The change-log table - * would report this as (id=1, data='a', op='DELETE') and (id=1, data='a', op='INSERT'), despite it - * not being an actual change to the table. The iterator finds the carry-over rows and removes them - * from the result. - * - * <p>This iterator also finds delete/insert rows which represent an update, and converts them into - * update records. 
For example, these two rows - * - * <ul> - * <li>(id=1, data='a', op='DELETE') - * <li>(id=1, data='b', op='INSERT') - * </ul> - * - * <p>will be marked as update-rows: - * - * <ul> - * <li>(id=1, data='a', op='UPDATE_BEFORE') - * <li>(id=1, data='b', op='UPDATE_AFTER') - * </ul> - */ -public class ChangelogIterator implements Iterator<Row> { - private static final String DELETE = ChangelogOperation.DELETE.name(); - private static final String INSERT = ChangelogOperation.INSERT.name(); - private static final String UPDATE_BEFORE = ChangelogOperation.UPDATE_BEFORE.name(); - private static final String UPDATE_AFTER = ChangelogOperation.UPDATE_AFTER.name(); +/** An iterator that transforms rows from changelog tables within a single Spark task. */ +public abstract class ChangelogIterator implements Iterator<Row> { + protected static final String DELETE = ChangelogOperation.DELETE.name(); + protected static final String INSERT = ChangelogOperation.INSERT.name(); + protected static final String UPDATE_BEFORE = ChangelogOperation.UPDATE_BEFORE.name(); + protected static final String UPDATE_AFTER = ChangelogOperation.UPDATE_AFTER.name(); private final Iterator<Row> rowIterator; private final int changeTypeIndex; - private final List<Integer> identifierFieldIdx; - private final int[] indicesForIdentifySameRow; - private Row cachedRow = null; - - private ChangelogIterator( - Iterator<Row> rowIterator, StructType rowType, String[] identifierFields) { + protected ChangelogIterator(Iterator<Row> rowIterator, StructType rowType) { this.rowIterator = rowIterator; this.changeTypeIndex = rowType.fieldIndex(MetadataColumns.CHANGE_TYPE.name()); - this.identifierFieldIdx = - Arrays.stream(identifierFields) - .map(column -> rowType.fieldIndex(column.toString())) - .collect(Collectors.toList()); - this.indicesForIdentifySameRow = generateIndicesForIdentifySameRow(rowType.size()); + } + + protected int changeTypeIndex() { + return changeTypeIndex; + } + + protected Iterator<Row> 
rowIterator() { + return rowIterator; } /** - * Creates an iterator for records of a changelog table. + * Creates an iterator combine with {@link RemoveCarryoverIterator} and {@link + * ComputeUpdateIterator} to remove carry-over rows and compute update rows * * @param rowIterator the iterator of rows from a changelog table * @param rowType the schema of the rows * @param identifierFields the names of the identifier columns, which determine if rows are the * same - * @return a new {@link ChangelogIterator} instance concatenated with the null-removal iterator + * @return a new iterator instance */ - public static Iterator<Row> create( + public static Iterator<Row> computeUpdates( Iterator<Row> rowIterator, StructType rowType, String[] identifierFields) { + Iterator<Row> carryoverRemoveIterator = removeCarryovers(rowIterator, rowType); ChangelogIterator changelogIterator = - new ChangelogIterator(rowIterator, rowType, identifierFields); + new ComputeUpdateIterator(carryoverRemoveIterator, rowType, identifierFields); return Iterators.filter(changelogIterator, Objects::nonNull); } - @Override - public boolean hasNext() { - if (cachedRow != null) { - return true; - } - return rowIterator.hasNext(); - } - - @Override - public Row next() { - // if there is an updated cached row, return it directly - if (cachedUpdateRecord(cachedRow)) { - Row row = cachedRow; - cachedRow = null; - return row; - } - - Row currentRow = currentRow(); - - if (currentRow.getString(changeTypeIndex).equals(DELETE) && rowIterator.hasNext()) { - Row nextRow = rowIterator.next(); - cachedRow = nextRow; - - if (isUpdateOrCarryoverRecord(currentRow, nextRow)) { - if (isCarryoverRecord(currentRow, nextRow)) { - // set carry-over rows to null for filtering out later - currentRow = null; - cachedRow = null; - } else { - currentRow = modify(currentRow, changeTypeIndex, UPDATE_BEFORE); - cachedRow = modify(nextRow, changeTypeIndex, UPDATE_AFTER); - } - } - } - - return currentRow; - } - - private Row 
modify(Row row, int valueIndex, Object value) { - if (row instanceof GenericRow) { - GenericRow genericRow = (GenericRow) row; - genericRow.values()[valueIndex] = value; - return genericRow; - } else { - Object[] values = new Object[row.size()]; - for (int index = 0; index < row.size(); index++) { - values[index] = row.get(index); - } - values[valueIndex] = value; - return RowFactory.create(values); - } - } - - private int[] generateIndicesForIdentifySameRow(int columnSize) { - int[] indices = new int[columnSize - 1]; - for (int i = 0; i < indices.length; i++) { - if (i < changeTypeIndex) { - indices[i] = i; - } else { - indices[i] = i + 1; - } - } - return indices; - } - - private boolean isCarryoverRecord(Row currentRow, Row nextRow) { - for (int idx : indicesForIdentifySameRow) { - if (!isColumnSame(currentRow, nextRow, idx)) { - return false; - } - } - - return true; - } - - private boolean cachedUpdateRecord(Row cachedRecord) { - return cachedRecord != null - && !cachedRecord.getString(changeTypeIndex).equals(DELETE) - && !cachedRecord.getString(changeTypeIndex).equals(INSERT); - } - - private Row currentRow() { - if (cachedRow != null) { - Row row = cachedRow; - cachedRow = null; - return row; - } else { - return rowIterator.next(); - } + /** + * Creates an iterator that removes carry-over rows from a changelog table. 
+ * + * @param rowIterator the iterator of rows from a changelog table + * @param rowType the schema of the rows + * @return a new iterator instance + */ + public static Iterator<Row> removeCarryovers(Iterator<Row> rowIterator, StructType rowType) { + RemoveCarryoverIterator changelogIterator = new RemoveCarryoverIterator(rowIterator, rowType); + return Iterators.filter(changelogIterator, Objects::nonNull); } - private boolean isUpdateOrCarryoverRecord(Row currentRow, Row nextRow) { - return sameLogicalRow(currentRow, nextRow) - && currentRow.getString(changeTypeIndex).equals(DELETE) - && nextRow.getString(changeTypeIndex).equals(INSERT); + protected boolean isDifferentValue(Row currentRow, Row nextRow, int idx) { + return !Objects.equals(nextRow.get(idx), currentRow.get(idx)); } - private boolean sameLogicalRow(Row currentRow, Row nextRow) { - for (int idx : identifierFieldIdx) { - if (!isColumnSame(currentRow, nextRow, idx)) { - return false; - } - } - return true; + @Override Review Comment: Class is abstract now, so we can drop these -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org