yihua commented on code in PR #18385:
URL: https://github.com/apache/hudi/pull/18385#discussion_r3035843226
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java:
##########
@@ -200,6 +207,107 @@ void loadDatasetWithNestedSchemaAndCoalesceAliases()
throws IOException {
Assertions.assertEquals(expectedSchema, result.get().schema(), "output
dataset schema should match source schema");
}
+ @Test
+ void parquetMixedSchemasMergedByDefault(@TempDir Path tempDir) {
+ String p1 = tempDir.resolve("part1").toString();
+ String p2 = tempDir.resolve("part2").toString();
+
+ StructType schema1 = DataTypes.createStructType(Arrays.asList(
+ DataTypes.createStructField("id", DataTypes.IntegerType, true),
+ DataTypes.createStructField("b", DataTypes.StringType, true)));
+
sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1,
"x")), schema1)
+ .write().parquet(p1);
+
+ StructType schema2 = DataTypes.createStructType(Arrays.asList(
+ DataTypes.createStructField("id", DataTypes.IntegerType, true),
+ DataTypes.createStructField("c", DataTypes.IntegerType, true)));
+
sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1,
99)), schema2)
+ .write().parquet(p2);
+
+ CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new
CloudObjectsSelectorCommon(new TypedProperties());
+ List<CloudObjectMetadata> input = Arrays.asList(
+ new CloudObjectMetadata(p1, 1L),
+ new CloudObjectMetadata(p2, 1L));
+ Option<Dataset<Row>> result =
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "parquet",
Option.empty(), 1);
+ Assertions.assertTrue(result.isPresent());
+ Assertions.assertEquals(2, result.get().count());
+ Set<String> colNames =
Arrays.stream(result.get().schema().fields()).map(StructField::name).collect(Collectors.toSet());
+ Assertions.assertTrue(colNames.contains("b"));
+ Assertions.assertTrue(colNames.contains("c"));
+ }
+
+ /**
+ * With mergeSchema off, Spark does not union Parquet footers: it typically
follows one file's schema
+ * (here {@code p1} is listed first). Columns only present in other files
(e.g. {@code c} in {@code p2})
+ * are omitted from the scan—no exception, but data under those columns is
dropped.
+ */
+ @Test
+ void parquetMixedSchemasDropExtraColumnsWhenMergeDisabled(@TempDir Path
tempDir) {
+ String p1 = tempDir.resolve("part1").toString();
+ String p2 = tempDir.resolve("part2").toString();
+
+ StructType schema1 = DataTypes.createStructType(Arrays.asList(
+ DataTypes.createStructField("id", DataTypes.IntegerType, true),
+ DataTypes.createStructField("b", DataTypes.StringType, true)));
+
sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1,
"x")), schema1)
+ .write().parquet(p1);
+
+ StructType schema2 = DataTypes.createStructType(Arrays.asList(
+ DataTypes.createStructField("id", DataTypes.IntegerType, true),
+ DataTypes.createStructField("c", DataTypes.IntegerType, true)));
+
sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(2,
99)), schema2)
+ .write().parquet(p2);
+
+ TypedProperties props = new TypedProperties();
+
props.setProperty(CloudSourceConfig.CLOUD_INCREMENTAL_PARQUET_MERGE_SCHEMA.key(),
"false");
+ CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new
CloudObjectsSelectorCommon(props);
+ List<CloudObjectMetadata> input = Arrays.asList(
+ new CloudObjectMetadata(p1, 1L),
+ new CloudObjectMetadata(p2, 1L));
+ Option<Dataset<Row>> result =
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "parquet",
Option.empty(), 1);
+ Assertions.assertTrue(result.isPresent());
+ Dataset<Row> ds = result.get();
+ Set<String> colNames =
Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet());
Review Comment:
🤖 The test asserts that `b` is present and `c` is absent, but Spark's schema
selection across multiple Parquet files without `mergeSchema` is
non-deterministic — it depends on which file footer Spark happens to read
first, and that can vary with the Spark version or the file-listing order.
Could you make this test deterministic? For example, assert only that the
result schema has exactly 2 columns (not 3), rather than asserting which
specific column is present or absent.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java:
##########
@@ -200,6 +207,107 @@ void loadDatasetWithNestedSchemaAndCoalesceAliases()
throws IOException {
Assertions.assertEquals(expectedSchema, result.get().schema(), "output
dataset schema should match source schema");
}
+ @Test
+ void parquetMixedSchemasMergedByDefault(@TempDir Path tempDir) {
+ String p1 = tempDir.resolve("part1").toString();
+ String p2 = tempDir.resolve("part2").toString();
+
+ StructType schema1 = DataTypes.createStructType(Arrays.asList(
+ DataTypes.createStructField("id", DataTypes.IntegerType, true),
+ DataTypes.createStructField("b", DataTypes.StringType, true)));
+
sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1,
"x")), schema1)
+ .write().parquet(p1);
+
+ StructType schema2 = DataTypes.createStructType(Arrays.asList(
+ DataTypes.createStructField("id", DataTypes.IntegerType, true),
+ DataTypes.createStructField("c", DataTypes.IntegerType, true)));
+
sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1,
99)), schema2)
+ .write().parquet(p2);
+
+ CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new
CloudObjectsSelectorCommon(new TypedProperties());
+ List<CloudObjectMetadata> input = Arrays.asList(
+ new CloudObjectMetadata(p1, 1L),
+ new CloudObjectMetadata(p2, 1L));
+ Option<Dataset<Row>> result =
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "parquet",
Option.empty(), 1);
+ Assertions.assertTrue(result.isPresent());
+ Assertions.assertEquals(2, result.get().count());
+ Set<String> colNames =
Arrays.stream(result.get().schema().fields()).map(StructField::name).collect(Collectors.toSet());
+ Assertions.assertTrue(colNames.contains("b"));
+ Assertions.assertTrue(colNames.contains("c"));
+ }
+
+ /**
+ * With mergeSchema off, Spark does not union Parquet footers: it typically
follows one file's schema
+ * (here {@code p1} is listed first). Columns only present in other files
(e.g. {@code c} in {@code p2})
+ * are omitted from the scan—no exception, but data under those columns is
dropped.
+ */
+ @Test
+ void parquetMixedSchemasDropExtraColumnsWhenMergeDisabled(@TempDir Path
tempDir) {
+ String p1 = tempDir.resolve("part1").toString();
+ String p2 = tempDir.resolve("part2").toString();
+
+ StructType schema1 = DataTypes.createStructType(Arrays.asList(
+ DataTypes.createStructField("id", DataTypes.IntegerType, true),
+ DataTypes.createStructField("b", DataTypes.StringType, true)));
+
sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1,
"x")), schema1)
+ .write().parquet(p1);
+
+ StructType schema2 = DataTypes.createStructType(Arrays.asList(
+ DataTypes.createStructField("id", DataTypes.IntegerType, true),
+ DataTypes.createStructField("c", DataTypes.IntegerType, true)));
+
sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(2,
99)), schema2)
+ .write().parquet(p2);
+
+ TypedProperties props = new TypedProperties();
+
props.setProperty(CloudSourceConfig.CLOUD_INCREMENTAL_PARQUET_MERGE_SCHEMA.key(),
"false");
+ CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new
CloudObjectsSelectorCommon(props);
+ List<CloudObjectMetadata> input = Arrays.asList(
+ new CloudObjectMetadata(p1, 1L),
+ new CloudObjectMetadata(p2, 1L));
+ Option<Dataset<Row>> result =
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "parquet",
Option.empty(), 1);
+ Assertions.assertTrue(result.isPresent());
+ Dataset<Row> ds = result.get();
+ Set<String> colNames =
Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet());
+ Assertions.assertTrue(colNames.contains("id"));
+ Assertions.assertTrue(colNames.contains("b"));
+ Assertions.assertFalse(colNames.contains("c"), "column c from second file
should be absent without mergeSchema");
+ Assertions.assertEquals(2, ds.count());
+ List<Row> rows = ds.collectAsList();
+ Row fromSecondFile = rows.stream().filter(r -> r.getInt(0) ==
2).findFirst().orElseThrow();
+
Assertions.assertTrue(fromSecondFile.isNullAt(fromSecondFile.fieldIndex("b")));
+ }
+
+ @Test
+ void parquetSparkDatasourceOptionsMergeSchemaFalseDropsExtraColumns(@TempDir
Path tempDir) {
+ String p1 = tempDir.resolve("part1").toString();
+ String p2 = tempDir.resolve("part2").toString();
+
+ StructType schema1 = DataTypes.createStructType(Arrays.asList(
+ DataTypes.createStructField("id", DataTypes.IntegerType, true),
+ DataTypes.createStructField("b", DataTypes.StringType, true)));
+
sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1,
"x")), schema1)
+ .write().parquet(p1);
+
+ StructType schema2 = DataTypes.createStructType(Arrays.asList(
+ DataTypes.createStructField("id", DataTypes.IntegerType, true),
+ DataTypes.createStructField("c", DataTypes.IntegerType, true)));
+
sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(2,
99)), schema2)
+ .write().parquet(p2);
+
+ TypedProperties props = new TypedProperties();
+ props.setProperty(CloudSourceConfig.SPARK_DATASOURCE_OPTIONS.key(),
"{\"mergeSchema\":\"false\"}");
+ CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new
CloudObjectsSelectorCommon(props);
+ List<CloudObjectMetadata> input = Arrays.asList(
+ new CloudObjectMetadata(p1, 1L),
+ new CloudObjectMetadata(p2, 1L));
+ Option<Dataset<Row>> result =
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "parquet",
Option.empty(), 1);
+ Assertions.assertTrue(result.isPresent());
+ Dataset<Row> ds = result.get();
+ Set<String> colNames =
Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet());
+ Assertions.assertFalse(colNames.contains("c"));
Review Comment:
🤖 Same non-determinism issue here as in
`parquetMixedSchemasDropExtraColumnsWhenMergeDisabled` —
`!colNames.contains("c")` assumes Spark picks p1's schema, but that is not
guaranteed when `mergeSchema=false`. It would be more reliable to assert that
the schema has exactly 2 fields (i.e. `colNames.size() == 2`) instead of
testing for the absence of a specific column name.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]