yihua commented on code in PR #18405:
URL: https://github.com/apache/hudi/pull/18405#discussion_r3035005762


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for running pre-commit validators in the HoodieStreamer commit flow.
+ *
+ * <p>Instantiates and executes validators configured via
+ * {@code hoodie.precommit.validators}. Each validator must extend
+ * {@link BasePreCommitValidator} and have a constructor that accepts
+ * {@link TypedProperties}.</p>
+ *
+ * <p>Called from {@code StreamSync.writeToSinkAndDoMetaSync()} before
+ * the commit is finalized.</p>
+ *
+ * <p><b>Note on validator compatibility:</b> This utility uses a different 
instantiation
+ * mechanism than {@code SparkValidatorUtils} (used by the Spark table write 
path).
+ * {@code SparkValidatorUtils} expects validators implementing {@code 
SparkPreCommitValidator}
+ * with a {@code (HoodieSparkTable, HoodieEngineContext, HoodieWriteConfig)} 
constructor.
+ * Validators registered here (e.g. {@link SparkKafkaOffsetValidator}) extend
+ * {@link BasePreCommitValidator} with a {@code (TypedProperties)} constructor 
and
+ * are NOT compatible with {@code SparkValidatorUtils}. Do not mix them under 
the same
+ * {@code hoodie.precommit.validators} config if both paths are active.</p>
+ */
+public class SparkStreamerValidatorUtils {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkStreamerValidatorUtils.class);
+
+  /**
+   * Run all configured pre-commit validators.
+   *
+   * @param props Configuration properties containing validator class names
+   * @param instant Commit instant time
+   * @param writeStatusRDD Write statuses from Spark write operations
+   * @param checkpointCommitMetadata Extra metadata being committed (contains 
checkpoint info)
+   * @param metaClient Table meta client for timeline access and previous 
commit lookup
+   * @throws HoodieValidationException if any validator fails with FAIL policy
+   */
+  public static void runValidators(TypedProperties props,
+                                   String instant,
+                                   JavaRDD<WriteStatus> writeStatusRDD,
+                                   Map<String, String> 
checkpointCommitMetadata,
+                                   HoodieTableMetaClient metaClient) {
+    String validatorClassNames = props.getString(
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue());
+
+    if (StringUtils.isNullOrEmpty(validatorClassNames)) {
+      return;
+    }
+
+    // Cache the RDD to avoid recomputation when collecting write stats 
(prevents a second DAG evaluation)
+    writeStatusRDD.cache();
+    List<WriteStatus> allWriteStatus = writeStatusRDD.collect();
+    HoodieCommitMetadata currentMetadata = buildCommitMetadata(allWriteStatus, 
checkpointCommitMetadata);
+    List<HoodieWriteStat> writeStats = allWriteStatus.stream()
+        .map(WriteStatus::getStat)
+        .collect(Collectors.toList());
+
+    // Load previous commit metadata from timeline
+    Option<HoodieCommitMetadata> previousCommitMetadata = 
loadPreviousCommitMetadata(metaClient);
+
+    ValidationContext context = new SparkValidationContext(
+        instant,
+        Option.of(currentMetadata),
+        Option.of(writeStats),
+        previousCommitMetadata,
+        metaClient);
+
+    // Instantiate and run each validator
+    List<String> classNames = Arrays.stream(validatorClassNames.split(","))
+        .map(String::trim)
+        .filter(s -> !s.isEmpty())
+        .collect(Collectors.toList());
+
+    for (String className : classNames) {
+      try {
+        BasePreCommitValidator validator = (BasePreCommitValidator)
+            ReflectionUtils.loadClass(className, new Class<?>[] 
{TypedProperties.class}, props);
+        LOG.info("Running pre-commit validator: {} for instant: {}", 
className, instant);
+        validator.validateWithMetadata(context);
+        LOG.info("Pre-commit validator {} passed for instant: {}", className, 
instant);
+      } catch (HoodieValidationException e) {
+        LOG.error("Pre-commit validator {} failed for instant: {}", className, 
instant, e);
+        throw e;
+      } catch (Exception e) {
+        LOG.error("Failed to instantiate or run validator: {}", className, e);
+        throw new HoodieValidationException(
+            "Failed to run pre-commit validator: " + className, e);
+      }
+    }
+  }
+
+  /**
+   * Build a pre-commit snapshot of {@link HoodieCommitMetadata} from write 
statuses and extra metadata.
+   *
+   * <p>This is intentionally a partial/preview object used only for 
validation — it contains
+   * write stats and checkpoint extra-metadata, but omits fields that are not 
available before the
+   * commit (e.g. schema, operation type). Validators should treat this as a 
read-only snapshot
+   * of what will be committed, not a fully-constructed commit record.</p>
+   */
+  private static HoodieCommitMetadata buildCommitMetadata(
+      List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+
+    // Add write stats
+    for (WriteStatus status : writeStatuses) {
+      HoodieWriteStat stat = status.getStat();
+      if (stat != null) {
+        metadata.addWriteStat(stat.getPartitionPath(), stat);
+      }
+    }
+
+    // Add extra metadata (includes checkpoint info like 
deltastreamer.checkpoint.key)
+    if (extraMetadata != null) {
+      extraMetadata.forEach(metadata::addMetadata);
+    }
+
+    return metadata;
+  }
+
+  /**

Review Comment:
🤖 This catches all exceptions and returns `Option.empty()`, which means a 
transient storage failure would silently skip validation — defeating the very 
safety net meant to catch data loss. Have you considered letting non-transient 
exceptions (e.g. `IOException`) propagate so the validator doesn't silently 
pass when the timeline is unreadable?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for running pre-commit validators in the HoodieStreamer commit flow.
+ *
+ * <p>Instantiates and executes validators configured via
+ * {@code hoodie.precommit.validators}. Each validator must extend
+ * {@link BasePreCommitValidator} and have a constructor that accepts
+ * {@link TypedProperties}.</p>
+ *
+ * <p>Called from {@code StreamSync.writeToSinkAndDoMetaSync()} before
+ * the commit is finalized.</p>
+ *
+ * <p><b>Note on validator compatibility:</b> This utility uses a different 
instantiation
+ * mechanism than {@code SparkValidatorUtils} (used by the Spark table write 
path).
+ * {@code SparkValidatorUtils} expects validators implementing {@code 
SparkPreCommitValidator}
+ * with a {@code (HoodieSparkTable, HoodieEngineContext, HoodieWriteConfig)} 
constructor.
+ * Validators registered here (e.g. {@link SparkKafkaOffsetValidator}) extend
+ * {@link BasePreCommitValidator} with a {@code (TypedProperties)} constructor 
and
+ * are NOT compatible with {@code SparkValidatorUtils}. Do not mix them under 
the same
+ * {@code hoodie.precommit.validators} config if both paths are active.</p>
+ */
+public class SparkStreamerValidatorUtils {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkStreamerValidatorUtils.class);
+
+  /**
+   * Run all configured pre-commit validators.
+   *
+   * @param props Configuration properties containing validator class names
+   * @param instant Commit instant time
+   * @param writeStatusRDD Write statuses from Spark write operations
+   * @param checkpointCommitMetadata Extra metadata being committed (contains 
checkpoint info)
+   * @param metaClient Table meta client for timeline access and previous 
commit lookup
+   * @throws HoodieValidationException if any validator fails with FAIL policy
+   */
+  public static void runValidators(TypedProperties props,
+                                   String instant,
+                                   JavaRDD<WriteStatus> writeStatusRDD,
+                                   Map<String, String> 
checkpointCommitMetadata,
+                                   HoodieTableMetaClient metaClient) {
+    String validatorClassNames = props.getString(
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue());
+
+    if (StringUtils.isNullOrEmpty(validatorClassNames)) {
+      return;
+    }
+
+    // Cache the RDD to avoid recomputation when collecting write stats 
(prevents a second DAG evaluation)
+    writeStatusRDD.cache();
+    List<WriteStatus> allWriteStatus = writeStatusRDD.collect();

Review Comment:
🤖 The RDD is cached here but never unpersisted after validation finishes. In 
a long-running HoodieStreamer job, this leaks executor memory on each sync 
cycle. Could you add a `try/finally` that calls `writeStatusRDD.unpersist()` 
once validation is done (or at least document that the caller is responsible)?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -872,6 +873,10 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> 
writeToSinkAndDoMetaSync(Hood
           totalSuccessfulRecords);
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, 
HoodieTableType.valueOf(cfg.tableType));
 
