yihua commented on code in PR #18379:
URL: https://github.com/apache/hudi/pull/18379#discussion_r3036287323


##########
hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/hadoop/TestHoodieAvroParquetConfigInjector.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.hadoop;
+
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.io.HoodieParquetConfigInjector;
+import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieParquetConfigInjector} functionality in {@link 
HoodieAvroFileWriterFactory}.
+ */
+public class TestHoodieAvroParquetConfigInjector {
+
+  @TempDir
+  java.nio.file.Path tmpDir;
+
+  /**
+   * Test implementation that disables dictionary encoding.
+   */
+  public static class DisableDictionaryInjector implements 
HoodieParquetConfigInjector {

Review Comment:
   🤖 nit: `DisableDictionaryInjector` is defined identically in both test 
classes — could you consider extracting it to a shared test-fixtures location 
so the two copies don't drift?



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieParquetConfigInjector.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.HoodieParquetConfigInjector;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.SparkDatasetTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieParquetConfigInjector} functionality in {@link 
HoodieSparkFileWriterFactory}.
+ */
+public class TestHoodieParquetConfigInjector extends HoodieClientTestBase {
+
+  /**
+   * Get HoodieSchema that matches SparkDatasetTestUtils.STRUCT_TYPE.
+   * This schema includes metadata fields and matches the structure of rows 
generated by getRandomRows().
+   */
+  private HoodieSchema getTestSchema() {
+    // Create Avro schema that matches SparkDatasetTestUtils.STRUCT_TYPE
+    // STRUCT_TYPE has: commit_time, commit_seqno, record_key, partition_path, 
filename,
+    //                  RECORD_KEY_FIELD_NAME, PARTITION_PATH_FIELD_NAME, 
randomInt, randomLong
+    String avroSchema = "{"
+        + "\"type\":\"record\","
+        + "\"name\":\"test_schema\","
+        + "\"namespace\":\"test.namespace\","
+        + "\"fields\":["
+        + "{\"name\":\"_hoodie_commit_time\",\"type\":\"string\"},"
+        + "{\"name\":\"_hoodie_commit_seqno\",\"type\":\"string\"},"
+        + "{\"name\":\"_hoodie_record_key\",\"type\":\"string\"},"
+        + "{\"name\":\"_hoodie_partition_path\",\"type\":\"string\"},"
+        + "{\"name\":\"_hoodie_file_name\",\"type\":\"string\"},"
+        + "{\"name\":\"key\",\"type\":\"string\"},"
+        + "{\"name\":\"partition\",\"type\":\"string\"},"
+        + "{\"name\":\"randomInt\",\"type\":\"int\"},"
+        + "{\"name\":\"randomLong\",\"type\":\"long\"}"
+        + "]}";
+    return HoodieSchema.parse(avroSchema);
+  }
+
+  /**
+   * Helper method to convert InternalRow (potentially UnsafeRow) to mutable 
GenericInternalRow.
+   * This is needed because writeRowWithMetadata() calls row.update() which 
doesn't work on UnsafeRow.
+   */
+  private List<InternalRow> toMutableRows(List<InternalRow> internalRows) {
+    List<InternalRow> mutableRows = new ArrayList<>();
+    for (InternalRow row : internalRows) {
+      // Convert to GenericInternalRow which supports update()
+      Object[] values = new Object[row.numFields()];
+      for (int i = 0; i < row.numFields(); i++) {
+        if (row.isNullAt(i)) {
+          values[i] = null;
+        } else {
+          values[i] = row.get(i, 
SparkDatasetTestUtils.STRUCT_TYPE.fields()[i].dataType());
+        }
+      }
+      mutableRows.add(new GenericInternalRow(values));
+    }
+    return mutableRows;
+  }
+
+  /**
+   * Test implementation that disables dictionary encoding.
+   */
+  public static class DisableDictionaryInjector implements 
HoodieParquetConfigInjector {
+    @Override
+    public Pair<StorageConfiguration, HoodieConfig> withProps(StoragePath path,
+                                                               
StorageConfiguration storageConf,
+                                                               HoodieConfig 
hoodieConfig) {
+      // Modify the Hudi config to disable dictionary encoding
+      hoodieConfig.setValue(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED, 
"false");
+      return Pair.of(storageConf, hoodieConfig);
+    }
+  }
+
+  @Test
+  public void testDisableDictionaryEncodingViaInjector() throws Exception {
+    final String instantTime = "100";
+    final StoragePath parquetPath = new StoragePath(
+        basePath + "/partition/path/test_dictionary_" + instantTime + 
".parquet");
+
+    // Use schema that matches SparkDatasetTestUtils.STRUCT_TYPE (includes 
metadata fields)

Review Comment:
   🤖 nit: `// Use schema that matches SparkDatasetTestUtils.STRUCT_TYPE 
(includes metadata fields)` appears before every `getTestSchema()` call — if 
the method name captured this intent (e.g. `getStructTypeCompatibleSchema()`), 
the repeated comment could be dropped entirely.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -217,10 +217,14 @@ private void consumeField(String field, int index, 
Runnable writer) {
   }
 
   private void writeFields(InternalRow row, StructType schema, ValueWriter[] 
fieldWriters) {
-    for (int i = 0; i < row.numFields(); i++) {
+    for (int i = 0; i < schema.fields().length; i++) {
       int index = i;
       if (!row.isNullAt(i)) {
-        consumeField(schema.fields()[i].name(), index, () -> 
fieldWriters[index].write(row, index));
+        try {

Review Comment:
   🤖 nit: this try/catch catches `ClassCastException` and immediately re-throws 
it without adding any message or wrapping — it's equivalent to having no 
try/catch at all. Could you either remove it or enrich the catch with a more 
descriptive exception message so readers understand why it's there?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java:
##########
@@ -57,16 +62,32 @@ protected HoodieFileWriter newParquetFileWriter(
     if (compressionCodecName.isEmpty()) {
       compressionCodecName = null;
     }
-    HoodieRowParquetWriteSupport writeSupport = 
getHoodieRowParquetWriteSupport(storage.getConf(), schema,
-        config, enableBloomFilter(populateMetaFields, config));
+
+    String configInjectorClass = 
config.getStringOrDefault(HoodieStorageConfig.HOODIE_PARQUET_CONFIG_INJECTOR_CLASS,
 StringUtils.EMPTY_STRING);
+
+    StorageConfiguration storageConfiguration = storage.getConf();
+    HoodieConfig hoodieConfig = config;
+    if (!StringUtils.isNullOrEmpty(configInjectorClass)) {
+      try {
+        HoodieParquetConfigInjector injector = (HoodieParquetConfigInjector) 
ReflectionUtils.loadClass(configInjectorClass);
+        Pair<StorageConfiguration, HoodieConfig> modifiedConfigs = 
injector.withProps(path, storageConfiguration, hoodieConfig);
+        storageConfiguration = modifiedConfigs.getLeft();
+        hoodieConfig = modifiedConfigs.getRight();
+      } catch (Exception e) {
+        throw new HoodieException("Failed to instantiate or invoke parquet 
config injector class: " + configInjectorClass, e);
+      }
+    }
+
+    HoodieRowParquetWriteSupport writeSupport = 
getHoodieRowParquetWriteSupport(storageConfiguration, schema,
+        hoodieConfig, enableBloomFilter(populateMetaFields, hoodieConfig));

Review Comment:
   🤖 nit: the explicit `(Configuration)` cast before 
`storageConfiguration.unwrapAs(Configuration.class)` is redundant — `unwrapAs` 
is already generic and returns `T` inferred from the class literal. The 
pre-existing call sites in this file didn't need the cast; could you drop it to 
keep things consistent?



##########
hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java:
##########
@@ -62,26 +67,43 @@ protected HoodieFileWriter newParquetFileWriter(
       String instantTime, StoragePath path, HoodieConfig config, HoodieSchema 
schema,
       TaskContextSupplier taskContextSupplier) throws IOException {
     boolean populateMetaFields = 
config.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS);
-    HoodieAvroWriteSupport writeSupport = getHoodieAvroWriteSupport(schema, 
config, enableBloomFilter(populateMetaFields, config));
 
-    String compressionCodecName = 
config.getStringOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
+    // Support for custom Parquet config injector
+    String configInjectorClass = 
config.getStringOrDefault(HoodieStorageConfig.HOODIE_PARQUET_CONFIG_INJECTOR_CLASS,
 StringUtils.EMPTY_STRING);
+

Review Comment:
   🤖 nit: this same 10-line injector-loading block (get the class name, guard 
on empty, reflectively load, call `withProps`, unpack the pair, wrap any 
exception) is copy-pasted identically into `HoodieAvroFileWriterFactory`, 
`HoodieSparkFileWriterFactory`, and `HoodieRowDataFileWriterFactory`. It might 
be worth extracting it into a static helper — either a static method on 
`HoodieParquetConfigInjector` or a small shared utility class — so there's one 
place to maintain it.



##########
hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/hadoop/TestHoodieAvroParquetConfigInjector.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.hadoop;
+
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.io.HoodieParquetConfigInjector;
+import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieParquetConfigInjector} functionality in {@link 
HoodieAvroFileWriterFactory}.
+ */
+public class TestHoodieAvroParquetConfigInjector {
+
+  @TempDir
+  java.nio.file.Path tmpDir;
+
+  /**
+   * Test implementation that disables dictionary encoding.
+   */
+  public static class DisableDictionaryInjector implements 
HoodieParquetConfigInjector {
+    @Override
+    public Pair<StorageConfiguration, HoodieConfig> withProps(StoragePath path,
+                                                               
StorageConfiguration storageConf,
+                                                               HoodieConfig 
hoodieConfig) {
+      // Modify the Hudi config to disable dictionary encoding
+      hoodieConfig.setValue(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED, 
"false");
+      return Pair.of(storageConf, hoodieConfig);
+    }
+  }
+
+  @Test
+  public void testDisableDictionaryEncodingViaInjector() throws Exception {
+    final String instantTime = "100";
+    HoodieStorage storage = HoodieTestUtils.getStorage(tmpDir.toString());
+    final StoragePath parquetPath = new StoragePath(
+        tmpDir.resolve("test_dictionary_" + instantTime + 
".parquet").toAbsolutePath().toString());
+
+    // Generate test data
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED);
+    List<GenericRecord> records = dataGen.generateGenericRecords(100);
+    HoodieSchema schema = 
HoodieSchema.fromAvroSchema(records.get(0).getSchema());
+
+    // Create config with the custom injector
+    HoodieConfig config = new HoodieConfig();
+    config.setValue(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED, "true"); 
// Start with dictionary enabled
+    config.setValue(HoodieStorageConfig.HOODIE_PARQUET_CONFIG_INJECTOR_CLASS, 
DisableDictionaryInjector.class.getName());
+
+    // Create writer and write some data
+    HoodieFileWriter writer = new HoodieAvroFileWriterFactory(storage)
+        .newParquetFileWriter(instantTime, parquetPath, config, schema, new 
LocalTaskContextSupplier());
+
+    assertTrue(writer instanceof HoodieAvroParquetWriter);
+
+    // Write test records
+    HoodieAvroParquetWriter avroWriter = (HoodieAvroParquetWriter) writer;
+    for (GenericRecord record : records) {
+      avroWriter.writeAvro((String) record.get("_row_key"), record);
+    }
+    writer.close();
+
+    // Verify the parquet file was created
+    assertTrue(storage.exists(parquetPath));
+
+    // Read parquet metadata and verify dictionary encoding is disabled
+    Configuration hadoopConf = new Configuration();
+    org.apache.hadoop.fs.Path hadoopPath = new 
org.apache.hadoop.fs.Path(parquetPath.toUri());

Review Comment:
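   🤖 nit: `org.apache.hadoop.fs.Path` is used fully-qualified twice here — 
worth adding it as an import alongside `Configuration` to keep things 
consistent. Note that the class also declares a `java.nio.file.Path` field 
(`tmpDir`), so only one of the two `Path` types can be imported by simple name.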



##########
hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/hadoop/TestHoodieAvroParquetConfigInjector.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.hadoop;
+
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.io.HoodieParquetConfigInjector;
+import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieParquetConfigInjector} functionality in {@link 
HoodieAvroFileWriterFactory}.
+ */
+public class TestHoodieAvroParquetConfigInjector {
+
+  @TempDir
+  java.nio.file.Path tmpDir;

Review Comment:
   🤖 nit: could you add `import java.nio.file.Path;` and use just `Path tmpDir` 
here? The fully-qualified name makes the field declaration a bit harder to scan 
at a glance.



##########
hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/hadoop/TestHoodieAvroParquetConfigInjector.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.hadoop;
+
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.io.HoodieParquetConfigInjector;
+import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieParquetConfigInjector} functionality in {@link 
HoodieAvroFileWriterFactory}.
+ */
+public class TestHoodieAvroParquetConfigInjector {
+
+  @TempDir
+  java.nio.file.Path tmpDir;
+
+  /**
+   * Test implementation that disables dictionary encoding.
+   */
+  public static class DisableDictionaryInjector implements 
HoodieParquetConfigInjector {
+    @Override
+    public Pair<StorageConfiguration, HoodieConfig> withProps(StoragePath path,
+                                                               
StorageConfiguration storageConf,
+                                                               HoodieConfig 
hoodieConfig) {
+      // Modify the Hudi config to disable dictionary encoding

Review Comment:
   🤖 nit: this comment restates what the code already says — 
`setValue(PARQUET_DICTIONARY_ENABLED, "false")` is self-evident in the context 
of `DisableDictionaryInjector`. Could probably drop it (and the identical one 
in the Spark test).



##########
hudi-common/src/main/java/org/apache/hudi/io/HoodieParquetConfigInjector.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+/**
+ * A pluggable interface that all parquet-based writers (Spark/Flink) will 
invoke before creating write support
+ * or parquet file writer objects.
+ * <p>
+ * This allows users to inject custom configurations into the Parquet writer 
pipeline at runtime, enabling
+ * fine-grained control over Parquet file properties such as bloom filters, 
compression settings, encoding
+ * options, and other advanced Parquet configurations.
+ * <p>
+ * Implementations of this interface can modify both the storage configuration 
(e.g., Hadoop Configuration)
+ * and the Hudi-specific configuration before the Parquet writer is created.
+ * <p>
+ * Example use cases:
+ * <ul>
+ *   <li>Enabling column-specific Parquet bloom filters</li>
+ *   <li>Setting custom compression codecs per file or partition</li>
+ *   <li>Adjusting page sizes or row group sizes based on data 
characteristics</li>
+ *   <li>Injecting custom metadata into Parquet files</li>
+ * </ul>
+ *
+ * @since 1.2.0
+ */
+public interface HoodieParquetConfigInjector {
+
+  /**
+   * Injects custom configurations into the Parquet writer pipeline.
+   * <p>
+   * This method is invoked before creating the Parquet write support and 
writer objects, allowing
+   * implementations to modify both the storage-level and Hudi-level 
configurations.
+   *
+   * @param path the file path where the Parquet file will be written
+   * @param storageConf the storage configuration (e.g., Hadoop Configuration) 
that will be used by the writer
+   * @param hoodieConfig the Hudi configuration containing write settings and 
table properties
+   * @return a pair containing the potentially modified storage configuration 
and Hudi configuration.
+   *         Both configurations will be used to create the Parquet writer.

Review Comment:
   🤖 nit: `withProps` doesn't quite read as a verb describing what the method 
does — it feels more like a builder setter name. Something like `injectConfig` 
or `modifyConfig` might make the intent clearer at the call sites.



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieParquetConfigInjector.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.HoodieParquetConfigInjector;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.SparkDatasetTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieParquetConfigInjector} functionality in {@link 
HoodieSparkFileWriterFactory}.
+ */
+public class TestHoodieParquetConfigInjector extends HoodieClientTestBase {
+
+  /**
+   * Get HoodieSchema that matches SparkDatasetTestUtils.STRUCT_TYPE.
+   * This schema includes metadata fields and matches the structure of rows 
generated by getRandomRows().
+   */
+  private HoodieSchema getTestSchema() {
+    // Create Avro schema that matches SparkDatasetTestUtils.STRUCT_TYPE
+    // STRUCT_TYPE has: commit_time, commit_seqno, record_key, partition_path, 
filename,
+    //                  RECORD_KEY_FIELD_NAME, PARTITION_PATH_FIELD_NAME, 
randomInt, randomLong
+    String avroSchema = "{"
+        + "\"type\":\"record\","
+        + "\"name\":\"test_schema\","
+        + "\"namespace\":\"test.namespace\","
+        + "\"fields\":["
+        + "{\"name\":\"_hoodie_commit_time\",\"type\":\"string\"},"
+        + "{\"name\":\"_hoodie_commit_seqno\",\"type\":\"string\"},"
+        + "{\"name\":\"_hoodie_record_key\",\"type\":\"string\"},"
+        + "{\"name\":\"_hoodie_partition_path\",\"type\":\"string\"},"
+        + "{\"name\":\"_hoodie_file_name\",\"type\":\"string\"},"
+        + "{\"name\":\"key\",\"type\":\"string\"},"
+        + "{\"name\":\"partition\",\"type\":\"string\"},"
+        + "{\"name\":\"randomInt\",\"type\":\"int\"},"
+        + "{\"name\":\"randomLong\",\"type\":\"long\"}"
+        + "]}";
+    return HoodieSchema.parse(avroSchema);
+  }
+
+  /**
+   * Helper method to convert InternalRow (potentially UnsafeRow) to mutable 
GenericInternalRow.
+   * This is needed because writeRowWithMetadata() calls row.update() which 
doesn't work on UnsafeRow.
+   */
+  private List<InternalRow> toMutableRows(List<InternalRow> internalRows) {
+    List<InternalRow> mutableRows = new ArrayList<>();
+    for (InternalRow row : internalRows) {
+      // Convert to GenericInternalRow which supports update()

Review Comment:
   🤖 nit: the inline comment `// Convert to GenericInternalRow which supports 
update()` repeats what the method-level Javadoc already explains — could drop 
it to keep the body clean.



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieParquetConfigInjector.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.HoodieParquetConfigInjector;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.SparkDatasetTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieParquetConfigInjector} functionality in {@link 
HoodieSparkFileWriterFactory}.
+ */
+public class TestHoodieParquetConfigInjector extends HoodieClientTestBase {
+
+  /**
+   * Get HoodieSchema that matches SparkDatasetTestUtils.STRUCT_TYPE.
+   * This schema includes metadata fields and matches the structure of rows 
generated by getRandomRows().
+   */
+  private HoodieSchema getTestSchema() {
+    // Create Avro schema that matches SparkDatasetTestUtils.STRUCT_TYPE
+    // STRUCT_TYPE has: commit_time, commit_seqno, record_key, partition_path, 
filename,
+    //                  RECORD_KEY_FIELD_NAME, PARTITION_PATH_FIELD_NAME, 
randomInt, randomLong
+    String avroSchema = "{"
+        + "\"type\":\"record\","
+        + "\"name\":\"test_schema\","
+        + "\"namespace\":\"test.namespace\","
+        + "\"fields\":["
+        + "{\"name\":\"_hoodie_commit_time\",\"type\":\"string\"},"
+        + "{\"name\":\"_hoodie_commit_seqno\",\"type\":\"string\"},"
+        + "{\"name\":\"_hoodie_record_key\",\"type\":\"string\"},"
+        + "{\"name\":\"_hoodie_partition_path\",\"type\":\"string\"},"
+        + "{\"name\":\"_hoodie_file_name\",\"type\":\"string\"},"
+        + "{\"name\":\"key\",\"type\":\"string\"},"
+        + "{\"name\":\"partition\",\"type\":\"string\"},"
+        + "{\"name\":\"randomInt\",\"type\":\"int\"},"
+        + "{\"name\":\"randomLong\",\"type\":\"long\"}"
+        + "]}";
+    return HoodieSchema.parse(avroSchema);
+  }
+
+  /**
+   * Helper method to convert InternalRow (potentially UnsafeRow) to mutable 
GenericInternalRow.
+   * This is needed because writeRowWithMetadata() calls row.update() which 
doesn't work on UnsafeRow.
+   */
+  private List<InternalRow> toMutableRows(List<InternalRow> internalRows) {
+    List<InternalRow> mutableRows = new ArrayList<>();
+    for (InternalRow row : internalRows) {
+      // Convert to GenericInternalRow which supports update()
+      Object[] values = new Object[row.numFields()];
+      for (int i = 0; i < row.numFields(); i++) {
+        if (row.isNullAt(i)) {
+          values[i] = null;
+        } else {
+          values[i] = row.get(i, 
SparkDatasetTestUtils.STRUCT_TYPE.fields()[i].dataType());
+        }
+      }
+      mutableRows.add(new GenericInternalRow(values));
+    }
+    return mutableRows;
+  }
+
+  /**
+   * Test implementation that disables dictionary encoding.
+   */
+  public static class DisableDictionaryInjector implements 
HoodieParquetConfigInjector {
+    @Override
+    public Pair<StorageConfiguration, HoodieConfig> withProps(StoragePath path,
+                                                               
StorageConfiguration storageConf,
+                                                               HoodieConfig 
hoodieConfig) {
+      // Modify the Hudi config to disable dictionary encoding
+      hoodieConfig.setValue(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED, 
"false");
+      return Pair.of(storageConf, hoodieConfig);
+    }
+  }
+
+  @Test
+  public void testDisableDictionaryEncodingViaInjector() throws Exception {
+    final String instantTime = "100";
+    final StoragePath parquetPath = new StoragePath(
+        basePath + "/partition/path/test_dictionary_" + instantTime + 
".parquet");
+
+    // Use schema that matches SparkDatasetTestUtils.STRUCT_TYPE (includes 
metadata fields)
+    HoodieSchema schema = getTestSchema();
+
+    // Create config with the custom injector
+    final HoodieWriteConfig cfg = getConfigBuilder()
+        .withStorageConfig(HoodieStorageConfig.newBuilder()
+            
.withParquetConfigInjectorClass(DisableDictionaryInjector.class.getName())
+            .parquetDictionaryEnabled(true) // Start with dictionary enabled
+            .build())
+        .build();
+
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
+
+    // Create writer and write some data
+    HoodieFileWriter writer = HoodieFileWriterFactory.getFileWriter(
+        instantTime, parquetPath, table.getStorage(), cfg.getStorageConfig(),
+        schema, supplier, HoodieRecordType.SPARK);
+
+    assertTrue(writer instanceof HoodieSparkParquetWriter);
+
+    // Generate test data using SparkDatasetTestUtils and convert to 
InternalRows
+    Dataset<Row> rowDataset = SparkDatasetTestUtils.getRandomRows(sqlContext, 
100, "partition/path", false);
+    List<InternalRow> internalRows = 
toMutableRows(SparkDatasetTestUtils.toInternalRows(rowDataset, 
SparkDatasetTestUtils.ENCODER));
+    List<Row> rows = rowDataset.collectAsList();
+
+    // Write some test records
+    HoodieSparkParquetWriter sparkWriter = (HoodieSparkParquetWriter) writer;
+    for (int i = 0; i < internalRows.size(); i++) {
+      InternalRow row = internalRows.get(i);
+      String recordKey = rows.get(i).getString(2); // record_key is at index 2 
(after commit_time and seq_no)
+      String partition = rows.get(i).getString(3);  // partition is at index 3
+      sparkWriter.writeRowWithMetadata(new HoodieKey(recordKey, partition), 
row);
+    }
+    writer.close();
+
+    // Verify the parquet file was created
+    assertTrue(table.getStorage().exists(parquetPath));
+
+    // Read parquet metadata and verify dictionary encoding is disabled
+    Configuration hadoopConf = new Configuration();
+    org.apache.hadoop.fs.Path hadoopPath = new 
org.apache.hadoop.fs.Path(parquetPath.toUri());

Review Comment:
   🤖 nit: same as the Hadoop test — `org.apache.hadoop.fs.Path` is 
fully-qualified here while `Configuration` is imported. Adding the import would 
make the line a lot easier to read.



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java:
##########
@@ -581,6 +593,17 @@ public Builder withBloomFilterDynamicMaxEntries(int 
maxEntries) {
       return this;
     }
 
+    /**
+     * Sets the parquet config injector class name.
+     *
+     * @param parquetConfigInjectorClass The fully-qualified class name of the 
parquet config injector
+     * @return this builder instance for method chaining
+     */

Review Comment:
   🤖 nit: the `@return this builder instance for method chaining` line is 
redundant — that's implied by every builder method. The other builder methods 
in this class don't carry this tag; could you drop it to keep things consistent?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to