danielcweeks commented on code in PR #9860:
URL: https://github.com/apache/iceberg/pull/9860#discussion_r1511419189


##########
core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java:
##########
@@ -87,31 +87,40 @@ public BaseReplacePartitions toBranch(String branch) {
 
   @Override
   public void validate(TableMetadata currentMetadata, Snapshot parent) {
-    if (validateConflictingData) {
-      if (dataSpec().isUnpartitioned()) {
-        validateAddedDataFiles(
-            currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
-      } else {
-        validateAddedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions, parent);
-      }
-    }
-
-    if (validateConflictingDeletes) {
-      if (dataSpec().isUnpartitioned()) {
-        validateDeletedDataFiles(
-            currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
-        validateNoNewDeleteFiles(
-            currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
-      } else {
-        validateDeletedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions, parent);
-        validateNoNewDeleteFiles(currentMetadata, startingSnapshotId, replacedPartitions, parent);
-      }
-    }
+    dataSpecs()
+        .forEach(
+            dataSpec -> {
+              if (validateConflictingData) {
+                if (dataSpec.isUnpartitioned()) {
+                  validateAddedDataFiles(
+                      currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
+                } else {
+                  validateAddedDataFiles(
+                      currentMetadata, startingSnapshotId, replacedPartitions, parent);
+                }
+              }
+
+              if (validateConflictingDeletes) {
+                if (dataSpec.isUnpartitioned()) {
+                  validateDeletedDataFiles(
+                      currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
+                  validateNoNewDeleteFiles(
+                      currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
+                } else {
+                  validateDeletedDataFiles(
+                      currentMetadata, startingSnapshotId, replacedPartitions, parent);
+                  validateNoNewDeleteFiles(
+                      currentMetadata, startingSnapshotId, replacedPartitions, parent);
+                }
+              }
+            });
   }
 
   @Override
   public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
-    if (dataSpec().fields().size() <= 0) {
+    // TODO: I don't understand this, why delete all data? what if only part of the table is

Review Comment:
   If the table's current partition spec is unpartitioned, then a replace operation replaces the whole table. Historical partition specs don't apply at that point, because the replace is evaluated against the current spec.



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -138,19 +138,34 @@ protected boolean isCaseSensitive() {
     return caseSensitive;
   }
 
+  /**
+   * @deprecated Use {@link #dataSpecs()} instead. This method only returns one of potentially
+   *     multiple {@link PartitionSpec} involved in this operation.
+   */
+  @Deprecated
   protected PartitionSpec dataSpec() {
+    Set<PartitionSpec> partitionSpecs = dataSpecs();

Review Comment:
   This local variable doesn't seem necessary; you can return `dataSpecs().iterator().next()` directly.



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -79,18 +80,18 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   private final ManifestFilterManager<DeleteFile> deleteFilterManager;
 
   // update data
-  private final List<DataFile> newDataFiles = Lists.newArrayList();
+  private final Map<Integer, List<DataFile>> newDataFilesBySpec = Maps.newHashMap();
   private Long newDataFilesDataSequenceNumber;
   private final Map<Integer, List<DeleteFileHolder>> newDeleteFilesBySpec = Maps.newHashMap();
   private final List<ManifestFile> appendManifests = Lists.newArrayList();
   private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
   private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder();
   private final SnapshotSummary.Builder appendedManifestsSummary = SnapshotSummary.builder();
   private Expression deleteExpression = Expressions.alwaysFalse();
-  private PartitionSpec dataSpec;
+  private final Set<PartitionSpec> dataSpecs = Sets.newHashSet();

Review Comment:
   Can we drop this field, make the map a `Map<PartitionSpec, List<DataFile>>`, and use its `keySet()` instead?



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -138,19 +138,34 @@ protected boolean isCaseSensitive() {
     return caseSensitive;
   }
 
+  /**
+   * @deprecated Use {@link #dataSpecs()} instead. This method only returns one of potentially
+   *     multiple {@link PartitionSpec} involved in this operation.
+   */
+  @Deprecated
   protected PartitionSpec dataSpec() {
+    Set<PartitionSpec> partitionSpecs = dataSpecs();
+    return partitionSpecs.iterator().next();
+  }
+
+  protected Set<PartitionSpec> dataSpecs() {
     Preconditions.checkState(
-        dataSpec != null, "Cannot determine partition spec: no data files have been added");
-    // the spec is set when the write is started
-    return dataSpec;
+        !dataSpecs.isEmpty(), "Cannot determine partition specs: no data files have been added");
+    return dataSpecs;
   }
 
   protected Expression rowFilter() {
     return deleteExpression;
   }
 
   protected List<DataFile> addedDataFiles() {
-    return ImmutableList.copyOf(newDataFiles);
+    ImmutableList.Builder<DataFile> builder = ImmutableList.builder();
+    addedDataFilesBySpec().values().forEach(builder::addAll);

Review Comment:
   I think you can stream `values()` and use `flatMap` here instead of using a builder.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to