aokolnychyi commented on code in PR #9020:
URL: https://github.com/apache/iceberg/pull/9020#discussion_r1396255446


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java:
##########
@@ -264,14 +293,15 @@ private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
     }
   }
 
-  private List<ManifestFile> findMatchingManifests() {
+  private List<ManifestFile> findMatchingManifests(ManifestContent content) {
     Snapshot currentSnapshot = table.currentSnapshot();
 
     if (currentSnapshot == null) {
       return ImmutableList.of();
     }
 
-    return currentSnapshot.dataManifests(table.io()).stream()
+    return currentSnapshot.allManifests(table.io()).stream()
+        .filter(manifest -> manifest.content() == content)

Review Comment:
   I restructured it a bit, so this no longer applies.



##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkContentFiles.java:
##########
@@ -161,40 +173,132 @@ private void checkSparkDataFile(Table table) throws IOException {
 
     table.refresh();
 
+    PartitionSpec dataFilesSpec = table.spec();
+
     List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 1 manifest", 1, manifests.size());
 
     List<DataFile> dataFiles = Lists.newArrayList();
     try (ManifestReader<DataFile> reader = ManifestFiles.read(manifests.get(0), table.io())) {
       for (DataFile dataFile : reader) {
-        checkDataFile(dataFile.copy(), DataFiles.builder(table.spec()).copy(dataFile).build());
+        checkDataFile(dataFile.copy(), DataFiles.builder(dataFilesSpec).copy(dataFile).build());
         dataFiles.add(dataFile.copy());
       }
     }
 
-    Dataset<Row> dataFileDF = spark.read().format("iceberg").load(tableLocation + "#files");
+    UpdatePartitionSpec updateSpec = table.updateSpec();
+    for (PartitionField field : dataFilesSpec.fields()) {
+      updateSpec.removeField(field.name());
+    }
+    updateSpec.commit();
 
-    // reorder columns to test arbitrary projections
-    List<Column> columns =
-        Arrays.stream(dataFileDF.columns()).map(ColumnName::new).collect(Collectors.toList());
-    Collections.shuffle(columns);
+    List<DeleteFile> positionDeleteFiles = Lists.newArrayList();
+    List<DeleteFile> equalityDeleteFiles = Lists.newArrayList();
+
+    RowDelta rowDelta = table.newRowDelta();
+
+    for (DataFile dataFile : dataFiles) {
+      DeleteFile positionDeleteFile = createPositionDeleteFile(table, dataFile);
+      positionDeleteFiles.add(positionDeleteFile);
+      rowDelta.addDeletes(positionDeleteFile);
+    }
 
-    List<Row> sparkDataFiles =
-        dataFileDF.select(Iterables.toArray(columns, Column.class)).collectAsList();
+    DeleteFile equalityDeleteFile1 = createEqualityDeleteFile(table);
+    equalityDeleteFiles.add(equalityDeleteFile1);
+    rowDelta.addDeletes(equalityDeleteFile1);
 
+    DeleteFile equalityDeleteFile2 = createEqualityDeleteFile(table);
+    equalityDeleteFiles.add(equalityDeleteFile2);
+    rowDelta.addDeletes(equalityDeleteFile2);
+
+    rowDelta.commit();
+
+    Dataset<Row> dataFileDF = spark.read().format("iceberg").load(tableLocation + "#data_files");
+    List<Row> sparkDataFiles = shuffleColumns(dataFileDF).collectAsList();
     Assert.assertEquals(
         "The number of files should match", dataFiles.size(), 
sparkDataFiles.size());
 
-    Types.StructType dataFileType = DataFile.getType(table.spec().partitionType());
+    Types.StructType dataFileType = DataFile.getType(dataFilesSpec.partitionType());
     StructType sparkDataFileType = sparkDataFiles.get(0).schema();
-    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkDataFileType);
+    SparkDataFile dataFileWrapper = new SparkDataFile(dataFileType, sparkDataFileType);
 
     for (int i = 0; i < dataFiles.size(); i++) {
-      checkDataFile(dataFiles.get(i), wrapper.wrap(sparkDataFiles.get(i)));
+      checkDataFile(dataFiles.get(i), dataFileWrapper.wrap(sparkDataFiles.get(i)));
+    }
+
+    Dataset<Row> positionDeleteFileDF =
+        spark.read().format("iceberg").load(tableLocation + 
"#delete_files").where("content = 1");
+    List<Row> sparkPositionDeleteFiles = 
shuffleColumns(positionDeleteFileDF).collectAsList();
+    Assert.assertEquals(
+        "The number of files should match",
+        positionDeleteFiles.size(),
+        sparkPositionDeleteFiles.size());
+
+    Types.StructType positionDeleteFileType = DataFile.getType(dataFilesSpec.partitionType());
+    StructType sparkPositionDeleteFileType = sparkPositionDeleteFiles.get(0).schema();
+    SparkDeleteFile positionDeleteFileWrapper =
+        new SparkDeleteFile(positionDeleteFileType, sparkPositionDeleteFileType);
+
+    for (int i = 0; i < positionDeleteFiles.size(); i++) {
+      checkDeleteFile(
+          positionDeleteFiles.get(i),
+          positionDeleteFileWrapper.wrap(sparkPositionDeleteFiles.get(i)));
+    }
+
+    Dataset<Row> equalityDeleteFileDF =
+        spark.read().format("iceberg").load(tableLocation + 
"#delete_files").where("content = 2");
+    List<Row> sparkEqualityDeleteFiles = 
shuffleColumns(equalityDeleteFileDF).collectAsList();
+    Assert.assertEquals(
+        "The number of files should match",
+        equalityDeleteFiles.size(),
+        sparkEqualityDeleteFiles.size());
+
+    Types.StructType equalityDeleteFileType = DataFile.getType(table.spec().partitionType());
+    StructType sparkEqualityDeleteFileType = sparkEqualityDeleteFiles.get(0).schema();
+    SparkDeleteFile equalityDeleteFileWrapper =
+        new SparkDeleteFile(equalityDeleteFileType, sparkEqualityDeleteFileType);
+
+    for (int i = 0; i < equalityDeleteFiles.size(); i++) {
+      checkDeleteFile(
+          equalityDeleteFiles.get(i),
+          equalityDeleteFileWrapper.wrap(sparkEqualityDeleteFiles.get(i)));
     }
   }
 
+  private Dataset<Row> shuffleColumns(Dataset<Row> df) {
+    List<Column> columns =
+        Arrays.stream(df.columns()).map(ColumnName::new).collect(Collectors.toList());
+    Collections.shuffle(columns);
+    return df.select(columns.toArray(new Column[0]));
+  }
+
   private void checkDataFile(DataFile expected, DataFile actual) {
+    Assert.assertEquals("Content must match", expected.content(), 
actual.content());

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

