yihua commented on code in PR #18385:
URL: https://github.com/apache/hudi/pull/18385#discussion_r3035843226


##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java:
##########
@@ -200,6 +207,107 @@ void loadDatasetWithNestedSchemaAndCoalesceAliases() throws IOException {
     Assertions.assertEquals(expectedSchema, result.get().schema(), "output dataset schema should match source schema");
   }
 
+  @Test
+  void parquetMixedSchemasMergedByDefault(@TempDir Path tempDir) {
+    String p1 = tempDir.resolve("part1").toString();
+    String p2 = tempDir.resolve("part2").toString();
+
+    StructType schema1 = DataTypes.createStructType(Arrays.asList(
+        DataTypes.createStructField("id", DataTypes.IntegerType, true),
+        DataTypes.createStructField("b", DataTypes.StringType, true)));
+    sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, "x")), schema1)
+        .write().parquet(p1);
+
+    StructType schema2 = DataTypes.createStructType(Arrays.asList(
+        DataTypes.createStructField("id", DataTypes.IntegerType, true),
+        DataTypes.createStructField("c", DataTypes.IntegerType, true)));
+    sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, 99)), schema2)
+        .write().parquet(p2);
+
+    CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(new TypedProperties());
+    List<CloudObjectMetadata> input = Arrays.asList(
+        new CloudObjectMetadata(p1, 1L),
+        new CloudObjectMetadata(p2, 1L));
+    Option<Dataset<Row>> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "parquet", Option.empty(), 1);
+    Assertions.assertTrue(result.isPresent());
+    Assertions.assertEquals(2, result.get().count());
+    Set<String> colNames = Arrays.stream(result.get().schema().fields()).map(StructField::name).collect(Collectors.toSet());
+    Assertions.assertTrue(colNames.contains("b"));
+    Assertions.assertTrue(colNames.contains("c"));
+  }
+
+  /**
+   * With mergeSchema off, Spark does not union Parquet footers: it typically follows one file's schema
+   * (here {@code p1} is listed first). Columns only present in other files (e.g. {@code c} in {@code p2})
+   * are omitted from the scan—no exception, but data under those columns is dropped.
+   */
+  @Test
+  void parquetMixedSchemasDropExtraColumnsWhenMergeDisabled(@TempDir Path tempDir) {
+    String p1 = tempDir.resolve("part1").toString();
+    String p2 = tempDir.resolve("part2").toString();
+
+    StructType schema1 = DataTypes.createStructType(Arrays.asList(
+        DataTypes.createStructField("id", DataTypes.IntegerType, true),
+        DataTypes.createStructField("b", DataTypes.StringType, true)));
+    sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, "x")), schema1)
+        .write().parquet(p1);
+
+    StructType schema2 = DataTypes.createStructType(Arrays.asList(
+        DataTypes.createStructField("id", DataTypes.IntegerType, true),
+        DataTypes.createStructField("c", DataTypes.IntegerType, true)));
+    sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(2, 99)), schema2)
+        .write().parquet(p2);
+
+    TypedProperties props = new TypedProperties();
+    props.setProperty(CloudSourceConfig.CLOUD_INCREMENTAL_PARQUET_MERGE_SCHEMA.key(), "false");
+    CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(props);
+    List<CloudObjectMetadata> input = Arrays.asList(
+        new CloudObjectMetadata(p1, 1L),
+        new CloudObjectMetadata(p2, 1L));
+    Option<Dataset<Row>> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "parquet", Option.empty(), 1);
+    Assertions.assertTrue(result.isPresent());
+    Dataset<Row> ds = result.get();
+    Set<String> colNames = Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet());

Review Comment:
   🤖 The test asserts that `b` is present and `c` is absent, but Spark's schema 
selection across multiple Parquet files without `mergeSchema` is 
non-deterministic — it depends on which file footer Spark happens to read 
first, which can vary with Spark version or file-listing order. Could you make 
this test more deterministic? For example, test only that the result schema has 
exactly 2 columns (not 3), rather than asserting which specific column is 
present or absent.
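
   The point can be sketched in plain Java, without Spark. The two column sets below are hypothetical stand-ins for the scan schema Spark could produce with `mergeSchema=false`, depending on which footer it reads first; only the size check holds for both outcomes:

   ```java
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.Set;
   
   // Plain-Java sketch of the review point. The sets stand in for the column
   // names of result.get().schema(): with mergeSchema=false, the scan schema is
   // one of two candidates, depending on which Parquet footer Spark reads first.
   public class FooterChoiceSketch {
   
     // Robust assertion: both candidate schemas expose exactly 2 columns,
     // never the merged 3, regardless of footer order.
     static boolean exactlyTwoColumns(Set<String> colNames) {
       return colNames.size() == 2;
     }
   
     // Fragile assertion from the test: true only if p1's footer wins.
     static boolean containsB(Set<String> colNames) {
       return colNames.contains("b");
     }
   
     public static void main(String[] args) {
       Set<String> ifP1First = new HashSet<>(Arrays.asList("id", "b"));
       Set<String> ifP2First = new HashSet<>(Arrays.asList("id", "c"));
       for (Set<String> colNames : Arrays.asList(ifP1First, ifP2First)) {
         System.out.println(exactlyTwoColumns(colNames) + " " + containsB(colNames));
       }
       // prints "true true" then "true false": the size check is
       // order-independent, the contains("b") check is not.
     }
   }
   ```

   In the actual test, that would mean asserting `result.get().schema().fields().length == 2` instead of `colNames.contains("b")`.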



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java:
##########
@@ -200,6 +207,107 @@ void loadDatasetWithNestedSchemaAndCoalesceAliases() throws IOException {
     Assertions.assertEquals(expectedSchema, result.get().schema(), "output dataset schema should match source schema");
   }
 
+  @Test
+  void parquetMixedSchemasMergedByDefault(@TempDir Path tempDir) {
+    String p1 = tempDir.resolve("part1").toString();
+    String p2 = tempDir.resolve("part2").toString();
+
+    StructType schema1 = DataTypes.createStructType(Arrays.asList(
+        DataTypes.createStructField("id", DataTypes.IntegerType, true),
+        DataTypes.createStructField("b", DataTypes.StringType, true)));
+    sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, "x")), schema1)
+        .write().parquet(p1);
+
+    StructType schema2 = DataTypes.createStructType(Arrays.asList(
+        DataTypes.createStructField("id", DataTypes.IntegerType, true),
+        DataTypes.createStructField("c", DataTypes.IntegerType, true)));
+    sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, 99)), schema2)
+        .write().parquet(p2);
+
+    CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(new TypedProperties());
+    List<CloudObjectMetadata> input = Arrays.asList(
+        new CloudObjectMetadata(p1, 1L),
+        new CloudObjectMetadata(p2, 1L));
+    Option<Dataset<Row>> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "parquet", Option.empty(), 1);
+    Assertions.assertTrue(result.isPresent());
+    Assertions.assertEquals(2, result.get().count());
+    Set<String> colNames = Arrays.stream(result.get().schema().fields()).map(StructField::name).collect(Collectors.toSet());
+    Assertions.assertTrue(colNames.contains("b"));
+    Assertions.assertTrue(colNames.contains("c"));
+  }
+
+  /**
+   * With mergeSchema off, Spark does not union Parquet footers: it typically follows one file's schema
+   * (here {@code p1} is listed first). Columns only present in other files (e.g. {@code c} in {@code p2})
+   * are omitted from the scan—no exception, but data under those columns is dropped.
+   */
+  @Test
+  void parquetMixedSchemasDropExtraColumnsWhenMergeDisabled(@TempDir Path tempDir) {
+    String p1 = tempDir.resolve("part1").toString();
+    String p2 = tempDir.resolve("part2").toString();
+
+    StructType schema1 = DataTypes.createStructType(Arrays.asList(
+        DataTypes.createStructField("id", DataTypes.IntegerType, true),
+        DataTypes.createStructField("b", DataTypes.StringType, true)));
+    sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, "x")), schema1)
+        .write().parquet(p1);
+
+    StructType schema2 = DataTypes.createStructType(Arrays.asList(
+        DataTypes.createStructField("id", DataTypes.IntegerType, true),
+        DataTypes.createStructField("c", DataTypes.IntegerType, true)));
+    sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(2, 99)), schema2)
+        .write().parquet(p2);
+
+    TypedProperties props = new TypedProperties();
+    props.setProperty(CloudSourceConfig.CLOUD_INCREMENTAL_PARQUET_MERGE_SCHEMA.key(), "false");
+    CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(props);
+    List<CloudObjectMetadata> input = Arrays.asList(
+        new CloudObjectMetadata(p1, 1L),
+        new CloudObjectMetadata(p2, 1L));
+    Option<Dataset<Row>> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "parquet", Option.empty(), 1);
+    Assertions.assertTrue(result.isPresent());
+    Dataset<Row> ds = result.get();
+    Set<String> colNames = Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet());
+    Assertions.assertTrue(colNames.contains("id"));
+    Assertions.assertTrue(colNames.contains("b"));
+    Assertions.assertFalse(colNames.contains("c"), "column c from second file should be absent without mergeSchema");
+    Assertions.assertEquals(2, ds.count());
+    List<Row> rows = ds.collectAsList();
+    Row fromSecondFile = rows.stream().filter(r -> r.getInt(0) == 2).findFirst().orElseThrow();
+    Assertions.assertTrue(fromSecondFile.isNullAt(fromSecondFile.fieldIndex("b")));
+  }
+
+  @Test
+  void parquetSparkDatasourceOptionsMergeSchemaFalseDropsExtraColumns(@TempDir Path tempDir) {
+    String p1 = tempDir.resolve("part1").toString();
+    String p2 = tempDir.resolve("part2").toString();
+
+    StructType schema1 = DataTypes.createStructType(Arrays.asList(
+        DataTypes.createStructField("id", DataTypes.IntegerType, true),
+        DataTypes.createStructField("b", DataTypes.StringType, true)));
+    sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, "x")), schema1)
+        .write().parquet(p1);
+
+    StructType schema2 = DataTypes.createStructType(Arrays.asList(
+        DataTypes.createStructField("id", DataTypes.IntegerType, true),
+        DataTypes.createStructField("c", DataTypes.IntegerType, true)));
+    sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(2, 99)), schema2)
+        .write().parquet(p2);
+
+    TypedProperties props = new TypedProperties();
+    props.setProperty(CloudSourceConfig.SPARK_DATASOURCE_OPTIONS.key(), "{\"mergeSchema\":\"false\"}");
+    CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(props);
+    List<CloudObjectMetadata> input = Arrays.asList(
+        new CloudObjectMetadata(p1, 1L),
+        new CloudObjectMetadata(p2, 1L));
+    Option<Dataset<Row>> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "parquet", Option.empty(), 1);
+    Assertions.assertTrue(result.isPresent());
+    Dataset<Row> ds = result.get();
+    Set<String> colNames = Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet());
+    Assertions.assertFalse(colNames.contains("c"));

Review Comment:
   🤖 Same non-determinism issue here as in the `mergeDisabled` test — 
`!colNames.contains("c")` assumes Spark picks p1's schema, but that's not 
guaranteed when `mergeSchema=false`. Would be more reliable to assert the 
schema has exactly 2 fields (or that `colNames.size() == 2`) instead of testing 
for the absence of a specific column name.
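
   The assertions that do survive either footer choice are the field count and the column common to both files. A plain-Java sketch (the helper below is hypothetical, with a `String[]` standing in for the names of `ds.schema().fields()`):

   ```java
   import java.util.Arrays;
   
   // Hypothetical helper showing checks that hold whichever footer Spark picks
   // under mergeSchema=false: the field count (2, never the merged 3) and the
   // presence of the column that exists in both files.
   public class StableSchemaChecks {
   
     // `fieldNames` stands in for the names of ds.schema().fields().
     static void checkStableInvariants(String[] fieldNames) {
       if (fieldNames.length != 2) {
         throw new AssertionError("expected exactly 2 columns; a merged schema would have 3");
       }
       if (!Arrays.asList(fieldNames).contains("id")) {
         throw new AssertionError("'id' appears in both files, so it must always be present");
       }
     }
   
     public static void main(String[] args) {
       checkStableInvariants(new String[] {"id", "b"}); // outcome if p1's footer wins
       checkStableInvariants(new String[] {"id", "c"}); // outcome if p2's footer wins
       System.out.println("stable assertions pass for both possible outcomes");
     }
   }
   ```

   Asserting on `colNames.size()` and `colNames.contains("id")` keeps the test's intent (no schema merge happened) without depending on file-listing order.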



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