+      // Run pre-commit streaming offset validators (if configured) before 
commit
+      SparkStreamerValidatorUtils.runValidators(props, instantTime, 
writeStatusRDD,
+          checkpointCommitMetadata, metaClient);

Review Comment:
   🤖 I think @codope's concern still holds after the author's response. The 
offset validator compares Kafka offset diff against `numInserts + numUpdates` 
from write stats. If some writes fail (e.g. due to transient storage errors), 
those records won't appear in write stats, so the deviation would spike — 
triggering a "data loss" rejection that's actually caused by write errors. This 
is exactly the scenario `commitOnErrors` was designed to tolerate. At minimum, 
the validator could log the error-record count from `WriteStatus` so users can 
distinguish the two failure modes.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java:
##########
@@ -84,6 +84,21 @@ public static void runValidators(HoodieWriteConfig config,
       Dataset<Row> beforeState = getRecordsFromCommittedFiles(sqlContext, 
partitionsModified, table, afterState.schema());
 
       Stream<SparkPreCommitValidator> validators = 
Arrays.stream(config.getPreCommitValidators().split(","))
+          .map(String::trim)
+          .filter(validatorClass -> {
+            try {

Review Comment:
   🤖 Unlike `SparkStreamerValidatorUtils` which has `.filter(s -> 
!s.isEmpty())`, this path will call `Class.forName("")` if the config has a 
trailing comma (e.g. `"ValidatorA,"`), turning a benign config typo into a 
`HoodieValidationException`. Could you add an empty-string filter before the 
`Class.forName` call, like the new utility does?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for running pre-commit validators in the HoodieStreamer commit flow.
+ *
+ * <p>Instantiates and executes validators configured via
+ * {@code hoodie.precommit.validators}. Each validator must extend
+ * {@link BasePreCommitValidator} and have a constructor that accepts
+ * {@link TypedProperties}.</p>
+ *
+ * <p>Called from {@code StreamSync.writeToSinkAndDoMetaSync()} before
+ * the commit is finalized.</p>
+ */
+public class SparkStreamerValidatorUtils {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkStreamerValidatorUtils.class);
+
+  /**
+   * Run all configured pre-commit validators.
+   *
+   * @param props Configuration properties containing validator class names
+   * @param instant Commit instant time
+   * @param writeStatusRDD Write statuses from Spark write operations
+   * @param checkpointCommitMetadata Extra metadata being committed (contains 
checkpoint info)
+   * @param metaClient Table meta client for timeline access and previous 
commit lookup
+   * @throws HoodieValidationException if any validator fails with FAIL policy
+   */
+  public static void runValidators(TypedProperties props,
+                                   String instant,
+                                   JavaRDD<WriteStatus> writeStatusRDD,
+                                   Map<String, String> 
checkpointCommitMetadata,
+                                   HoodieTableMetaClient metaClient) {
+    String validatorClassNames = props.getString(
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue());

Review Comment:
   🤖 Good catch. The `isAssignableFrom` guard prevents the crash, but both 
frameworks still silently filter out validators meant for the other path, which 
could be confusing for users who don't understand the two-path system. A user 
adding `SparkKafkaOffsetValidator` to `hoodie.precommit.validators` in a 
non-HoodieStreamer pipeline would just get a WARN log and the validator would 
quietly do nothing. A separate config key (as you suggested) would make the 
intent self-documenting and avoid this silent-skip trap.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to