flyrain commented on code in PR #6344: URL: https://github.com/apache/iceberg/pull/6344#discussion_r1062008804
########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java: ########## @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import org.apache.iceberg.ChangelogOperation; +import org.apache.iceberg.relocated.com.google.common.collect.Iterators; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; + +/** + * An iterator that transforms rows from changelog tables within a single Spark task. It assumes + * that rows are sorted by identifier columns and change type. + * + * <p>It removes the carry-over rows. Carry-over rows are the result of a removal and insertion of + * the same row within an operation because of the copy-on-write mechanism. For example, given a + * file which contains row1 (id=1, data='a') and row2 (id=2, data='b'). A copy-on-write delete of + * row2 would require erasing this file and preserving row1 in a new file. 
The change-log table + * would report this as (id=1, data='a', op='DELETE') and (id=1, data='a', op='INSERT'), despite it + * not being an actual change to the table. The iterator finds out the carry-over rows and removes + * them from the result. + * + * <p>The iterator marks the delete-row and insert-row to be the update-rows. For example, these two + * rows + * + * <ul> + * <li>(id=1, data='a', op='DELETE') + * <li>(id=1, data='b', op='INSERT') + * </ul> + * + * <p>will be marked as update-rows: + * + * <ul> + * <li>(id=1, data='a', op='UPDATE_BEFORE') + * <li>(id=1, data='b', op='UPDATE_AFTER') + * </ul> + */ +public class ChangelogIterator implements Iterator<Row>, Serializable { + private static final String DELETE = ChangelogOperation.DELETE.name(); + private static final String INSERT = ChangelogOperation.INSERT.name(); + private static final String UPDATE_BEFORE = ChangelogOperation.UPDATE_BEFORE.name(); + private static final String UPDATE_AFTER = ChangelogOperation.UPDATE_AFTER.name(); + + private final Iterator<Row> rowIterator; + private final int changeTypeIndex; + private final List<Integer> identifierFieldIdx; + + private Row cachedRow = null; + + private ChangelogIterator( + Iterator<Row> rowIterator, int changeTypeIndex, List<Integer> identifierFieldIdx) { + this.rowIterator = rowIterator; + this.changeTypeIndex = changeTypeIndex; + this.identifierFieldIdx = identifierFieldIdx; + } + + /** + * Creates an iterator for records of a changelog table. 
+ * + * @param rowIterator the iterator of rows from a changelog table + * @param changeTypeIndex the index of the change type column + * @param identifierFieldIdx the indices of the identifier columns, which determine if rows are + * the same + * @return a new {@link ChangelogIterator} instance concatenated with the null-removal iterator + */ + public static Iterator<Row> iterator( + Iterator<Row> rowIterator, int changeTypeIndex, List<Integer> identifierFieldIdx) { + ChangelogIterator changelogIterator = + new ChangelogIterator(rowIterator, changeTypeIndex, identifierFieldIdx); + return Iterators.filter(changelogIterator, Objects::nonNull); + } + + @Override + public boolean hasNext() { + if (cachedRow != null) { + return true; + } + return rowIterator.hasNext(); + } + + @Override + public Row next() { + // if there is an updated cached row, return it directly + if (cachedUpdateRecord(cachedRow)) { + Row row = cachedRow; + cachedRow = null; + return row; + } + + Row currentRow = currentRow(); + + if (currentRow.getString(changeTypeIndex).equals(DELETE) && rowIterator.hasNext()) { + GenericRowWithSchema nextRow = (GenericRowWithSchema) rowIterator.next(); + cachedRow = nextRow; + + if (isUpdateOrCarryoverRecord(currentRow, nextRow)) { + if (isCarryoverRecord(currentRow, nextRow)) { + // set carry-over rows to null for filtering out later + currentRow = null; + cachedRow = null; + } else { + Row[] rows = createUpdateChangelog((GenericRowWithSchema) currentRow, nextRow); + currentRow = rows[0]; + cachedRow = rows[1]; + } + } + } + + return currentRow; + } + + private Row[] createUpdateChangelog( + GenericRowWithSchema currentRow, GenericRowWithSchema nextRow) { + GenericInternalRow deletedRow = new GenericInternalRow(currentRow.values()); + GenericInternalRow insertedRow = new GenericInternalRow(nextRow.values()); + + deletedRow.update(changeTypeIndex, UPDATE_BEFORE); + insertedRow.update(changeTypeIndex, UPDATE_AFTER); + + return new Row[] { + 
RowFactory.create(deletedRow.values()), RowFactory.create(insertedRow.values()) + }; + } + + private boolean isCarryoverRecord(Row currentRow, Row nextRow) { + int length = currentRow.length(); Review Comment: It is faster. Tested with a simulation of a 100-column comparison with two options 1. Using continue 2. Using an int[] to store the indices The latter always takes only 60-70% of the time of the former, which is surprisingly faster. I didn't expect so much difference. I made the change anyway in this class. Here is my test code. ``` var left = new ArrayList<Integer>(); var right = new ArrayList<Integer>(); for(int i = 0; i < 100; i++) { left.add(i); right.add(i); } var start = System.currentTimeMillis(); var match = false; for(int i = 0; i < 1000000; i++) { for(int j = 0; j < 100; j++) { if( j == 50){ continue; } match = left.get(j) == right.get(j); } } var end = System.currentTimeMillis(); System.out.println(String.format("duration: %s, match %s", end - start, match)); var indices = new int[99]; int k = 0; for(int i = 0; i < 100; i++) { if(i == 50) { continue; } indices[k++] = i; } start = System.currentTimeMillis(); for(int i = 0; i < 1000000; i++) { for(int j : indices) { match = left.get(j) == right.get(j); } } end = System.currentTimeMillis(); System.out.println(String.format("duration: %s, match: %s", end - start, match)); ``` Here are the results: ``` shell:>loop-perf duration: 119, match true duration: 92, match: true shell:>loop-perf duration: 138, match true duration: 86, match: true shell:>loop-perf duration: 125, match true duration: 85, match: true shell:>loop-perf duration: 125, match true duration: 85, match: true shell:>loop-perf duration: 126, match true duration: 85, match: true shell:>loop-perf duration: 124, match true duration: 85, match: true shell:>loop-perf duration: 125, match true duration: 85, match: true shell:>loop-perf duration: 126, match true duration: 85, match: true shell:>loop-perf duration: 124, match true duration: 85, match: true ``` 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org