pvary commented on code in PR #15996:
URL: https://github.com/apache/iceberg/pull/15996#discussion_r3235483363


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/EqualityConvertPlanner.java:
##########
@@ -0,0 +1,663 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotChanges;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Planner for the equality delete conversion pipeline. For each trigger, it picks the oldest
+ * staging snapshot that hasn't been converted yet and emits {@link ReadCommand}s describing the
+ * files its downstream readers and workers must process.
+ *
+ * <p>Each trigger runs two steps in order:
+ *
+ * <ol>
+ *   <li>{@link #ensureIndexCurrent}: updates {@link #lastStagingSnapshotId} from main's history,
+ *       bootstraps the worker index from main on first run, and reindexes when external commits
+ *       (e.g. compaction) have advanced main past the currently-indexed snapshot.
+ *   <li>{@link #processStagingSnapshot}: resolves the chosen staging snapshot's eq deletes against
+ *       the (now-current) index, passes through any DV files, and indexes the snapshot's new data
+ *       files for the next cycle (unless staging and target are the same branch).
+ * </ol>
+ *
+ * <p>Watermarks separate phases that gate the worker's keyed state. The contract is documented on
+ * {@link #advancePhase()}.
+ *
+ * <p>An {@link EqualityConvertPlanResult} with the current cycle's metadata is emitted via the
+ * {@link #METADATA_STREAM} side output after the read commands.
+ *
+ * <p>Assumes a single equality-field set supplied via the builder; staging eq-deletes with a
+ * different {@code equalityFieldIds} fail fast in {@link #retrieveStagingFiles}. Concurrent writes
+ * on the target branch are handled by {@link #ensureIndexCurrent} reindexing from the new main
+ * snapshot; commit-time conflicts are caught by {@code RowDelta.validateFromSnapshot}.
+ */
+@Internal
+public class EqualityConvertPlanner extends AbstractStreamOperator<ReadCommand>
+    implements OneInputStreamOperator<Trigger, ReadCommand> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(EqualityConvertPlanner.class);
+
+  /** Side output for the {@link EqualityConvertPlanResult} describing each planning cycle. */
+  public static final OutputTag<EqualityConvertPlanResult> METADATA_STREAM =
+      new OutputTag<>("metadata-stream") {};
+
+  // Names of the counters registered under the task's maintenance metric group in open().
+  private static final String PROCESSED_EQ_DELETE_FILE_NUM_METRIC = "processedEqDeleteFileNum";
+  private static final String PROCESSED_STAGING_SNAPSHOT_NUM_METRIC =
+      "processedStagingSnapshotNum";
+  private static final String SKIPPED_NO_OP_CYCLES_METRIC = "skippedNoOpCycles";
+  private static final String REINDEX_COUNT_METRIC = "reindexCount";
+
+  // Identity of the table and maintenance task; scopes the metric group and appears in error logs.
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+
+  // Where the table is loaded from, and which branch is converted against which.
+  private final TableLoader tableLoader;
+  private final String stagingBranch;
+  private final String targetBranch;
+
+  // Equality field ids the worker keys on, as supplied via the builder (immutable copy). Every
+  // staging equality delete's equalityFieldIds() must match these ids.
+  private final List<Integer> eqFieldIds;
+
+  // Checkpointed form of indexSnapshotId; holds zero or one element.
+  private transient ListState<Long> indexSnapshotState;
+
+  private transient Table table;
+
+  // Target-branch head observed the last time ensureIndexCurrent completed; not checkpointed.
+  private transient Long lastMainSnapshotId;
+  // Newest staging snapshot id recorded by the committer marker on the target branch.
+  private transient Long lastStagingSnapshotId;
+  // Target-branch snapshot the worker's index was built from; restored from indexSnapshotState.
+  private transient Long indexSnapshotId;
+
+  // Timestamp stamped on the current phase's records and on the watermark that closes the phase.
+  private transient long nextPhaseTs;
+
+  // SnapshotChanges keyed by snapshot id. Cleared at the start of every trigger, so each manifest
+  // list is parsed at most once per trigger.
+  private transient Map<Long, SnapshotChanges> snapshotChangesCache;
+
+  private transient Counter processedEqDeleteFileNumCounter;
+  private transient Counter processedStagingSnapshotNumCounter;
+  private transient Counter skippedNoOpCyclesCounter;
+  private transient Counter reindexCounter;
+
+  /**
+   * Creates a planner for one table maintenance task.
+   *
+   * @param tableName table name, used to scope metrics and in log messages
+   * @param taskName maintenance task name, used to scope metrics and in log messages
+   * @param taskIndex maintenance task index, used to scope metrics and in log messages
+   * @param tableLoader loader the table is opened from in {@link #open()}
+   * @param stagingBranch branch whose snapshots are converted
+   * @param targetBranch branch the worker index is built from and conversions are checked against
+   * @param eqFieldIds equality field ids the worker keys on; copied, must be non-empty
+   * @throws IllegalArgumentException if {@code eqFieldIds} is null or empty
+   */
+  public EqualityConvertPlanner(
+      String tableName,
+      String taskName,
+      int taskIndex,
+      TableLoader tableLoader,
+      String stagingBranch,
+      String targetBranch,
+      List<Integer> eqFieldIds) {
+    // Fail before storing anything: without field ids there is nothing to key eq deletes on.
+    Preconditions.checkArgument(
+        eqFieldIds != null && !eqFieldIds.isEmpty(), "eqFieldIds must not be null or empty");
+
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+    this.stagingBranch = stagingBranch;
+    this.targetBranch = targetBranch;
+
+    // Own a private copy so later changes to the caller's list cannot leak into the planner.
+    List<Integer> ownedFieldIds = Lists.newArrayList(eqFieldIds);
+    this.eqFieldIds = Collections.unmodifiableList(ownedFieldIds);
+  }
+
+  /**
+   * Opens the table loader if needed, loads the table, registers this operator's counters, and
+   * creates the per-trigger {@link SnapshotChanges} cache.
+   */
+  @Override
+  public void open() throws Exception {
+    super.open();
+
+    // Only open the loader if nobody has opened it yet.
+    if (!tableLoader.isOpen()) {
+      tableLoader.open();
+    }
+    table = tableLoader.loadTable();
+
+    MetricGroup maintenanceGroup =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex);
+    processedEqDeleteFileNumCounter = maintenanceGroup.counter(PROCESSED_EQ_DELETE_FILE_NUM_METRIC);
+    processedStagingSnapshotNumCounter =
+        maintenanceGroup.counter(PROCESSED_STAGING_SNAPSHOT_NUM_METRIC);
+    skippedNoOpCyclesCounter = maintenanceGroup.counter(SKIPPED_NO_OP_CYCLES_METRIC);
+    reindexCounter = maintenanceGroup.counter(REINDEX_COUNT_METRIC);
+
+    snapshotChangesCache = Maps.newHashMap();
+  }
+
+  /**
+   * Registers the operator list state that checkpoints {@link #indexSnapshotId} and restores the id
+   * from it, if a value is present.
+   *
+   * @throws IllegalStateException if the restored state holds more than one value
+   */
+  @Override
+  public void initializeState(StateInitializationContext context) throws Exception {
+    super.initializeState(context);
+
+    ListStateDescriptor<Long> descriptor =
+        new ListStateDescriptor<>("indexSnapshotId", Types.LONG);
+    indexSnapshotState = context.getOperatorStateStore().getListState(descriptor);
+
+    // Start from "no index" and adopt the single restored value, if any.
+    indexSnapshotId = null;
+    for (Long restoredId : indexSnapshotState.get()) {
+      Preconditions.checkState(
+          indexSnapshotId == null, "indexSnapshotId state should hold at most one value");
+      indexSnapshotId = restoredId;
+    }
+  }
+
+  /** Rewrites the list state with the current {@link #indexSnapshotId} (zero or one element). */
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    super.snapshotState(context);
+    indexSnapshotState.clear();
+    if (indexSnapshotId == null) {
+      return;
+    }
+
+    indexSnapshotState.add(indexSnapshotId);
+  }
+
+  /**
+   * Runs one planning cycle for the incoming trigger.
+   *
+   * <p>Refreshes the table, brings the worker index up to date with the target branch, then either
+   * converts the oldest unprocessed staging snapshot or emits a no-op result. Any exception is
+   * logged and forwarded to {@link TaskResultAggregator#ERROR_STREAM}, and a no-op result is then
+   * emitted for the trigger.
+   */
+  @Override
+  public void processElement(StreamRecord<Trigger> element) throws Exception {
+    long triggerTs = element.getTimestamp();
+    // Never reuse a phase timestamp from an earlier trigger, even if trigger timestamps go back.
+    nextPhaseTs = Math.max(triggerTs, nextPhaseTs + 1);
+    snapshotChangesCache.clear();
+
+    try {
+      table.refresh();
+      Snapshot mainSnapshot = table.snapshot(targetBranch);
+      Long currentMainSnapshotId = snapshotIdOrNull(mainSnapshot);
+
+      ensureIndexCurrent(mainSnapshot);
+
+      Snapshot stagingHead = table.snapshot(stagingBranch);
+      Snapshot nextToProcess = unprocessedStagingSnapshot(stagingHead, mainSnapshot);
+      if (nextToProcess != null) {
+        processStagingSnapshot(nextToProcess, triggerTs, currentMainSnapshotId);
+      } else {
+        LOG.info("Nothing new to convert on staging branch '{}'.", stagingBranch);
+        emitNoOpResult(triggerTs, currentMainSnapshotId);
+      }
+    } catch (Exception e) {
+      LOG.error(
+          "Error processing equality deletes for table {} task {}[{}]",
+          tableName,
+          taskName,
+          taskIndex,
+          e);
+      output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
+      emitNoOpResult(triggerTs, snapshotIdOrNull(table.snapshot(targetBranch)));
+    }
+  }
+
+  /** Returns the id of {@code snapshot}, or {@code null} when there is no snapshot. */
+  private static Long snapshotIdOrNull(Snapshot snapshot) {
+    return snapshot == null ? null : snapshot.snapshotId();
+  }
+
+  /**
+   * Brings the worker's index up to date with the current state of the target branch:
+   *
+   * <ul>
+   *   <li>Updates {@link #lastStagingSnapshotId} from the most recent committer marker on main.
+   *   <li>Bootstraps the index from main on the first trigger with a non-null main snapshot.
+   *   <li>Reindexes from main when external commits (e.g. compaction or direct writes) have
+   *       advanced main past the currently-indexed snapshot.
+   * </ul>
+   *
+   * <p>No-op when main hasn't moved since the last trigger. Otherwise the history walk is bounded
+   * to commits added since {@link #lastMainSnapshotId}.
+   *
+   * <p>If emitting the main data read commands fails, {@link #indexSnapshotId} is restored to its
+   * previous value and {@link #lastMainSnapshotId} is left untouched, so the next trigger retries
+   * the bootstrap/reindex. Without the rollback, a failed bootstrap would leave
+   * {@code indexSnapshotId} set. A later trigger that finds no external commits since the last
+   * committer marker would then neither bootstrap nor reindex.
+   *
+   * @param mainSnapshot current head of the target branch, or {@code null} if it has none
+   */
+  private void ensureIndexCurrent(Snapshot mainSnapshot) {
+    Long currentMainSnapshotId = mainSnapshot != null ? mainSnapshot.snapshotId() : null;
+
+    if (Objects.equals(lastMainSnapshotId, currentMainSnapshotId)) {
+      return;
+    }
+
+    StagingBranchInfo info = scanStagingBranch(mainSnapshot);
+    // Only update if found; if main was rewritten such that no committer property is reachable,
+    // keep the previously known value.
+    if (info.lastCommittedStaging() != null) {
+      lastStagingSnapshotId = info.lastCommittedStaging();
+    }
+
+    boolean bootstrap = mainSnapshot != null && indexSnapshotId == null;
+    boolean reindex = indexSnapshotId != null && info.externalCommitCount() > 0;
+    if (bootstrap || reindex) {
+      LOG.info(
+          "{} worker index from main snapshot {} for field IDs {}.",
+          bootstrap ? "Bootstrapping" : "Reindexing",
+          currentMainSnapshotId,
+          eqFieldIds);
+
+      Long previousIndexSnapshotId = indexSnapshotId;
+      // emitMainDataReadCommands stamps every read command with the indexSnapshotId field, so the
+      // new id has to be in place before emitting.
+      indexSnapshotId = currentMainSnapshotId;
+      try {
+        emitMainDataReadCommands(mainSnapshot);
+      } catch (RuntimeException e) {
+        // Roll back so the index is not considered built; lastMainSnapshotId is not advanced
+        // either, so the next trigger repeats this decision.
+        // NOTE(review): the worker may already have received part of this index; confirm it
+        // tolerates the full set being re-emitted on retry.
+        indexSnapshotId = previousIndexSnapshotId;
+        throw e;
+      }
+
+      if (reindex) {
+        reindexCounter.inc();
+      }
+    }
+
+    lastMainSnapshotId = currentMainSnapshotId;
+  }
+
+  /**
+   * Walks main back from its head, looking for the most recent snapshot tagged with {@link
+   * EqualityConvertCommitter#COMMITTED_STAGING_SNAPSHOT_PROPERTY}.
+   *
+   * <p>The walk stops at whichever comes first:
+   *
+   * <ul>
+   *   <li>the first snapshot carrying the committer marker, whose recorded staging snapshot id is
+   *       returned;
+   *   <li>{@link #lastMainSnapshotId}, since anything older was inspected on a previous trigger.
+   * </ul>
+   *
+   * <p>Every snapshot passed before stopping is counted as an external commit.
+   *
+   * <p>NOTE(review): commits below the newest committer marker are never counted. An example is an
+   * external append followed by a committer commit between two triggers. Confirm that the
+   * committer's commit-time validation rules out that interleaving; otherwise the index may miss
+   * those files.
+   *
+   * @return the staging snapshot id from the newest marker ({@code null} if none was reached) and
+   *     the number of external commits walked past
+   * @throws NumberFormatException if the marker property is not a valid long
+   */
+  private StagingBranchInfo scanStagingBranch(Snapshot mainSnapshot) {
+    int externalCommits = 0;
+    for (Snapshot snapshot = mainSnapshot; snapshot != null; snapshot = parentOf(snapshot)) {
+      boolean alreadyScanned =
+          lastMainSnapshotId != null && snapshot.snapshotId() == lastMainSnapshotId;
+      if (alreadyScanned) {
+        break;
+      }
+
+      String committedStagingId =
+          snapshot.summary().get(EqualityConvertCommitter.COMMITTED_STAGING_SNAPSHOT_PROPERTY);
+      if (committedStagingId != null) {
+        return new StagingBranchInfo(Long.parseLong(committedStagingId), externalCommits);
+      }
+
+      externalCommits++;
+    }
+
+    return new StagingBranchInfo(null, externalCommits);
+  }
+
+  /**
+   * Result of {@link #scanStagingBranch}: the staging snapshot id from the newest committer marker
+   * (or {@code null}) and the number of external commits walked past before the walk stopped.
+   */
+  private record StagingBranchInfo(Long lastCommittedStaging, int externalCommitCount) {}
+
+  /**
+   * Finds the oldest staging snapshot that still needs converting.
+   *
+   * <p>The walk goes from {@code stagingHead} towards the root and stops, exclusively, at:
+   *
+   * <ul>
+   *   <li>{@link #lastStagingSnapshotId}, when one is known;
+   *   <li>otherwise, if staging and target are the same branch, the head's parent (on cold start
+   *       with a shared branch, everything before the head is already on target);
+   *   <li>otherwise, the common ancestor of the staging and target heads.
+   * </ul>
+   *
+   * <p>Snapshots accepted by {@link #shouldSkip} are ignored. If the stop point is {@code null} or
+   * never reached, the walk runs to the root of the staging history.
+   *
+   * @return the oldest unprocessed, non-skipped snapshot, or {@code null} if there is none
+   */
+  private Snapshot unprocessedStagingSnapshot(Snapshot stagingHead, Snapshot mainSnapshot) {
+    if (stagingHead == null) {
+      return null;
+    }
+
+    Long stopAt = stagingStopSnapshotId(stagingHead, mainSnapshot);
+
+    Snapshot oldestCandidate = null;
+    for (Snapshot snapshot = stagingHead; snapshot != null; snapshot = parentOf(snapshot)) {
+      if (stopAt != null && snapshot.snapshotId() == stopAt) {
+        break;
+      }
+
+      if (!shouldSkip(snapshot)) {
+        // Walking backwards, so the last match kept is the oldest one.
+        oldestCandidate = snapshot;
+      }
+    }
+
+    return oldestCandidate;
+  }
+
+  /** Returns the exclusive stop point for {@link #unprocessedStagingSnapshot}'s history walk. */
+  private Long stagingStopSnapshotId(Snapshot stagingHead, Snapshot mainSnapshot) {
+    if (lastStagingSnapshotId != null) {
+      return lastStagingSnapshotId;
+    }
+
+    if (stagingBranch.equals(targetBranch)) {
+      return stagingHead.parentId();
+    }
+
+    return findCommonAncestor(stagingHead, mainSnapshot);
+  }
+
+  /**
+   * Resolves the eq deletes in {@code stagingSnapshot} against the current index and emits the
+   * cycle's metadata. Phase ordering (separated by watermarks):
+   *
+   * <ol>
+   *   <li>Eq delete read commands, resolved by the worker against the current index.
+   *   <li>Staging data files, indexed for the NEXT cycle's eq-delete resolution. Skipped when
+   *       staging and target are the same branch, because those files were already indexed by
+   *       bootstrap/reindex.
+   * </ol>
+   *
+   * <p>Staging DV files are not turned into read commands. They are passed through unchanged in
+   * the {@link EqualityConvertPlanResult} sent to {@link #METADATA_STREAM}, which is followed by a
+   * closing phase watermark. If the snapshot adds no files at all, a no-op result is emitted
+   * instead.
+   *
+   * <p>Bootstrapping or reindexing the index from main data is handled separately by {@link
+   * #ensureIndexCurrent}, which {@link #processElement} runs before this method.
+   *
+   * @param stagingSnapshot the staging snapshot to convert
+   * @param triggerTs timestamp of the trigger that started this cycle
+   * @param currentMainSnapshotId target-branch head id at planning time, or {@code null} if the
+   *     target branch has no snapshot
+   */
+  private void processStagingSnapshot(
+      Snapshot stagingSnapshot, long triggerTs, Long currentMainSnapshotId) {
+
+    StagingInputs inputs = retrieveStagingFiles(stagingSnapshot);
+    if (inputs.isEmpty()) {
+      LOG.info("No new files on staging branch '{}' to convert.", stagingBranch);
+      emitNoOpResult(triggerTs, currentMainSnapshotId);
+      return;
+    }
+
+    emitDeletePhase(inputs.eqDeleteFiles());
+    emitSnapshotDataPhase(inputs.newDataFiles());
+
+    LOG.info(
+        "Emitted read commands for {} new data files from staging branch '{}'.",
+        inputs.newDataFiles().size(),
+        stagingBranch);
+
+    processedStagingSnapshotNumCounter.inc();
+
+    output.collect(
+        METADATA_STREAM,
+        new StreamRecord<>(
+            new EqualityConvertPlanResult(
+                inputs.newDataFiles(),
+                inputs.stagingDVFiles(),
+                stagingSnapshot.snapshotId(),
+                currentMainSnapshotId,
+                triggerTs,
+                nextPhaseTs)));
+
+    advancePhase();
+  }
+
+  /**
+   * Classifies the files added by {@code stagingSnapshot} into data files, equality delete files,
+   * and DV files. All returned files are copies, because {@link SnapshotChanges} iterators reuse
+   * their file instances. Throws {@link IllegalStateException} if the snapshot:
+   *
+   * <ul>
+   *   <li>Removes data files (rewrites on the staging branch aren't supported).
+   *   <li>Contains V2 positional delete files (the converter expects a V3 staging branch written by
+   *       Flink, which produces only deletion vectors for deletes).
+   *   <li>Contains an eq-delete file whose {@code equalityFieldIds()} don't match the
+   *       builder-configured ids (silent wrong-key serialization otherwise). The comparison ignores
+   *       order: both sides are sorted before comparing.
+   * </ul>
+   */
+  private StagingInputs retrieveStagingFiles(Snapshot stagingSnapshot) {
+    SnapshotChanges changes = snapshotChanges(stagingSnapshot);
+
+    // Rewrites on the staging branch would require rewriting the corresponding DVs against new
+    // data files on target. Not implemented; fail fast instead of silently dropping work.
+    if (changes.removedDataFiles().iterator().hasNext()) {
+      throw new IllegalStateException(
+          String.format(
+              "Staging snapshot %s on branch '%s' removes data files; "
+                  + "equality delete conversion does not support rewrites on the staging branch. "
+                  + "Run compaction on the target branch instead.",
+              stagingSnapshot.snapshotId(), stagingBranch));
+    }
+
+    // Each delete file's ids are sorted before comparing, so the configured ids must be sorted as
+    // well. Otherwise an unsorted builder list such as [3, 1] never matches a delete with [1, 3].
+    List<Integer> expectedFieldIds = Lists.newArrayList(eqFieldIds);
+    Collections.sort(expectedFieldIds);
+
+    List<DataFile> newDataFiles = Lists.newArrayList();
+    List<DeleteFile> stagingDVFiles = Lists.newArrayList();
+    List<DeleteFile> eqDeleteFiles = Lists.newArrayList();
+
+    // SnapshotChanges' iterators reuse the same DataFile / DeleteFile instance across iterations.
+    // Copy before buffering.
+    for (DataFile dataFile : changes.addedDataFiles()) {
+      newDataFiles.add(dataFile.copy());
+    }
+
+    for (DeleteFile deleteFile : changes.addedDeleteFiles()) {
+      if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
+        List<Integer> deleteFieldIds = Lists.newArrayList(deleteFile.equalityFieldIds());
+        Collections.sort(deleteFieldIds);
+        Preconditions.checkState(
+            deleteFieldIds.equals(expectedFieldIds),
+            "Staging snapshot %s on branch '%s' contains an equality delete file %s with "
+                + "equalityFieldIds=%s, which does not match the configured eqFieldIds=%s. "
+                + "The writer must use the same equality field IDs as the converter.",
+            stagingSnapshot.snapshotId(),
+            stagingBranch,
+            deleteFile.location(),
+            deleteFieldIds,
+            eqFieldIds);
+        eqDeleteFiles.add(deleteFile.copy());
+      } else if (ContentFileUtil.isDV(deleteFile)) {
+        stagingDVFiles.add(deleteFile.copy());
+      } else {
+        throw new IllegalStateException(
+            String.format(
+                "Staging snapshot %s on branch '%s' contains a V2 positional delete file (%s); "
+                    + "equality delete conversion expects a V3 staging branch written by Flink, "
+                    + "which produces only deletion vectors for deletes.",
+                stagingSnapshot.snapshotId(), stagingBranch, deleteFile.location()));
+      }
+    }
+
+    return new StagingInputs(newDataFiles, stagingDVFiles, eqDeleteFiles);
+  }
+
+  /** Files added by one staging snapshot, classified for cycle emission. */
+  private record StagingInputs(
+      List<DataFile> newDataFiles,
+      List<DeleteFile> stagingDVFiles,
+      List<DeleteFile> eqDeleteFiles) {
+
+    /** Returns whether the snapshot added no data, equality delete, or DV files. */
+    boolean isEmpty() {
+      return newDataFiles.isEmpty() && eqDeleteFiles.isEmpty() && stagingDVFiles.isEmpty();
+    }
+  }
+
+  /**
+   * Emits one eq-delete {@link ReadCommand} per file, stamped with the current index snapshot id
+   * and phase timestamp, counts each file, and then closes the phase.
+   */
+  private void emitDeletePhase(List<DeleteFile> eqDeleteFiles) {
+    for (DeleteFile eqDelete : eqDeleteFiles) {
+      PartitionSpec deleteSpec = table.specs().get(eqDelete.specId());
+      ReadCommand command = ReadCommand.eqDeleteFile(eqDelete, deleteSpec, indexSnapshotId);
+      output.collect(new StreamRecord<>(command, nextPhaseTs));
+      processedEqDeleteFileNumCounter.inc();
+    }
+
+    advancePhase();
+  }
+
+  /**
+   * Emits a data {@link ReadCommand} for each staging data file so the worker indexes it for the
+   * next cycle, then closes the phase. The phase is always closed, even when nothing is emitted.
+   */
+  private void emitSnapshotDataPhase(List<DataFile> snapshotDataFiles) {
+    // Shared-branch skip: when stagingBranch == targetBranch these files are already on target and
+    // were indexed by bootstrap/reindex; re-emitting would duplicate entries in dataRowPositions.
+    boolean sharedBranch = stagingBranch.equals(targetBranch);
+    if (!sharedBranch) {
+      for (DataFile stagedFile : snapshotDataFiles) {
+        PartitionSpec fileSpec = table.specs().get(stagedFile.specId());
+        FlinkAddedRowsScanTask scanTask = new FlinkAddedRowsScanTask(stagedFile, fileSpec);
+        ReadCommand command = ReadCommand.dataFile(scanTask, indexSnapshotId);
+        output.collect(new StreamRecord<>(command, nextPhaseTs));
+      }
+    }
+
+    advancePhase();
+  }
+
+  /**
+   * Records a cycle that converted nothing. It bumps the skipped-cycles counter, emits an empty
+   * {@link EqualityConvertPlanResult} with staging snapshot id {@code -1} on {@link
+   * #METADATA_STREAM}, and closes the phase.
+   */
+  private void emitNoOpResult(long triggerTimestamp, Long currentMainSnapshotId) {
+    skippedNoOpCyclesCounter.inc();
+
+    EqualityConvertPlanResult emptyResult =
+        new EqualityConvertPlanResult(
+            Lists.newArrayList(),
+            Lists.newArrayList(),
+            -1L,
+            currentMainSnapshotId,
+            triggerTimestamp,
+            nextPhaseTs);
+    output.collect(METADATA_STREAM, new StreamRecord<>(emptyResult));
+
+    advancePhase();
+  }
+
+  /**
+   * Plans every data file reachable from {@code mainSnapshot} and emits one data {@link
+   * ReadCommand} per scan task. Each command is stamped with the current {@link #indexSnapshotId},
+   * so the worker indexes the files for the configured equality-field set. Closes the phase when
+   * done. Shared by the cold-start bootstrap and by the reindex path triggered by external commits
+   * on target.
+   *
+   * @param mainSnapshot target-branch snapshot to index; must not be {@code null}
+   * @throws UncheckedIOException wrapping any {@link IOException} raised by the planned scan
+   */
+  private void emitMainDataReadCommands(Snapshot mainSnapshot) {
+    long sourceSnapshotId = mainSnapshot.snapshotId();
+
+    try (CloseableIterable<FileScanTask> scanTasks =
+        table.newScan().useSnapshot(sourceSnapshotId).planFiles()) {
+      for (FileScanTask scanTask : scanTasks) {
+        ReadCommand command = ReadCommand.dataFile(scanTask, indexSnapshotId);
+        output.collect(new StreamRecord<>(command, nextPhaseTs));
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to plan files for main index", e);
+    }
+
+    LOG.info(
+        "Emitted main data read commands for field IDs {} from snapshot {}.",
+        eqFieldIds,
+        sourceSnapshotId);
+
+    advancePhase();
+  }
+
+  /**
+   * Closes the current phase. It emits a watermark at {@link #nextPhaseTs} and then moves to the
+   * next timestamp. Every phase-emitting method must call this exactly once, after its records.
+   * The worker uses these watermarks to gate keyed-state transitions, so a missing or extra call
+   * silently breaks ordering.
+   */
+  private void advancePhase() {
+    long closingTs = nextPhaseTs;
+    output.emitWatermark(new Watermark(closingTs));
+    nextPhaseTs = closingTs + 1;
+  }
+
+  /**
+   * Returns the {@link SnapshotChanges} for {@code snapshot}. It is built on first use and kept in
+   * {@link #snapshotChangesCache} until the cache is cleared at the start of the next trigger.
+   */
+  private SnapshotChanges snapshotChanges(Snapshot snapshot) {
+    long snapshotId = snapshot.snapshotId();
+    SnapshotChanges cached = snapshotChangesCache.get(snapshotId);
+    if (cached == null) {
+      cached = SnapshotChanges.builderFor(table).snapshot(snapshot).build();
+      snapshotChangesCache.put(snapshotId, cached);
+    }
+
+    return cached;
+  }
+
+  /**
+   * Decides whether the staging walk can ignore {@code snapshot}. It can when any of the following
+   * holds:
+   *
+   * <ul>
+   *   <li>it carries {@link EqualityConvertCommitter#COMMITTED_STAGING_SNAPSHOT_PROPERTY}, i.e. it
+   *       was already committed by the converter;
+   *   <li>it neither adds nor removes any data or delete file;
+   *   <li>staging and target are the same branch and it adds no equality delete files. Such
+   *       pure-insert (or data-file-only) commits need no conversion cycle: their data is already
+   *       on target, and {@link #ensureIndexCurrent} keeps the worker's index fresh when the next
+   *       eq-delete arrives.
+   * </ul>
+   */
+  private boolean shouldSkip(Snapshot snapshot) {
+    Map<String, String> summary = snapshot.summary();
+    if (summary.containsKey(EqualityConvertCommitter.COMMITTED_STAGING_SNAPSHOT_PROPERTY)) {
+      return true;
+    }
+
+    SnapshotChanges changes = snapshotChanges(snapshot);
+    boolean touchesAnyFile =
+        changes.addedDataFiles().iterator().hasNext()
+            || changes.addedDeleteFiles().iterator().hasNext()
+            || changes.removedDataFiles().iterator().hasNext()
+            || changes.removedDeleteFiles().iterator().hasNext();
+    if (!touchesAnyFile) {
+      return true;
+    }
+
+    return stagingBranch.equals(targetBranch) && !hasEqualityDelete(changes);
+  }
+
+  /** Returns whether {@code changes} adds at least one equality delete file. */
+  private static boolean hasEqualityDelete(SnapshotChanges changes) {
+    boolean found = false;
+    for (DeleteFile added : changes.addedDeleteFiles()) {
+      if (added.content() == FileContent.EQUALITY_DELETES) {
+        found = true;
+        break;
+      }
+    }
+
+    return found;
+  }
+
+  /**
+   * Returns the id of the newest snapshot that is reachable from both the 
staging and target branch
+   * heads (i.e. the most recent common ancestor where the two branches last 
matched), or {@code
+   * null} if they share no history. Used on cold start to know where staging 
diverged from target
+   * so we can skip converting snapshots that already exist on target.
+   */
+  private Long findCommonAncestor(Snapshot stagingHead, Snapshot mainHead) {

Review Comment:
   This might be a good candidate to be part of `SnapshotUtil`, and could use the patterns used there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to