yihua commented on code in PR #18405:
URL: https://github.com/apache/hudi/pull/18405#discussion_r3036299834


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+
+import java.util.List;
+
+/**
+ * Spark/HoodieStreamer implementation of {@link ValidationContext}.
+ *
+ * <p>Constructed from data available in {@code StreamSync.writeToSinkAndDoMetaSync()}
+ * before the commit is finalized. Provides validators with access to commit metadata,
+ * write statistics, and previous commit information for streaming offset validation.</p>
+ *
+ * <p>Unlike Flink's implementation, Spark can optionally provide active timeline access
+ * via {@link HoodieTableMetaClient} for richer validation patterns.</p>
+ */
+public class SparkValidationContext implements ValidationContext {
+
+  private final String instantTime;
+  private final Option<HoodieCommitMetadata> commitMetadata;
+  private final Option<List<HoodieWriteStat>> writeStats;
+  private final Option<HoodieCommitMetadata> previousCommitMetadata;
+  private final HoodieTableMetaClient metaClient;
+
+  /**
+   * Create a Spark validation context with full timeline access.
+   *
+   * @param instantTime Current commit instant time
+   * @param commitMetadata Current commit metadata (with extraMetadata including checkpoints)
+   * @param writeStats Write statistics from write operations
+   * @param previousCommitMetadata Metadata from the previous completed commit
+   * @param metaClient Table meta client for timeline access (may be null for testing)
+   */
+  public SparkValidationContext(String instantTime,
+                                Option<HoodieCommitMetadata> commitMetadata,
+                                Option<List<HoodieWriteStat>> writeStats,
+                                Option<HoodieCommitMetadata> previousCommitMetadata,
+                                HoodieTableMetaClient metaClient) {
+    this.instantTime = instantTime;
+    this.commitMetadata = commitMetadata;
+    this.writeStats = writeStats;
+    this.previousCommitMetadata = previousCommitMetadata;
+    this.metaClient = metaClient;
+  }
+
+  /**
+   * Create a Spark validation context without timeline access (for testing).
+   *
+   * @param instantTime Current commit instant time
+   * @param commitMetadata Current commit metadata (with extraMetadata including checkpoints)
+   * @param writeStats Write statistics from write operations
+   * @param previousCommitMetadata Metadata from the previous completed commit
+   */
+  public SparkValidationContext(String instantTime,
+                                Option<HoodieCommitMetadata> commitMetadata,
+                                Option<List<HoodieWriteStat>> writeStats,
+                                Option<HoodieCommitMetadata> previousCommitMetadata) {
+    this(instantTime, commitMetadata, writeStats, previousCommitMetadata, null);
+  }

Review Comment:
   _⚠️ Potential issue_ | _🟠 Major_
   
   <details>
   <summary>🧩 Analysis chain</summary>
   
   🏁 Script executed:
   
   ```shell
   git ls-files | grep -E "(SparkValidationContext|ValidationContext)" | head -20
   ```
   
   Repository: yihua/hudi
   
   Length of output: 559
   
   ---
   
   🏁 Script executed:
   
   ```shell
   find . -name "SparkValidationContext.java" -o -name "ValidationContext.java"
   ```
   
   Repository: yihua/hudi
   
   Length of output: 241
   
   ---
   
   🏁 Script executed:
   
   ```shell
   cat -n hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java
   ```
   
   Repository: yihua/hudi
   
   Length of output: 6346
   
   ---
   
   🏁 Script executed:
   
   ```shell
   # Search for ValidationContext interface definition
   find . -name "ValidationContext.java" -type f
   ```
   
   Repository: yihua/hudi
   
   Length of output: 137
   
   ---
   
   🏁 Script executed:
   
   ```shell
   # Find all references to getActiveTimeline and getPreviousCommitInstant
   rg "getActiveTimeline|getPreviousCommitInstant" --type java -B 2 -A 2
   ```
   
   Repository: yihua/hudi
   
   Length of output: 50367
   
   ---
   
   🏁 Script executed:
   
   ```shell
   cat -n hudi-common/src/main/java/org/apache/hudi/client/validator/ValidationContext.java
   ```
   
   Repository: yihua/hudi
   
   Length of output: 8437
   
   ---
   
   🏁 Script executed:
   
   ```shell
   # Search for usages of ValidationContext in streaming validators
   rg "ValidationContext" hudi-utilities/src -A 3 -B 1 | head -100
   ```
   
   Repository: yihua/hudi
   
   Length of output: 12271
   
   ---
   
   🏁 Script executed:
   
   ```shell
   # Check if there are other implementations of ValidationContext
   rg "implements ValidationContext" --type java
   ```
   
   Repository: yihua/hudi
   
   Length of output: 583
   
   ---
   
   🏁 Script executed:
   
   ```shell
   # Search for uses of SparkValidationContext in the actual streaming validator
   find . -name "StreamingOffsetValidator.java" -o -name "*StreamingValidator*" | head -5
   ```
   
   Repository: yihua/hudi
   
   Length of output: 163
   
   ---
   
   🏁 Script executed:
   
   ```shell
   cat -n hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java
   ```
   
   Repository: yihua/hudi
   
   Length of output: 11783
   
   ---
   
   🏁 Script executed:
   
   ```shell
   # Check FlinkValidationContext to see if it has similar issues
   cat -n hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/validator/FlinkValidationContext.java
   ```
   
   Repository: yihua/hudi
   
   Length of output: 5248
   
   ---
   
   🏁 Script executed:
   
   ```shell
   # Check TestSparkValidationContext to see what's being tested
   cat -n hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkValidationContext.java
   ```
   
   Repository: yihua/hudi
   
   Length of output: 6625
   
   ---
   
   
   
   </details>
   
   **Clarify API contract boundaries for `getActiveTimeline()` and `getPreviousCommitInstant()`.**
   
   Both `getActiveTimeline()` and `getPreviousCommitInstant()` throw
   `UnsupportedOperationException` in this implementation, following the same
   pattern as `FlinkValidationContext`. This is by design: validators must use the
   overridden convenience methods (`isFirstCommit()`) and the
   `previousCommitMetadata` field instead.
   
   However, the public 4-arg constructor (lines 79–84) claims to be "for testing"
   but has no `@VisibleForTesting` annotation, so production code can instantiate
   a partially initialized context. Either enforce the testing-only scope via the
   annotation or document clearly that `getActiveTimeline()` will always throw
   when `metaClient` is absent. Current validators such as
   `StreamingOffsetValidator` correctly avoid these methods, but the interface
   leaves the constraint implicit rather than explicit.
   
   <details>
   <summary>πŸ€– Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In
   `@hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java`
   around lines 79 - 84, The public 4-arg SparkValidationContext constructor
   currently allows production code to create a context without a MetaClient even
   though getActiveTimeline() and getPreviousCommitInstant() always throw; update
   the API contract by marking that constructor as testing-only (annotate the
   constructor SparkValidationContext(String, Option<HoodieCommitMetadata>,
   Option<List<HoodieWriteStat>>, Option<HoodieCommitMetadata>) with
   `@VisibleForTesting`) and add a short JavaDoc on the constructor explaining that
   getActiveTimeline() and getPreviousCommitInstant() will throw
   UnsupportedOperationException unless metaClient is provided and validators
   should use isFirstCommit() and previousCommitMetadata instead; alternatively,
   make the constructor non-public (private/protected) if you want to prevent
   external use in production; apply the chosen change to SparkValidationContext
   only.
   ```
   
   </details>
   
   
   <!-- This is an auto-generated comment by CodeRabbit -->
   
   — *CodeRabbit* ([original](https://github.com/yihua/hudi/pull/15#discussion_r3036299758)) (source:comment#3036299758)



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for running pre-commit validators in the HoodieStreamer commit flow.
+ *
+ * <p>Instantiates and executes validators configured via
+ * {@code hoodie.precommit.validators}. Each validator must extend
+ * {@link BasePreCommitValidator} and have a constructor that accepts
+ * {@link TypedProperties}.</p>
+ *
+ * <p>Called from {@code StreamSync.writeToSinkAndDoMetaSync()} before
+ * the commit is finalized.</p>
+ *
+ * <p><b>Note on validator compatibility:</b> This utility uses a different instantiation
+ * mechanism than {@code SparkValidatorUtils} (used by the Spark table write path).
+ * {@code SparkValidatorUtils} expects validators implementing {@code SparkPreCommitValidator}
+ * with a {@code (HoodieSparkTable, HoodieEngineContext, HoodieWriteConfig)} constructor.
+ * Validators registered here (e.g. {@link SparkKafkaOffsetValidator}) extend
+ * {@link BasePreCommitValidator} with a {@code (TypedProperties)} constructor and
+ * are NOT compatible with {@code SparkValidatorUtils}. Do not mix them under the same
+ * {@code hoodie.precommit.validators} config if both paths are active.</p>
+ */
+public class SparkStreamerValidatorUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkStreamerValidatorUtils.class);
+
+  /**
+   * Run all configured pre-commit validators.
+   *
+   * @param props Configuration properties containing validator class names
+   * @param instant Commit instant time
+   * @param writeStatusRDD Write statuses from Spark write operations
+   * @param checkpointCommitMetadata Extra metadata being committed (contains checkpoint info)
+   * @param metaClient Table meta client for timeline access and previous commit lookup
+   * @throws HoodieValidationException if any validator fails with FAIL policy
+   */
+  public static void runValidators(TypedProperties props,
+                                   String instant,
+                                   JavaRDD<WriteStatus> writeStatusRDD,
+                                   Map<String, String> checkpointCommitMetadata,
+                                   HoodieTableMetaClient metaClient) {
+    String validatorClassNames = props.getString(
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue());
+
+    if (StringUtils.isNullOrEmpty(validatorClassNames)) {
+      return;
+    }
+
+    // Cache the RDD to avoid recomputation when collecting write stats (prevents a second DAG evaluation).
+    // Always unpersist in finally to prevent executor memory leaks.
+    writeStatusRDD.cache();
+    try {
+      List<WriteStatus> allWriteStatus = writeStatusRDD.collect();
+      HoodieCommitMetadata currentMetadata = buildCommitMetadata(allWriteStatus, checkpointCommitMetadata);
+      List<HoodieWriteStat> writeStats = allWriteStatus.stream()
+          .map(WriteStatus::getStat)
+          .collect(Collectors.toList());

Review Comment:
   _⚠️ Potential issue_ | _🟠 Major_
   
   **Filter null `WriteStatus#getStat()` before building validation context.**
   
   `buildCommitMetadata(...)` already guards against null stats, but `writeStats`
   currently does not. A null stat can propagate and fail later during validator
   aggregation.
   
   
   <details>
   <summary>πŸ› οΈ Suggested fix</summary>
   
   ```diff
    import java.util.Map;
   +import java.util.Objects;
    import java.util.stream.Collectors;
   @@
          List<HoodieWriteStat> writeStats = allWriteStatus.stream()
              .map(WriteStatus::getStat)
   +          .filter(Objects::nonNull)
              .collect(Collectors.toList());
   ```
   </details>
   
   <!-- suggestion_start -->
   
   <details>
   <summary>πŸ“ Committable suggestion</summary>
   
   > ‼️ **IMPORTANT**
   > Carefully review the code before committing. Ensure that it accurately 
replaces the highlighted code, contains no missing lines, and has no issues 
with indentation. Thoroughly test & benchmark the code to ensure it meets the 
requirements.
   
   ```suggestion
         List<HoodieWriteStat> writeStats = allWriteStatus.stream()
             .map(WriteStatus::getStat)
             .filter(Objects::nonNull)
             .collect(Collectors.toList());
   ```
   
   </details>
   
   <!-- suggestion_end -->
   
   <details>
   <summary>πŸ€– Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In
   `@hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java`
   around lines 101 - 103, The code collects writeStats from allWriteStatus without
   filtering nulls from WriteStatus.getStat(), which can later break validator
   aggregation; update the stream that builds writeStats (starting from the
   allWriteStatus variable) to exclude null stats before collecting (e.g., filter
   entries where WriteStatus::getStat != null) so writeStats contains only non-null
   HoodieWriteStat instances used by buildCommitMetadata and downstream validators
   like those in SparkStreamerValidatorUtils.
   ```
   
   </details>
   
   
   <!-- This is an auto-generated comment by CodeRabbit -->
   
   — *CodeRabbit* ([original](https://github.com/yihua/hudi/pull/15#discussion_r3036299756)) (source:comment#3036299756)



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for running pre-commit validators in the HoodieStreamer commit flow.
+ *
+ * <p>Instantiates and executes validators configured via
+ * {@code hoodie.precommit.validators}. Each validator must extend
+ * {@link BasePreCommitValidator} and have a constructor that accepts
+ * {@link TypedProperties}.</p>
+ *
+ * <p>Called from {@code StreamSync.writeToSinkAndDoMetaSync()} before
+ * the commit is finalized.</p>
+ *
+ * <p><b>Note on validator compatibility:</b> This utility uses a different instantiation
+ * mechanism than {@code SparkValidatorUtils} (used by the Spark table write path).
+ * {@code SparkValidatorUtils} expects validators implementing {@code SparkPreCommitValidator}
+ * with a {@code (HoodieSparkTable, HoodieEngineContext, HoodieWriteConfig)} constructor.
+ * Validators registered here (e.g. {@link SparkKafkaOffsetValidator}) extend
+ * {@link BasePreCommitValidator} with a {@code (TypedProperties)} constructor and
+ * are NOT compatible with {@code SparkValidatorUtils}. Do not mix them under the same
+ * {@code hoodie.precommit.validators} config if both paths are active.</p>
+ */
+public class SparkStreamerValidatorUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkStreamerValidatorUtils.class);
+
+  /**
+   * Run all configured pre-commit validators.
+   *
+   * @param props Configuration properties containing validator class names
+   * @param instant Commit instant time
+   * @param writeStatusRDD Write statuses from Spark write operations
+   * @param checkpointCommitMetadata Extra metadata being committed (contains checkpoint info)
+   * @param metaClient Table meta client for timeline access and previous commit lookup
+   * @throws HoodieValidationException if any validator fails with FAIL policy
+   */
+  public static void runValidators(TypedProperties props,
+                                   String instant,
+                                   JavaRDD<WriteStatus> writeStatusRDD,
+                                   Map<String, String> checkpointCommitMetadata,
+                                   HoodieTableMetaClient metaClient) {
+    String validatorClassNames = props.getString(
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue());
+
+    if (StringUtils.isNullOrEmpty(validatorClassNames)) {
+      return;
+    }
+
+    // Cache the RDD to avoid recomputation when collecting write stats (prevents a second DAG evaluation).
+    // Always unpersist in finally to prevent executor memory leaks.
+    writeStatusRDD.cache();
+    try {
+      List<WriteStatus> allWriteStatus = writeStatusRDD.collect();
+      HoodieCommitMetadata currentMetadata = buildCommitMetadata(allWriteStatus, checkpointCommitMetadata);
+      List<HoodieWriteStat> writeStats = allWriteStatus.stream()
+          .map(WriteStatus::getStat)
+          .collect(Collectors.toList());
+
+      // Load previous commit metadata from timeline
+      Option<HoodieCommitMetadata> previousCommitMetadata = loadPreviousCommitMetadata(metaClient);
+
+      ValidationContext context = new SparkValidationContext(
+          instant,
+          Option.of(currentMetadata),
+          Option.of(writeStats),
+          previousCommitMetadata,
+          metaClient);
+
+      // Instantiate and run each validator
+      List<String> classNames = Arrays.stream(validatorClassNames.split(","))
+          .map(String::trim)
+          .filter(s -> !s.isEmpty())
+          .collect(Collectors.toList());
+
+      for (String className : classNames) {
+        try {
+          BasePreCommitValidator validator = (BasePreCommitValidator)
+              ReflectionUtils.loadClass(className, new Class<?>[] {TypedProperties.class}, props);
+          LOG.info("Running pre-commit validator: {} for instant: {}", className, instant);
+          validator.validateWithMetadata(context);
+          LOG.info("Pre-commit validator {} passed for instant: {}", className, instant);
+        } catch (HoodieValidationException e) {
+          LOG.error("Pre-commit validator {} failed for instant: {}", className, instant, e);
+          throw e;
+        } catch (Exception e) {
+          LOG.error("Failed to instantiate or run validator: {}", className, e);
+          throw new HoodieValidationException(
+              "Failed to run pre-commit validator: " + className, e);
+        }
+      }
+    } finally {
+      writeStatusRDD.unpersist();
+    }
+  }
+
+  /**
+   * Build a pre-commit snapshot of {@link HoodieCommitMetadata} from write statuses and extra metadata.
+   *
+   * <p>This is intentionally a partial/preview object used only for validation: it contains
+   * write stats and checkpoint extra-metadata, but omits fields that are not available before the
+   * commit (e.g. schema, operation type). Validators should treat this as a read-only snapshot
+   * of what will be committed, not a fully-constructed commit record.</p>
+   */
+  private static HoodieCommitMetadata buildCommitMetadata(
+      List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+
+    // Add write stats
+    for (WriteStatus status : writeStatuses) {
+      HoodieWriteStat stat = status.getStat();
+      if (stat != null) {
+        metadata.addWriteStat(stat.getPartitionPath(), stat);
+      }
+    }
+
+    // Add extra metadata (includes checkpoint info like deltastreamer.checkpoint.key)
+    if (extraMetadata != null) {
+      extraMetadata.forEach(metadata::addMetadata);
+    }
+
+    return metadata;
+  }
+
+  /**
+   * Load the previous completed commit metadata from the timeline.
+   */
+  private static Option<HoodieCommitMetadata> loadPreviousCommitMetadata(HoodieTableMetaClient metaClient) {
+    try {
+      HoodieTimeline completedTimeline = metaClient.reloadActiveTimeline()
+          .getWriteTimeline()
+          .filterCompletedInstants();
+      Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
+      if (lastInstant.isPresent()) {
+        return Option.of(completedTimeline.readCommitMetadata(lastInstant.get()));
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to load previous commit metadata", e);
+    } catch (Exception e) {
+      LOG.warn("Failed to load previous commit metadata, skipping previous commit comparison", e);
+    }
+    }

Review Comment:
   _⚠️ Potential issue_ | _🟠 Major_
   
   **Avoid fail-open behavior when previous commit metadata loading throws unexpected exceptions.**
   
   This block currently degrades to warn-and-skip comparison on any
   non-`IOException` exception. Since `StreamSync` runs this pre-commit validation
   as a hard guard before timeline finalization, unexpected metadata-load failures
   should fail validation instead of silently bypassing it.
   
   
   <details>
   <summary>πŸ›‘οΈ Suggested fix</summary>
   
   ```diff
        } catch (IOException e) {
          throw new HoodieIOException("Failed to load previous commit metadata", e);
        } catch (Exception e) {
   -      LOG.warn("Failed to load previous commit metadata, skipping previous commit comparison", e);
   +      throw new HoodieValidationException(
   +          "Failed to load previous commit metadata for pre-commit validation", e);
        }
        return Option.empty();
      }
   ```
   </details>
   
   <!-- suggestion_start -->
   
   <details>
   <summary>πŸ“ Committable suggestion</summary>
   
   > ‼️ **IMPORTANT**
   > Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
   
   ```suggestion
       } catch (Exception e) {
         throw new HoodieValidationException(
             "Failed to load previous commit metadata for pre-commit validation", e);
       }
   ```
   
   </details>
   
   <!-- suggestion_end -->
   
   <details>
   <summary>πŸ€– Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In
   `@hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java`
   around lines 184 - 186, The current broad catch in SparkStreamerValidatorUtils
   (around the previous-commit metadata load in the validator pre-commit) swallows
   all non-IO exceptions and only logs a warning; keep the existing
   catch(IOException e) that rethrows as HoodieIOException, but change the broad
   catch(Exception e) that currently warns-and-skips so it rethrows (or wraps in
   HoodieValidationException and throws), ensuring validation fails hard and
   StreamSync's pre-commit does not silently bypass on unexpected errors.
   ```
   
   </details>
   
   
   <!-- This is an auto-generated comment by CodeRabbit -->
   
   — *CodeRabbit* ([original](https://github.com/yihua/hudi/pull/15#discussion_r3036299757)) (source:comment#3036299757)



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for running pre-commit validators in the HoodieStreamer commit flow.
+ *
+ * <p>Instantiates and executes validators configured via
+ * {@code hoodie.precommit.validators}. Each validator must extend
+ * {@link BasePreCommitValidator} and have a constructor that accepts
+ * {@link TypedProperties}.</p>
+ *
+ * <p>Called from {@code StreamSync.writeToSinkAndDoMetaSync()} before
+ * the commit is finalized.</p>
+ *
+ * <p><b>Note on validator compatibility:</b> This utility uses a different instantiation
+ * mechanism than {@code SparkValidatorUtils} (used by the Spark table write path).
+ * {@code SparkValidatorUtils} expects validators implementing {@code SparkPreCommitValidator}
+ * with a {@code (HoodieSparkTable, HoodieEngineContext, HoodieWriteConfig)} constructor.
+ * Validators registered here (e.g. {@link SparkKafkaOffsetValidator}) extend
+ * {@link BasePreCommitValidator} with a {@code (TypedProperties)} constructor and
+ * are NOT compatible with {@code SparkValidatorUtils}. Do not mix them under the same
+ * {@code hoodie.precommit.validators} config if both paths are active.</p>
+ */
+public class SparkStreamerValidatorUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkStreamerValidatorUtils.class);
+
+  /**
+   * Run all configured pre-commit validators.
+   *
+   * @param props Configuration properties containing validator class names
+   * @param instant Commit instant time
+   * @param writeStatusRDD Write statuses from Spark write operations
+   * @param checkpointCommitMetadata Extra metadata being committed (contains checkpoint info)
+   * @param metaClient Table meta client for timeline access and previous commit lookup
+   * @throws HoodieValidationException if any validator fails with FAIL policy
+   */
+  public static void runValidators(TypedProperties props,
+                                   String instant,
+                                   JavaRDD<WriteStatus> writeStatusRDD,
+                                   Map<String, String> checkpointCommitMetadata,
+                                   HoodieTableMetaClient metaClient) {
+    String validatorClassNames = props.getString(
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue());
+
+    if (StringUtils.isNullOrEmpty(validatorClassNames)) {
+      return;
+    }
+
+    // Cache the RDD to avoid recomputation when collecting write stats (prevents a second DAG evaluation).
+    // Always unpersist in finally to prevent executor memory leaks.
+    writeStatusRDD.cache();
+    try {
+      List<WriteStatus> allWriteStatus = writeStatusRDD.collect();
+      HoodieCommitMetadata currentMetadata = buildCommitMetadata(allWriteStatus, checkpointCommitMetadata);
+      List<HoodieWriteStat> writeStats = allWriteStatus.stream()
+          .map(WriteStatus::getStat)
+          .collect(Collectors.toList());
+
+      // Load previous commit metadata from timeline
+      Option<HoodieCommitMetadata> previousCommitMetadata = loadPreviousCommitMetadata(metaClient);
+
+      ValidationContext context = new SparkValidationContext(
+          instant,
+          Option.of(currentMetadata),
+          Option.of(writeStats),
+          previousCommitMetadata,
+          metaClient);
+
+      // Instantiate and run each validator
+      List<String> classNames = Arrays.stream(validatorClassNames.split(","))
+          .map(String::trim)
+          .filter(s -> !s.isEmpty())
+          .collect(Collectors.toList());
+
+      for (String className : classNames) {
+        try {
+          BasePreCommitValidator validator = (BasePreCommitValidator)
+              ReflectionUtils.loadClass(className, new Class<?>[] {TypedProperties.class}, props);
+          LOG.info("Running pre-commit validator: {} for instant: {}", className, instant);
+          validator.validateWithMetadata(context);
+          LOG.info("Pre-commit validator {} passed for instant: {}", className, instant);
+        } catch (HoodieValidationException e) {
+          LOG.error("Pre-commit validator {} failed for instant: {}", className, instant, e);
+          throw e;
+        } catch (Exception e) {
+          LOG.error("Failed to instantiate or run validator: {}", className, e);
+          throw new HoodieValidationException(
+              "Failed to run pre-commit validator: " + className, e);
+        }
+      }
+    } finally {
+      writeStatusRDD.unpersist();
+    }
+  }
+
+  /**
+   * Build a pre-commit snapshot of {@link HoodieCommitMetadata} from write statuses and extra metadata.
+   *
+   * <p>This is intentionally a partial/preview object used only for validation: it contains
+   * write stats and checkpoint extra-metadata, but omits fields that are not available before the
+   * commit (e.g. schema, operation type). Validators should treat this as a read-only snapshot
+   * of what will be committed, not a fully-constructed commit record.</p>
+   */
+  private static HoodieCommitMetadata buildCommitMetadata(
+      List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+
+    // Add write stats
+    for (WriteStatus status : writeStatuses) {
+      HoodieWriteStat stat = status.getStat();
+      if (stat != null) {
+        metadata.addWriteStat(stat.getPartitionPath(), stat);
+      }
+    }
+
+    // Add extra metadata (includes checkpoint info like deltastreamer.checkpoint.key)
+    if (extraMetadata != null) {
+      extraMetadata.forEach(metadata::addMetadata);
+    }
+
+    return metadata;
+  }
+
+  /**
+   * Load the previous completed commit metadata from the timeline.
+   */
+  private static Option<HoodieCommitMetadata> loadPreviousCommitMetadata(HoodieTableMetaClient metaClient) {
+    try {
+      HoodieTimeline completedTimeline = metaClient.reloadActiveTimeline()
+          .getWriteTimeline()
+          .filterCompletedInstants();
+      Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
+      if (lastInstant.isPresent()) {
+        return Option.of(completedTimeline.readCommitMetadata(lastInstant.get()));
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to load previous commit metadata", e);
+    } catch (Exception e) {
+      LOG.warn("Failed to load previous commit metadata, skipping previous commit comparison", e);
+    }
+    }
+    return Option.empty();

Review Comment:
   **P1** **Non-`IOException` exceptions silently swallowed, causing silent first-commit fallback**
   
   `IOException` is correctly re-thrown as `HoodieIOException`, but any other
   exception (e.g., a `RuntimeException` from corrupted metadata, a
   `ClassCastException` from deserialization, etc.) is only logged at WARN level
   and the method returns `Option.empty()`. This means a genuine error during
   metadata loading makes the validator treat the commit as the first one, so it
   skips all offset comparisons and silently lets the commit proceed with no
   validation, exactly the opposite of the intended safety net.
   
   The catch-all for `Exception` should either be removed (letting the
   exception propagate and surface the failure) or narrowed to an explicitly
   listed set of safe-to-ignore exceptions:
   
   ```java
   } catch (IOException e) {
       throw new HoodieIOException("Failed to load previous commit metadata", e);
   }
   // Remove the catch (Exception e) block; let unexpected runtime failures propagate.
   // Return Option.empty() only in the no-previous-commit case (already handled above).
   ```
   
   *Greptile* ([original](https://github.com/yihua/hudi/pull/15#discussion_r3036295993))
(source:comment#3036295993)
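To make the failure mode concrete, here is a minimal, self-contained sketch; the `loadSwallowing`/`loadPropagating` helpers are hypothetical, with `java.util.Optional` standing in for Hudi's `Option`, not Hudi code:

```java
import java.util.Optional;
import java.util.function.Supplier;

public class CatchAllDemo {
  // Swallowing variant: any failure collapses into "no previous commit".
  static Optional<String> loadSwallowing(Supplier<String> reader) {
    try {
      return Optional.ofNullable(reader.get());
    } catch (Exception e) {
      // A corrupted-metadata RuntimeException ends up here too.
      return Optional.empty();
    }
  }

  // Propagating variant: only a legitimate "nothing there" result yields
  // empty; runtime failures surface to the caller.
  static Optional<String> loadPropagating(Supplier<String> reader) {
    return Optional.ofNullable(reader.get());
  }

  public static void main(String[] args) {
    Supplier<String> corrupted = () -> {
      throw new IllegalStateException("corrupted metadata");
    };

    // Swallowing: the validator would wrongly treat this as the first commit.
    if (loadSwallowing(corrupted).isPresent()) {
      throw new AssertionError();
    }

    // Propagating: the failure is visible instead of silently skipping validation.
    boolean threw = false;
    try {
      loadPropagating(corrupted);
    } catch (IllegalStateException e) {
      threw = true;
    }
    if (!threw) {
      throw new AssertionError();
    }
  }
}
```

With the catch-all in place, a caller cannot distinguish "first commit" from "metadata load blew up", which is exactly the silent fallback described above.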



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for running pre-commit validators in the HoodieStreamer commit flow.
+ *
+ * <p>Instantiates and executes validators configured via
+ * {@code hoodie.precommit.validators}. Each validator must extend
+ * {@link BasePreCommitValidator} and have a constructor that accepts
+ * {@link TypedProperties}.</p>
+ *
+ * <p>Called from {@code StreamSync.writeToSinkAndDoMetaSync()} before
+ * the commit is finalized.</p>
+ *
+ * <p><b>Note on validator compatibility:</b> This utility uses a different
+ * instantiation mechanism than {@code SparkValidatorUtils} (used by the Spark
+ * table write path). {@code SparkValidatorUtils} expects validators implementing
+ * {@code SparkPreCommitValidator} with a
+ * {@code (HoodieSparkTable, HoodieEngineContext, HoodieWriteConfig)} constructor.
+ * Validators registered here (e.g. {@link SparkKafkaOffsetValidator}) extend
+ * {@link BasePreCommitValidator} with a {@code (TypedProperties)} constructor and
+ * are NOT compatible with {@code SparkValidatorUtils}. Do not mix them under the
+ * same {@code hoodie.precommit.validators} config if both paths are active.</p>
+ */
+public class SparkStreamerValidatorUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkStreamerValidatorUtils.class);
+
+  /**
+   * Run all configured pre-commit validators.
+   *
+   * @param props Configuration properties containing validator class names
+   * @param instant Commit instant time
+   * @param writeStatusRDD Write statuses from Spark write operations
+   * @param checkpointCommitMetadata Extra metadata being committed (contains checkpoint info)
+   * @param metaClient Table meta client for timeline access and previous commit lookup
+   * @throws HoodieValidationException if any validator fails with FAIL policy
+   */
+  public static void runValidators(TypedProperties props,
+                                   String instant,
+                                   JavaRDD<WriteStatus> writeStatusRDD,
+                                   Map<String, String> checkpointCommitMetadata,
+                                   HoodieTableMetaClient metaClient) {
+    String validatorClassNames = props.getString(
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue());
+
+    if (StringUtils.isNullOrEmpty(validatorClassNames)) {
+      return;
+    }
+
+    // Cache the RDD to avoid recomputation when collecting write stats (prevents a second DAG evaluation).
+    // Always unpersist in finally to prevent executor memory leaks.
+    writeStatusRDD.cache();
+    try {
+      List<WriteStatus> allWriteStatus = writeStatusRDD.collect();
+      HoodieCommitMetadata currentMetadata = buildCommitMetadata(allWriteStatus, checkpointCommitMetadata);
+      List<HoodieWriteStat> writeStats = allWriteStatus.stream()
+          .map(WriteStatus::getStat)
+          .collect(Collectors.toList());
+
+      // Load previous commit metadata from timeline
+      Option<HoodieCommitMetadata> previousCommitMetadata = loadPreviousCommitMetadata(metaClient);
+
+      ValidationContext context = new SparkValidationContext(
+          instant,
+          Option.of(currentMetadata),
+          Option.of(writeStats),
+          previousCommitMetadata,
+          metaClient);
+
+      // Instantiate and run each validator
+      List<String> classNames = Arrays.stream(validatorClassNames.split(","))
+          .map(String::trim)
+          .filter(s -> !s.isEmpty())
+          .collect(Collectors.toList());
+
+      for (String className : classNames) {
+        try {
+          BasePreCommitValidator validator = (BasePreCommitValidator)
+              ReflectionUtils.loadClass(className, new Class<?>[] {TypedProperties.class}, props);
+          LOG.info("Running pre-commit validator: {} for instant: {}", className, instant);
+          validator.validateWithMetadata(context);
+          LOG.info("Pre-commit validator {} passed for instant: {}", className, instant);
+        } catch (HoodieValidationException e) {
+          LOG.error("Pre-commit validator {} failed for instant: {}", className, instant, e);
+          throw e;
+        } catch (Exception e) {
+          LOG.error("Failed to instantiate or run validator: {}", className, e);
+          throw new HoodieValidationException(
+              "Failed to run pre-commit validator: " + className, e);
+        }
+      }
+    } finally {
+      writeStatusRDD.unpersist();
+    }

Review Comment:
   **P1** **RDD unpersisted before `writeClient.commit()` triggers its own evaluation**
   
   The comment on line 95 says the cache is added to "prevent a second DAG
   evaluation," but the `finally` block calls `unpersist()` before
   `writeClient.commit(instantTime, writeStatusRDD, ...)` executes. Hudi's commit
   path invokes the `WriteStatusValidator` against the write statuses collected
   from the same `writeStatusRDD`; once the cache is evicted, Spark must
   re-execute the entire write DAG to produce those statuses again, re-triggering
   the Spark write side effects.
   
   The cache-then-immediately-unpersist pattern protects nothing when the
   second evaluation occurs after `runValidators` returns. Either the caller
   should manage the RDD lifecycle, or `runValidators` should accept a
   pre-collected `List<WriteStatus>` instead of a `JavaRDD<WriteStatus>`:
   
   ```java
   // Option A: accept an already-materialized list so the caller keeps
   // the RDD lifecycle entirely outside this utility.
   public static void runValidators(TypedProperties props,
                                    String instant,
                                    List<WriteStatus> allWriteStatus,
                                    Map<String, String> checkpointCommitMetadata,
                                    HoodieTableMetaClient metaClient) {
       // no cache/unpersist needed; caller already holds the list
       HoodieCommitMetadata currentMetadata = buildCommitMetadata(allWriteStatus, checkpointCommitMetadata);
       ...
   }
   ```
   
   And in `StreamSync`, the caller would first collect (once) and pass the list:
   ```java
   List<WriteStatus> allWriteStatuses = writeStatusRDD.collect();
   SparkStreamerValidatorUtils.runValidators(props, instantTime, allWriteStatuses, ...);
   writeClient.commit(instantTime, writeStatusRDD, ...);   // RDD still live in caller scope
   ```
   
   *Greptile* ([original](https://github.com/yihua/hudi/pull/15#discussion_r3036295985))
(source:comment#3036295985)
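The caching pitfall can be reproduced with a toy stand-in for a cacheable lazy computation; the `LazyResult` class below is illustrative only, not Spark's API, but mirrors the cache/unpersist/re-evaluate behavior at issue:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Toy stand-in for a cacheable lazy computation (illustrative only, not Spark API).
class LazyResult<T> {
  private final Supplier<T> compute;
  private T cached;
  private boolean persisted;

  LazyResult(Supplier<T> compute) { this.compute = compute; }

  void cache() { persisted = true; }

  void unpersist() { persisted = false; cached = null; }

  T get() {
    if (persisted && cached != null) return cached;
    T value = compute.get();          // re-runs the upstream "DAG"
    if (persisted) cached = value;
    return value;
  }
}

public class UnpersistDemo {
  public static void main(String[] args) {
    AtomicInteger evaluations = new AtomicInteger();
    LazyResult<String> statuses = new LazyResult<>(() -> {
      evaluations.incrementAndGet();  // counts how often the write "DAG" runs
      return "write-statuses";
    });

    statuses.cache();
    statuses.get();                   // validators collect: 1st evaluation
    statuses.unpersist();             // runValidators' finally block
    statuses.get();                   // commit consumes again: 2nd evaluation
    if (evaluations.get() != 2) throw new AssertionError("expected recomputation");

    // Caller-managed lifecycle: unpersist only after the final consumer.
    AtomicInteger evals2 = new AtomicInteger();
    LazyResult<String> statuses2 = new LazyResult<>(() -> {
      evals2.incrementAndGet();
      return "write-statuses";
    });
    statuses2.cache();
    statuses2.get();                  // validators
    statuses2.get();                  // commit, cache still live
    statuses2.unpersist();
    if (evals2.get() != 1) throw new AssertionError("cache should serve the second read");
  }
}
```

The second half of `main` is the shape of the proposed fix: whoever created the cached result is the one who releases it, after the last consumer has run.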



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+
+import java.util.List;
+
+/**
+ * Spark/HoodieStreamer implementation of {@link ValidationContext}.
+ *
+ * <p>Constructed from data available in {@code StreamSync.writeToSinkAndDoMetaSync()}
+ * before the commit is finalized. Provides validators with access to commit metadata,
+ * write statistics, and previous commit information for streaming offset validation.</p>
+ *
+ * <p>Unlike Flink's implementation, Spark can optionally provide active timeline access
+ * via {@link HoodieTableMetaClient} for richer validation patterns.</p>
+ */
+public class SparkValidationContext implements ValidationContext {
+
+  private final String instantTime;
+  private final Option<HoodieCommitMetadata> commitMetadata;
+  private final Option<List<HoodieWriteStat>> writeStats;
+  private final Option<HoodieCommitMetadata> previousCommitMetadata;
+  private final HoodieTableMetaClient metaClient;
+
+  /**
+   * Create a Spark validation context with full timeline access.
+   *
+   * @param instantTime Current commit instant time
+   * @param commitMetadata Current commit metadata (with extraMetadata including checkpoints)
+   * @param writeStats Write statistics from write operations
+   * @param previousCommitMetadata Metadata from the previous completed commit
+   * @param metaClient Table meta client for timeline access (may be null for testing)
+   */
+  public SparkValidationContext(String instantTime,
+                                Option<HoodieCommitMetadata> commitMetadata,
+                                Option<List<HoodieWriteStat>> writeStats,
+                                Option<HoodieCommitMetadata> previousCommitMetadata,
+                                HoodieTableMetaClient metaClient) {
+    this.instantTime = instantTime;
+    this.commitMetadata = commitMetadata;
+    this.writeStats = writeStats;
+    this.previousCommitMetadata = previousCommitMetadata;
+    this.metaClient = metaClient;
+  }
+
+  /**
+   * Create a Spark validation context without timeline access (for testing).
+   *
+   * @param instantTime Current commit instant time
+   * @param commitMetadata Current commit metadata (with extraMetadata including checkpoints)
+   * @param writeStats Write statistics from write operations
+   * @param previousCommitMetadata Metadata from the previous completed commit
+   */
+  public SparkValidationContext(String instantTime,
+                                Option<HoodieCommitMetadata> commitMetadata,
+                                Option<List<HoodieWriteStat>> writeStats,
+                                Option<HoodieCommitMetadata> previousCommitMetadata) {
+    this(instantTime, commitMetadata, writeStats, previousCommitMetadata, null);
+  }
+
+  @Override
+  public String getInstantTime() {
+    return instantTime;
+  }
+
+  @Override
+  public Option<HoodieCommitMetadata> getCommitMetadata() {
+    return commitMetadata;
+  }
+
+  @Override
+  public Option<List<HoodieWriteStat>> getWriteStats() {
+    return writeStats;
+  }
+
+  /**
+   * Get the active timeline. Available when metaClient is provided.
+   *
+   * @throws UnsupportedOperationException if metaClient was not provided
+   */
+  @Override
+  public HoodieActiveTimeline getActiveTimeline() {
+    if (metaClient == null) {
+      throw new UnsupportedOperationException(
+          "Active timeline is not available without HoodieTableMetaClient.");
+    }
+    return metaClient.getActiveTimeline();
+  }
+
+  /**
+   * Not directly supported. Use {@link #isFirstCommit()} or
+   * {@link #getPreviousCommitMetadata()} instead.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public Option<HoodieInstant> getPreviousCommitInstant() {
+    throw new UnsupportedOperationException(
+        "getPreviousCommitInstant() is not available in HoodieStreamer 
pre-commit validation context. "
+            + "Use isFirstCommit() or getPreviousCommitMetadata() instead.");
+  }

Review Comment:
   **P2** **`getPreviousCommitInstant()` always throws, but the `ValidationContext` default `isFirstCommit()` delegates to it**
   
   The `ValidationContext` interface defines:
   ```java
   default boolean isFirstCommit() {
       return !getPreviousCommitInstant().isPresent();
   }
   ```
   
   `SparkValidationContext` correctly overrides `isFirstCommit()`, so this
   specific call is safe. However, `getPreviousCommitInstant()` is part of the
   public contract of `ValidationContext`. Any existing or future validator that
   calls `context.getPreviousCommitInstant()` directly will receive an unexpected
   `UnsupportedOperationException` at runtime, not a compilation error.
   
   A safer design is to return `Option.empty()` (signalling "no previous
   instant available") instead of throwing, which is consistent with what the
   override of `isFirstCommit()` communicates semantically:
   
   ```java
   @Override
   public Option<HoodieInstant> getPreviousCommitInstant() {
       // Previous instant is not tracked in the HoodieStreamer validation context;
       // use isFirstCommit() or getPreviousCommitMetadata() instead.
       return Option.empty();
   }
   ```
   
   *Greptile* ([original](https://github.com/yihua/hudi/pull/15#discussion_r3036296007))
(source:comment#3036296007)
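A trimmed-down stand-in makes the contrast checkable; the `Ctx` interface below is hypothetical, mirroring only the default-method shape of `ValidationContext`, with `java.util.Optional` in place of Hudi's `Option`:

```java
import java.util.Optional;

public class DefaultMethodDemo {
  // Trimmed-down stand-in for ValidationContext's default-method shape.
  interface Ctx {
    Optional<String> getPreviousCommitInstant();

    default boolean isFirstCommit() {
      return !getPreviousCommitInstant().isPresent();
    }
  }

  public static void main(String[] args) {
    // Throwing variant: any caller unaware of the isFirstCommit() override
    // hits a runtime exception on the direct call.
    Ctx throwing = () -> {
      throw new UnsupportedOperationException("not available");
    };
    boolean threw = false;
    try {
      throwing.getPreviousCommitInstant();
    } catch (UnsupportedOperationException e) {
      threw = true;
    }
    if (!threw) throw new AssertionError();

    // Empty-returning variant: the direct call and the inherited default
    // method agree on the "no previous instant" semantics.
    Ctx empty = Optional::empty;
    if (empty.getPreviousCommitInstant().isPresent()) throw new AssertionError();
    if (!empty.isFirstCommit()) throw new AssertionError();
  }
}
```

The empty-returning variant needs no override at all: the default `isFirstCommit()` already produces the right answer, which is the consistency argument made above.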



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java:
##########
@@ -84,6 +84,22 @@ public static void runValidators(HoodieWriteConfig config,
       Dataset<Row> beforeState = getRecordsFromCommittedFiles(sqlContext, partitionsModified, table, afterState.schema());
 
       Stream<SparkPreCommitValidator> validators = Arrays.stream(config.getPreCommitValidators().split(","))
+          .map(String::trim)
+          .filter(s -> !s.isEmpty())
+          .filter(validatorClass -> {
+            try {
+              Class<?> clazz = Class.forName(validatorClass);
+              if (!SparkPreCommitValidator.class.isAssignableFrom(clazz)) {
+                LOG.warn("Skipping validator {} β€” it does not implement 
SparkPreCommitValidator. "
+                    + "If this is a streaming offset validator (e.g. 
SparkKafkaOffsetValidator), "
+                    + "it will be invoked by SparkStreamerValidatorUtils 
instead.", validatorClass);
+                return false;
+              }
+              return true;
+            } catch (ClassNotFoundException e) {
+              throw new HoodieValidationException("Cannot find validator class: " + validatorClass, e);
+            }
+          })

Review Comment:
   **P2** **Double class loading per validator entry**
   
   The filter lambda calls `Class.forName(validatorClass)` to do the
   assignability check, and then the immediately following `.map(...)` calls
   `ReflectionUtils.loadClass(validatorClass, ...)` which loads (and instantiates)
   the class a second time. Every registered validator is therefore looked up in
   the class loader twice.
   
   Consider collecting into an intermediate structure that carries the loaded
   `Class<?>` so the second load is avoided, or do the assignability check inside
   the existing `loadClass` + cast step where the `ClassCastException` would
   already surface the incompatible type.
   
   *Greptile* ([original](https://github.com/yihua/hudi/pull/15#discussion_r3036296014))
(source:comment#3036296014)
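A sketch of the single-load alternative; the `Validator` marker interface and the no-arg reflective construction below are illustrative stand-ins, not Hudi's actual types or the `ReflectionUtils.loadClass` signature:

```java
import java.util.ArrayList;
import java.util.List;

public class SingleLoadDemo {
  // Illustrative marker interface standing in for SparkPreCommitValidator.
  interface Validator { }

  public static class OffsetValidator implements Validator {
    public OffsetValidator() { }
  }

  // Load each class once, filter by assignability, then instantiate from the
  // already-loaded Class<?> instead of resolving the name a second time.
  static List<Validator> instantiate(String classNames) throws ReflectiveOperationException {
    List<Validator> result = new ArrayList<>();
    for (String name : classNames.split(",")) {
      String trimmed = name.trim();
      if (trimmed.isEmpty()) {
        continue;
      }
      Class<?> clazz = Class.forName(trimmed);                 // single lookup
      if (!Validator.class.isAssignableFrom(clazz)) {
        continue;                                              // skip incompatible entries
      }
      result.add((Validator) clazz.getDeclaredConstructor().newInstance());
    }
    return result;
  }

  public static void main(String[] args) throws Exception {
    // java.util.ArrayList loads fine but is filtered out as a non-Validator.
    List<Validator> vs = instantiate(
        SingleLoadDemo.class.getName() + "$OffsetValidator, java.util.ArrayList");
    if (vs.size() != 1) throw new AssertionError("expected exactly one validator");
  }
}
```

Because the `Class<?>` from the assignability check is the same object handed to the constructor lookup, the class loader is consulted once per configured entry.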



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
