yihua commented on code in PR #18405:
URL: https://github.com/apache/hudi/pull/18405#discussion_r3036291188


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+
+import java.util.List;
+
+/**
+ * Spark/HoodieStreamer implementation of {@link ValidationContext}.
+ *
+ * <p>Constructed from data available in {@code StreamSync.writeToSinkAndDoMetaSync()}
+ * before the commit is finalized. Provides validators with access to commit metadata,
+ * write statistics, and previous commit information for streaming offset validation.</p>
+ *
+ * <p>Unlike Flink's implementation, Spark can optionally provide active timeline access
+ * via {@link HoodieTableMetaClient} for richer validation patterns.</p>
+ */
+public class SparkValidationContext implements ValidationContext {
+
+  private final String instantTime;
+  private final Option<HoodieCommitMetadata> commitMetadata;
+  private final Option<List<HoodieWriteStat>> writeStats;
+  private final Option<HoodieCommitMetadata> previousCommitMetadata;
+  private final HoodieTableMetaClient metaClient;
+
+  /**
+   * Create a Spark validation context with full timeline access.
+   *
+   * @param instantTime Current commit instant time
+   * @param commitMetadata Current commit metadata (with extraMetadata including checkpoints)
+   * @param writeStats Write statistics from write operations
+   * @param previousCommitMetadata Metadata from the previous completed commit
+   * @param metaClient Table meta client for timeline access (may be null for testing)
+   */
+  public SparkValidationContext(String instantTime,
+                                Option<HoodieCommitMetadata> commitMetadata,
+                                Option<List<HoodieWriteStat>> writeStats,
+                                Option<HoodieCommitMetadata> previousCommitMetadata,
+                                HoodieTableMetaClient metaClient) {
+    this.instantTime = instantTime;
+    this.commitMetadata = commitMetadata;
+    this.writeStats = writeStats;
+    this.previousCommitMetadata = previousCommitMetadata;
+    this.metaClient = metaClient;
+  }
+
+  /**
+   * Create a Spark validation context without timeline access (for testing).
+   *
+   * @param instantTime Current commit instant time
+   * @param commitMetadata Current commit metadata (with extraMetadata including checkpoints)
+   * @param writeStats Write statistics from write operations
+   * @param previousCommitMetadata Metadata from the previous completed commit
+   */
+  public SparkValidationContext(String instantTime,
+                                Option<HoodieCommitMetadata> commitMetadata,
+                                Option<List<HoodieWriteStat>> writeStats,
+                                Option<HoodieCommitMetadata> previousCommitMetadata) {
+    this(instantTime, commitMetadata, writeStats, previousCommitMetadata, null);
+  }
+
+  @Override
+  public String getInstantTime() {
+    return instantTime;
+  }
+
+  @Override
+  public Option<HoodieCommitMetadata> getCommitMetadata() {
+    return commitMetadata;
+  }
+
+  @Override
+  public Option<List<HoodieWriteStat>> getWriteStats() {
+    return writeStats;
+  }
+
+  /**
+   * Get the active timeline. Available when metaClient is provided.
+   *
+   * @throws UnsupportedOperationException if metaClient was not provided
+   */
+  @Override
+  public HoodieActiveTimeline getActiveTimeline() {
+    if (metaClient == null) {
+      throw new UnsupportedOperationException(
+          "Active timeline is not available without HoodieTableMetaClient.");
+    }
+    return metaClient.getActiveTimeline();
+  }
+
+  /**
+   * Not directly supported. Use {@link #isFirstCommit()} or
+   * {@link #getPreviousCommitMetadata()} instead.

Review Comment:
   🤖 `getPreviousCommitInstant()` is a `@PublicAPIMethod` on the 
`ValidationContext` interface, and the default `isFirstCommit()` delegates to 
it. Throwing `UnsupportedOperationException` here means any future validator 
that calls `getPreviousCommitInstant()` will blow up unexpectedly when running 
in the Spark/Streamer context. Since you already have `metaClient` available, 
could you implement this properly (e.g., query the active timeline for the last 
completed commit) when `metaClient != null`, and only throw when it's null?
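   For reference, one possible shape for the method body when `metaClient` is present (a sketch using the public timeline API, untested here; field names follow the surrounding class):

   ```java
   @Override
   public Option<HoodieInstant> getPreviousCommitInstant() {
     if (metaClient == null) {
       throw new UnsupportedOperationException(
           "Previous commit instant is not available without HoodieTableMetaClient.");
     }
     // Last completed commit on the active timeline, if any.
     return metaClient.getActiveTimeline()
         .getCommitsTimeline()
         .filterCompletedInstants()
         .lastInstant();
   }
   ```

   With this in place, the default `isFirstCommit()` on the interface would also behave correctly in the Spark/Streamer context.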



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java:
##########
@@ -84,6 +84,22 @@ public static void runValidators(HoodieWriteConfig config,
      Dataset<Row> beforeState = getRecordsFromCommittedFiles(sqlContext, partitionsModified, table, afterState.schema());
 
      Stream<SparkPreCommitValidator> validators = Arrays.stream(config.getPreCommitValidators().split(","))
+          .map(String::trim)
+          .filter(s -> !s.isEmpty())
+          .filter(validatorClass -> {

Review Comment:
   🤖 Is there a concern that a `SparkKafkaOffsetValidator` listed in `hoodie.precommit.validators` would be silently skipped here (via the `isAssignableFrom` check), while also never running in the HoodieStreamer path if the user isn't using HoodieStreamer? It might be worth adding a more prominent log message, or even a validation error, when a configured validator can't run in any active path.
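   To make the skip loud rather than silent, the filter could warn per skipped entry. A self-contained sketch with stand-in classes (the real code would reference Hudi's `SparkPreCommitValidator` and the project's SLF4J logger instead of `System.err`):

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;

   public class ValidatorFilterSketch {
     // Stand-ins for illustration; real code would use Hudi's validator types.
     public interface SparkPreCommitValidator {}
     public static class SqlValidator implements SparkPreCommitValidator {}
     public static class KafkaOffsetValidator {} // BasePreCommitValidator-style, not Spark-path compatible

     /** Keeps only Spark-path validators, warning loudly about every skipped entry. */
     public static List<String> filterCompatible(String configured) {
       return Arrays.stream(configured.split(","))
           .map(String::trim)
           .filter(s -> !s.isEmpty())
           .filter(name -> {
             try {
               boolean ok = SparkPreCommitValidator.class.isAssignableFrom(Class.forName(name));
               if (!ok) {
                 // Prominent message instead of a silent skip, so misconfiguration is visible.
                 System.err.println("WARN: " + name
                     + " is not a SparkPreCommitValidator; skipping it in the Spark write path");
               }
               return ok;
             } catch (ClassNotFoundException e) {
               System.err.println("WARN: validator class not found: " + name);
               return false;
             }
           })
           .collect(Collectors.toList());
     }

     public static void main(String[] args) {
       List<String> kept = filterCompatible(
           "ValidatorFilterSketch$SqlValidator, ValidatorFilterSketch$KafkaOffsetValidator");
       System.out.println(kept); // only the Spark-compatible validator survives
     }
   }
   ```

   Escalating the warning to a hard `HoodieValidationException` is the stricter variant the comment suggests.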



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for running pre-commit validators in the HoodieStreamer commit flow.
+ *
+ * <p>Instantiates and executes validators configured via
+ * {@code hoodie.precommit.validators}. Each validator must extend
+ * {@link BasePreCommitValidator} and have a constructor that accepts
+ * {@link TypedProperties}.</p>
+ *
+ * <p>Called from {@code StreamSync.writeToSinkAndDoMetaSync()} before
+ * the commit is finalized.</p>
+ *
+ * <p><b>Note on validator compatibility:</b> This utility uses a different instantiation
+ * mechanism than {@code SparkValidatorUtils} (used by the Spark table write path).
+ * {@code SparkValidatorUtils} expects validators implementing {@code SparkPreCommitValidator}
+ * with a {@code (HoodieSparkTable, HoodieEngineContext, HoodieWriteConfig)} constructor.
+ * Validators registered here (e.g. {@link SparkKafkaOffsetValidator}) extend
+ * {@link BasePreCommitValidator} with a {@code (TypedProperties)} constructor and
+ * are NOT compatible with {@code SparkValidatorUtils}. Do not mix them under the same
+ * {@code hoodie.precommit.validators} config if both paths are active.</p>
+ */
+public class SparkStreamerValidatorUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkStreamerValidatorUtils.class);
+
+  /**
+   * Run all configured pre-commit validators.
+   *
+   * @param props Configuration properties containing validator class names
+   * @param instant Commit instant time
+   * @param writeStatusRDD Write statuses from Spark write operations
+   * @param checkpointCommitMetadata Extra metadata being committed (contains checkpoint info)
+   * @param metaClient Table meta client for timeline access and previous commit lookup
+   * @throws HoodieValidationException if any validator fails with FAIL policy
+   */
+  public static void runValidators(TypedProperties props,
+                                   String instant,
+                                   JavaRDD<WriteStatus> writeStatusRDD,
+                                   Map<String, String> checkpointCommitMetadata,
+                                   HoodieTableMetaClient metaClient) {
+    String validatorClassNames = props.getString(
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue());
+
+    if (StringUtils.isNullOrEmpty(validatorClassNames)) {
+      return;
+    }
+
+    // Cache the RDD to avoid recomputation when collecting write stats (prevents a second DAG evaluation).

Review Comment:
   🤖 The `finally` block calls `writeStatusRDD.unpersist()` before 
`writeClient.commit()` uses the same RDD in `StreamSync`. After unpersist, the 
commit path will trigger a second DAG evaluation of the write RDD, which could 
re-execute the actual write operations or at minimum double the I/O. Could you 
either move the `cache()`/`unpersist()` lifecycle to the caller in `StreamSync` 
(wrapping both the validator and the commit call), or remove the `unpersist()` 
here and let the commit path handle the RDD lifecycle?
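   A sketch of the caller-managed lifecycle in `StreamSync` (method and variable names are approximate; illustrative only):

   ```java
   writeStatusRDD.cache();
   try {
     SparkStreamerValidatorUtils.runValidators(
         props, instantTime, writeStatusRDD, checkpointCommitMetadata, metaClient);
     // The commit reuses the still-cached RDD, so no second DAG evaluation.
     writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata));
   } finally {
     // Unpersist only after both the validators and the commit have consumed the RDD.
     writeStatusRDD.unpersist();
   }
   ```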



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for running pre-commit validators in the HoodieStreamer commit flow.
+ *
+ * <p>Instantiates and executes validators configured via
+ * {@code hoodie.precommit.validators}. Each validator must extend
+ * {@link BasePreCommitValidator} and have a constructor that accepts
+ * {@link TypedProperties}.</p>
+ *
+ * <p>Called from {@code StreamSync.writeToSinkAndDoMetaSync()} before
+ * the commit is finalized.</p>
+ *
+ * <p><b>Note on validator compatibility:</b> This utility uses a different instantiation
+ * mechanism than {@code SparkValidatorUtils} (used by the Spark table write path).
+ * {@code SparkValidatorUtils} expects validators implementing {@code SparkPreCommitValidator}
+ * with a {@code (HoodieSparkTable, HoodieEngineContext, HoodieWriteConfig)} constructor.
+ * Validators registered here (e.g. {@link SparkKafkaOffsetValidator}) extend
+ * {@link BasePreCommitValidator} with a {@code (TypedProperties)} constructor and
+ * are NOT compatible with {@code SparkValidatorUtils}. Do not mix them under the same
+ * {@code hoodie.precommit.validators} config if both paths are active.</p>
+ */
+public class SparkStreamerValidatorUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkStreamerValidatorUtils.class);
+
+  /**
+   * Run all configured pre-commit validators.
+   *
+   * @param props Configuration properties containing validator class names
+   * @param instant Commit instant time
+   * @param writeStatusRDD Write statuses from Spark write operations
+   * @param checkpointCommitMetadata Extra metadata being committed (contains checkpoint info)
+   * @param metaClient Table meta client for timeline access and previous commit lookup
+   * @throws HoodieValidationException if any validator fails with FAIL policy
+   */
+  public static void runValidators(TypedProperties props,
+                                   String instant,
+                                   JavaRDD<WriteStatus> writeStatusRDD,
+                                   Map<String, String> checkpointCommitMetadata,
+                                   HoodieTableMetaClient metaClient) {
+    String validatorClassNames = props.getString(
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue());
+
+    if (StringUtils.isNullOrEmpty(validatorClassNames)) {
+      return;
+    }
+
+    // Cache the RDD to avoid recomputation when collecting write stats (prevents a second DAG evaluation).
+    // Always unpersist in finally to prevent executor memory leaks.
+    writeStatusRDD.cache();
+    try {
+      List<WriteStatus> allWriteStatus = writeStatusRDD.collect();
+      HoodieCommitMetadata currentMetadata = buildCommitMetadata(allWriteStatus, checkpointCommitMetadata);
+      List<HoodieWriteStat> writeStats = allWriteStatus.stream()
+          .map(WriteStatus::getStat)
+          .collect(Collectors.toList());
+
+      // Load previous commit metadata from timeline
+      Option<HoodieCommitMetadata> previousCommitMetadata = loadPreviousCommitMetadata(metaClient);
+
+      ValidationContext context = new SparkValidationContext(
+          instant,
+          Option.of(currentMetadata),
+          Option.of(writeStats),
+          previousCommitMetadata,
+          metaClient);
+
+      // Instantiate and run each validator
+      List<String> classNames = Arrays.stream(validatorClassNames.split(","))
+          .map(String::trim)
+          .filter(s -> !s.isEmpty())
+          .collect(Collectors.toList());
+
+      for (String className : classNames) {
+        try {
+          BasePreCommitValidator validator = (BasePreCommitValidator)
+              ReflectionUtils.loadClass(className, new Class<?>[] {TypedProperties.class}, props);
+          LOG.info("Running pre-commit validator: {} for instant: {}", className, instant);
+          validator.validateWithMetadata(context);
+          LOG.info("Pre-commit validator {} passed for instant: {}", className, instant);
+        } catch (HoodieValidationException e) {
+          LOG.error("Pre-commit validator {} failed for instant: {}", className, instant, e);
+          throw e;
+        } catch (Exception e) {
+          LOG.error("Failed to instantiate or run validator: {}", className, e);
+          throw new HoodieValidationException(
+              "Failed to run pre-commit validator: " + className, e);
+        }
+      }
+    } finally {
+      writeStatusRDD.unpersist();
+    }
+  }
+
+  /**
+   * Build a pre-commit snapshot of {@link HoodieCommitMetadata} from write statuses and extra metadata.
+   *
+   * <p>This is intentionally a partial/preview object used only for validation: it contains
+   * write stats and checkpoint extra-metadata, but omits fields that are not available before
+   * the commit (e.g. schema, operation type). Validators should treat this as a read-only
+   * snapshot of what will be committed, not a fully-constructed commit record.</p>
+   */
+  private static HoodieCommitMetadata buildCommitMetadata(
+      List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+
+    // Add write stats
+    for (WriteStatus status : writeStatuses) {
+      HoodieWriteStat stat = status.getStat();
+      if (stat != null) {
+        metadata.addWriteStat(stat.getPartitionPath(), stat);
+      }
+    }
+
+    // Add extra metadata (includes checkpoint info like deltastreamer.checkpoint.key)
+    if (extraMetadata != null) {
+      extraMetadata.forEach(metadata::addMetadata);
+    }
+
+    return metadata;
+  }
+
+  /**
+   * Load the previous completed commit metadata from the timeline.
+   */
+  private static Option<HoodieCommitMetadata> loadPreviousCommitMetadata(HoodieTableMetaClient metaClient) {
+    try {
+      HoodieTimeline completedTimeline = metaClient.reloadActiveTimeline()
+          .getWriteTimeline()

Review Comment:
   🤖 Catching generic `Exception` and logging a warning here means that non-IO 
failures (e.g., corrupt metadata, deserialization errors, `ClassCastException`) 
silently skip validation instead of failing. This could mask real data 
integrity issues. I'd suggest either removing the generic catch and letting 
only `IOException` be handled, or at least re-evaluating whether silently 
proceeding without previous commit metadata is safe for offset validation.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for running pre-commit validators in the HoodieStreamer commit flow.
+ *
+ * <p>Instantiates and executes validators configured via
+ * {@code hoodie.precommit.validators}. Each validator must extend
+ * {@link BasePreCommitValidator} and have a constructor that accepts
+ * {@link TypedProperties}.</p>
+ *
+ * <p>Called from {@code StreamSync.writeToSinkAndDoMetaSync()} before
+ * the commit is finalized.</p>
+ */
+public class SparkStreamerValidatorUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkStreamerValidatorUtils.class);
+
+  /**
+   * Run all configured pre-commit validators.
+   *
+   * @param props Configuration properties containing validator class names
+   * @param instant Commit instant time
+   * @param writeStatusRDD Write statuses from Spark write operations
+   * @param checkpointCommitMetadata Extra metadata being committed (contains checkpoint info)
+   * @param metaClient Table meta client for timeline access and previous commit lookup
+   * @throws HoodieValidationException if any validator fails with FAIL policy
+   */
+  public static void runValidators(TypedProperties props,
+                                   String instant,
+                                   JavaRDD<WriteStatus> writeStatusRDD,
+                                   Map<String, String> checkpointCommitMetadata,
+                                   HoodieTableMetaClient metaClient) {
+    String validatorClassNames = props.getString(
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue());
+
+    if (StringUtils.isNullOrEmpty(validatorClassNames)) {
+      return;
+    }
+
+    // Collect write statuses and build context
+    List<WriteStatus> allWriteStatus = writeStatusRDD.collect();

Review Comment:
   🤖 The `cache()` before `collect()` is good, but the `unpersist()` in the 
finally block is concerning. In `StreamSync.writeToSinkAndDoMetaSync`, the same 
`writeStatusRDD` is passed to `writeClient.commit()` immediately after 
`runValidators` returns (line 877). The `unpersist()` removes it from cache 
before commit gets to use it, potentially forcing a full DAG recomputation. 
Consider either removing the `unpersist()` (letting the caller manage the RDD 
lifecycle) or having the caller cache the RDD before calling `runValidators`.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for running pre-commit validators in the HoodieStreamer commit flow.
+ *
+ * <p>Instantiates and executes validators configured via
+ * {@code hoodie.precommit.validators}. Each validator must extend
+ * {@link BasePreCommitValidator} and have a constructor that accepts
+ * {@link TypedProperties}.</p>
+ *
+ * <p>Called from {@code StreamSync.writeToSinkAndDoMetaSync()} before
+ * the commit is finalized.</p>
+ */
+public class SparkStreamerValidatorUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkStreamerValidatorUtils.class);
+
+  /**
+   * Run all configured pre-commit validators.
+   *
+   * @param props Configuration properties containing validator class names
+   * @param instant Commit instant time
+   * @param writeStatusRDD Write statuses from Spark write operations
+   * @param checkpointCommitMetadata Extra metadata being committed (contains checkpoint info)
+   * @param metaClient Table meta client for timeline access and previous commit lookup
+   * @throws HoodieValidationException if any validator fails with FAIL policy
+   */
+  public static void runValidators(TypedProperties props,
+                                   String instant,
+                                   JavaRDD<WriteStatus> writeStatusRDD,
+                                   Map<String, String> checkpointCommitMetadata,
+                                   HoodieTableMetaClient metaClient) {
+    String validatorClassNames = props.getString(
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue());

Review Comment:
   🤖 Good catch. The PR adds a `Class.forName` + `isAssignableFrom` filter in 
`SparkValidatorUtils` to skip non-`SparkPreCommitValidator` classes, which 
addresses the ClassCastException/reflection error. However, note that the 
reverse direction isn't guarded: if someone accidentally puts a 
`SparkPreCommitValidator` class name (e.g. 
`SqlQueryEqualityPreCommitValidator`) into the config while using 
HoodieStreamer, `SparkStreamerValidatorUtils` will try to instantiate it with a 
`(TypedProperties)` constructor and fail. Might be worth adding a symmetric 
check there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
