szehon-ho commented on code in PR #7920:
URL: https://github.com/apache/iceberg/pull/7920#discussion_r1254749896


##########
spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java:
##########
@@ -2028,4 +2055,32 @@ public static Dataset<Row> selectNonDerived(Dataset<Row> metadataTable) {
   public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
     return SparkSchemaUtil.convert(selectNonDerived(metadataTable).schema()).asStruct();
   }
+
+  private long sumDataFileSizeInBytes(Iterable<DataFile> dataFiles) {
+    return StreamSupport.stream(dataFiles.spliterator(), false)
+        .mapToLong(DataFile::fileSizeInBytes)
+        .sum();
+  }
+
+  private List<DataFile> listDataFilesFromCommitId(Table table, long commitId) {
+    return StreamSupport.stream(

Review Comment:
   Could this be shortened to the following?
   ```Lists.newArrayList(table.snapshot(commitId).addedDataFiles(table.io()))```
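   
   For illustration, a minimal sketch of why the shorter form should be equivalent: both approaches simply copy an `Iterable` into a `List`. `copyOf` below is a hypothetical, stdlib-only stand-in for Guava's `Lists.newArrayList(Iterable)`, and plain strings stand in for `DataFile` so that the snippet runs without Iceberg on the classpath.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;
   import java.util.stream.StreamSupport;

   // Hypothetical sketch: two ways of copying an Iterable into a List.
   final class IterableToListSketch {

     // The stream-based form used in the PR.
     static <T> List<T> viaStream(Iterable<T> items) {
       return StreamSupport.stream(items.spliterator(), false).collect(Collectors.toList());
     }

     // Stand-in (assumption) for Guava's Lists.newArrayList(Iterable): a plain copy loop.
     static <T> List<T> copyOf(Iterable<T> items) {
       List<T> result = new ArrayList<>();
       for (T item : items) {
         result.add(item);
       }
       return result;
     }

     public static void main(String[] args) {
       // Strings stand in for the data files added by a commit.
       Iterable<String> added = Arrays.asList("file-a.parquet", "file-b.parquet");
       System.out.println(viaStream(added).equals(copyOf(added)));
     }
   }
   ```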



##########
spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java:
##########
@@ -2028,4 +2055,32 @@ public static Dataset<Row> selectNonDerived(Dataset<Row> metadataTable) {
   public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
     return SparkSchemaUtil.convert(selectNonDerived(metadataTable).schema()).asStruct();
   }
+
+  private long sumDataFileSizeInBytes(Iterable<DataFile> dataFiles) {
+    return StreamSupport.stream(dataFiles.spliterator(), false)
+        .mapToLong(DataFile::fileSizeInBytes)
+        .sum();
+  }
+
+  private List<DataFile> listDataFilesFromCommitId(Table table, long commitId) {
+    return StreamSupport.stream(
+            table.snapshot(commitId).addedDataFiles(table.io()).spliterator(), false)
+        .collect(Collectors.toList());
+  }
+
+  private void assertPartitionIdFromDataFiles(
+      List<DataFile> dataFilesFromCommit,
+      int expectedDataFileCount,
+      List<Integer> expectedPartitionIds) {
+    Assert.assertEquals(
+        "Commit should have " + expectedDataFileCount + " data files",
+        expectedDataFileCount,
+        dataFilesFromCommit.size());
+    for (int i = 0; i < expectedDataFileCount; ++i) {
+      Assert.assertEquals(
+          "Data file belong to partition of id " + expectedPartitionIds.get(i),

Review Comment:
   Nit: I think this message is shown when the values are not equal, so it
   could be 'Data file should have partition of id'.



##########
spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java:
##########
@@ -2028,4 +2055,32 @@ public static Dataset<Row> selectNonDerived(Dataset<Row> metadataTable) {
   public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
     return SparkSchemaUtil.convert(selectNonDerived(metadataTable).schema()).asStruct();
   }
+
+  private long sumDataFileSizeInBytes(Iterable<DataFile> dataFiles) {
+    return StreamSupport.stream(dataFiles.spliterator(), false)
+        .mapToLong(DataFile::fileSizeInBytes)
+        .sum();
+  }
+
+  private List<DataFile> listDataFilesFromCommitId(Table table, long commitId) {

Review Comment:
   Optional: maybe we can make the name shorter, like `dataFiles(table, commitId)`,
   as commitId is already in the arguments?



##########
spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java:
##########
@@ -1469,12 +1483,20 @@ public void testPartitionsTableLastUpdatedSnapshot() {
         new GenericRecordBuilder(
             AvroSchemaUtil.convert(
                 partitionsTable.schema().findType("partition").asStructType(), "partition"));
+
+    List<DataFile> dataFilesFromFirstCommit = listDataFilesFromCommitId(table, firstCommitId);
+    assertPartitionIdFromDataFiles(dataFilesFromFirstCommit, 2, Arrays.asList(1, 2));

Review Comment:
   Nit: Could we make it take an array to be shorter? It should be a pretty
   small change in the method, right?
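   
   A minimal sketch of one reading of this suggestion, using varargs so that the call site shrinks to `assertPartitionIdFromDataFiles(files, 2, 1, 2)`. The `HasPartitionId` interface and the `file` factory are hypothetical stand-ins for however the partition value is read from a `DataFile` (that part of the method is not shown in the diff above), and a plain `AssertionError` replaces the JUnit assertions so that the snippet runs on its own.
   
   ```java
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical sketch: the helper taking varargs instead of a List<Integer>.
   final class PartitionAssertSketch {

     // Stand-in (assumption) for reading the partition value of a data file.
     interface HasPartitionId {
       int partitionId();
     }

     // Test-only factory for the stand-in type.
     static HasPartitionId file(int partitionId) {
       return () -> partitionId;
     }

     static void assertPartitionIdFromDataFiles(
         List<HasPartitionId> dataFilesFromCommit,
         int expectedDataFileCount,
         int... expectedPartitionIds) {
       if (dataFilesFromCommit.size() != expectedDataFileCount) {
         throw new AssertionError("Commit should have " + expectedDataFileCount + " data files");
       }
       for (int i = 0; i < expectedDataFileCount; ++i) {
         if (dataFilesFromCommit.get(i).partitionId() != expectedPartitionIds[i]) {
           throw new AssertionError(
               "Data file should have partition of id " + expectedPartitionIds[i]);
         }
       }
     }

     public static void main(String[] args) {
       List<HasPartitionId> files = Arrays.asList(file(1), file(2));
       // Two files expected, in partitions 1 and 2; passes silently.
       assertPartitionIdFromDataFiles(files, 2, 1, 2);
     }
   }
   ```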



##########
spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java:
##########
@@ -2028,4 +2055,32 @@ public static Dataset<Row> selectNonDerived(Dataset<Row> metadataTable) {
   public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
     return SparkSchemaUtil.convert(selectNonDerived(metadataTable).schema()).asStruct();
   }
+
+  private long sumDataFileSizeInBytes(Iterable<DataFile> dataFiles) {
+    return StreamSupport.stream(dataFiles.spliterator(), false)
+        .mapToLong(DataFile::fileSizeInBytes)
+        .sum();
+  }
+
+  private List<DataFile> listDataFilesFromCommitId(Table table, long commitId) {
+    return StreamSupport.stream(

Review Comment:
   Or we could drop this method and instead use
https://github.com/apache/iceberg/blob/master/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java#L682
   as in the other comment.



##########
spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java:
##########
@@ -1469,12 +1483,20 @@ public void testPartitionsTableLastUpdatedSnapshot() {
         new GenericRecordBuilder(
             AvroSchemaUtil.convert(
                 partitionsTable.schema().findType("partition").asStructType(), "partition"));
+
+    List<DataFile> dataFilesFromFirstCommit = listDataFilesFromCommitId(table, firstCommitId);

Review Comment:
   Would it work to add a method `List<DataFile> dataFiles(table)` that gets all
   the data files, so we don't have to add data files from both commits?
   
   I did this before here: 
https://github.com/apache/iceberg/blob/master/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java#L682
   
   (I think we can do it without column stats here).
   
   If we do this, we can even extract to TestHelpers in a later PR.



##########
spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java:
##########
@@ -2028,4 +2055,32 @@ public static Dataset<Row> selectNonDerived(Dataset<Row> metadataTable) {
   public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
     return SparkSchemaUtil.convert(selectNonDerived(metadataTable).schema()).asStruct();
   }
+
+  private long sumDataFileSizeInBytes(Iterable<DataFile> dataFiles) {
+    return StreamSupport.stream(dataFiles.spliterator(), false)
+        .mapToLong(DataFile::fileSizeInBytes)
+        .sum();
+  }
+
+  private List<DataFile> listDataFilesFromCommitId(Table table, long commitId) {
+    return StreamSupport.stream(
+            table.snapshot(commitId).addedDataFiles(table.io()).spliterator(), false)
+        .collect(Collectors.toList());
+  }
+
+  private void assertPartitionIdFromDataFiles(

Review Comment:
   Optional: `assertDataFilePartitions`
   
   (partition id is a bit ambiguous)



##########
spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java:
##########
@@ -2028,4 +2055,32 @@ public static Dataset<Row> selectNonDerived(Dataset<Row> metadataTable) {
   public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
     return SparkSchemaUtil.convert(selectNonDerived(metadataTable).schema()).asStruct();
   }
+
+  private long sumDataFileSizeInBytes(Iterable<DataFile> dataFiles) {

Review Comment:
   Also optional: maybe just `totalSizeInBytes(Iterable<DataFile> dataFiles)`,
   like the comment below, as dataFiles is already in the arguments.
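   
   A minimal sketch of the renamed helper, keeping the body from the diff unchanged. `HasFileSize` and `fileOfSize` are hypothetical stand-ins for `DataFile` so that the snippet runs without Iceberg; only the method name differs from the PR.
   
   ```java
   import java.util.Arrays;
   import java.util.stream.StreamSupport;

   // Hypothetical sketch: same logic as sumDataFileSizeInBytes, shorter name.
   final class TotalSizeSketch {

     // Stand-in (assumption) for the part of DataFile the helper uses.
     interface HasFileSize {
       long fileSizeInBytes();
     }

     // Test-only factory for the stand-in type.
     static HasFileSize fileOfSize(long bytes) {
       return () -> bytes;
     }

     // Sums the sizes of all files in the iterable.
     static long totalSizeInBytes(Iterable<HasFileSize> dataFiles) {
       return StreamSupport.stream(dataFiles.spliterator(), false)
           .mapToLong(HasFileSize::fileSizeInBytes)
           .sum();
     }

     public static void main(String[] args) {
       System.out.println(totalSizeInBytes(Arrays.asList(fileOfSize(10L), fileOfSize(20L))));
     }
   }
   ```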



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]