yihua commented on code in PR #18405:
URL: https://github.com/apache/hudi/pull/18405#discussion_r3036286661


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+
+import java.util.List;
+
+/**
+ * Spark/HoodieStreamer implementation of {@link ValidationContext}.
+ *
+ * <p>Constructed from data available in {@code StreamSync.writeToSinkAndDoMetaSync()}
+ * before the commit is finalized. Provides validators with access to commit metadata,
+ * write statistics, and previous commit information for streaming offset validation.</p>
+ *
+ * <p>Unlike Flink's implementation, Spark can optionally provide active timeline access
+ * via {@link HoodieTableMetaClient} for richer validation patterns.</p>
+ */
+public class SparkValidationContext implements ValidationContext {
+
+  private final String instantTime;
+  private final Option<HoodieCommitMetadata> commitMetadata;
+  private final Option<List<HoodieWriteStat>> writeStats;
+  private final Option<HoodieCommitMetadata> previousCommitMetadata;
+  private final HoodieTableMetaClient metaClient;
+
+  /**

Review Comment:
   🤖 nit: could you wrap `metaClient` in `Option<HoodieTableMetaClient>` instead of leaving it nullable? The rest of the class uses `Option` for optional values, and the Hudi codebase (e.g., `HiveSyncTool`, `SchemaEvolutionContext`) consistently prefers `Option` over null for optional `HoodieTableMetaClient` parameters — it would keep the API consistent and let the `getActiveTimeline()` guard be `metaClient.isPresent()` instead of a null check.
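
   A minimal sketch of the suggested shape (using `java.util.Optional` and hypothetical stand-in types so the snippet is self-contained; the actual code would use Hudi's `org.apache.hudi.common.util.Option`, `HoodieTableMetaClient`, and `HoodieActiveTimeline`, where `Option.isPresent()` works the same way):

```java
import java.util.Optional;

// Hypothetical stand-ins for HoodieTableMetaClient and HoodieActiveTimeline,
// used only to keep this sketch self-contained.
class MetaClient {
  ActiveTimeline getActiveTimeline() {
    return new ActiveTimeline();
  }
}

class ActiveTimeline {
}

// The optional meta client is carried as an Optional, so the timeline
// accessor guards with a presence check (via map) instead of a null check.
class ValidationContextSketch {
  private final Optional<MetaClient> metaClient;

  ValidationContextSketch(Optional<MetaClient> metaClient) {
    this.metaClient = metaClient;
  }

  Optional<ActiveTimeline> getActiveTimeline() {
    return metaClient.map(MetaClient::getActiveTimeline);
  }
}
```

   Callers then express "no meta client" as `Optional.empty()` rather than `null`, and the absence case is handled uniformly by the `Optional` chain.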



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for running pre-commit validators in the HoodieStreamer commit flow.
+ *
+ * <p>Instantiates and executes validators configured via
+ * {@code hoodie.precommit.validators}. Each validator must extend
+ * {@link BasePreCommitValidator} and have a constructor that accepts
+ * {@link TypedProperties}.</p>
+ *
+ * <p>Called from {@code StreamSync.writeToSinkAndDoMetaSync()} before
+ * the commit is finalized.</p>
+ *
+ * <p><b>Note on validator compatibility:</b> This utility uses a different instantiation
+ * mechanism than {@code SparkValidatorUtils} (used by the Spark table write path).
+ * {@code SparkValidatorUtils} expects validators implementing {@code SparkPreCommitValidator}
+ * with a {@code (HoodieSparkTable, HoodieEngineContext, HoodieWriteConfig)} constructor.
+ * Validators registered here (e.g. {@link SparkKafkaOffsetValidator}) extend
+ * {@link BasePreCommitValidator} with a {@code (TypedProperties)} constructor and
+ * are NOT compatible with {@code SparkValidatorUtils}. Do not mix them under the same
+ * {@code hoodie.precommit.validators} config if both paths are active.</p>
+ */
+public class SparkStreamerValidatorUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkStreamerValidatorUtils.class);
+
+  /**
+   * Run all configured pre-commit validators.
+   *
+   * @param props Configuration properties containing validator class names
+   * @param instant Commit instant time
+   * @param writeStatusRDD Write statuses from Spark write operations
+   * @param checkpointCommitMetadata Extra metadata being committed (contains checkpoint info)
+   * @param metaClient Table meta client for timeline access and previous commit lookup
+   * @throws HoodieValidationException if any validator fails with FAIL policy
+   */
+  public static void runValidators(TypedProperties props,

Review Comment:
   🤖 nit: could you rename `instant` to `instantTime`? The rest of the codebase (e.g. `SparkValidatorUtils`, `StreamSync`, `UtilHelpers`) consistently uses `instantTime` for commit instant strings, and `instant` is easy to confuse with a `HoodieInstant` object.



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkStreamerValidatorUtils.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link SparkStreamerValidatorUtils}.
+ *
+ * <p>Uses a lightweight Spark context for JavaRDD creation. Tests cover orchestration logic
+ * (class loading, config passing, error handling) as well as end-to-end offset validation
+ * using a two-commit timeline to verify the real comparison path is exercised.</p>
+ */
+public class TestSparkStreamerValidatorUtils {
+
+  private static JavaSparkContext jsc;
+
+  @TempDir
+  Path tempDir;
+
+  @BeforeAll
+  public static void setUp() {
+    SparkConf conf = new SparkConf()
+        .setAppName("TestSparkStreamerValidatorUtils")
+        .setMaster("local[2]")
+        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+        .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
+    jsc = new JavaSparkContext(conf);
+  }
+
+  @AfterAll
+  public static void tearDown() {
+    if (jsc != null) {
+      jsc.stop();
+      jsc = null;
+    }
+  }
+
+  private static TypedProperties propsWithValidator(String validatorClassName) {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(), validatorClassName);
+    props.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    props.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+    return props;
+  }
+
+  private static WriteStatus buildWriteStatus(String partitionPath, long numInserts, long numUpdates) {
+    HoodieWriteStat stat = new HoodieWriteStat();
+    stat.setPartitionPath(partitionPath);
+    stat.setNumInserts(numInserts);
+    stat.setNumUpdateWrites(numUpdates);
+
+    WriteStatus ws = new WriteStatus(false, 0.0);
+    ws.setStat(stat);
+    return ws;
+  }
+
+  private JavaRDD<WriteStatus> toRDD(List<WriteStatus> writeStatuses) {
+    return jsc.parallelize(writeStatuses);
+  }
+
+  private HoodieTableMetaClient createMetaClient() throws IOException {
+    return org.apache.hudi.common.testutils.HoodieTestUtils.init(
+        tempDir.toAbsolutePath().toString());

Review Comment:
   🤖 nit: `HoodieTestTable` (from the same package) is already imported but `HoodieTestUtils` is referenced with its fully-qualified name here — it might be worth adding `import org.apache.hudi.common.testutils.HoodieTestUtils;` for consistency.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for running pre-commit validators in the HoodieStreamer commit flow.
+ *
+ * <p>Instantiates and executes validators configured via
+ * {@code hoodie.precommit.validators}. Each validator must extend
+ * {@link BasePreCommitValidator} and have a constructor that accepts
+ * {@link TypedProperties}.</p>
+ *
+ * <p>Called from {@code StreamSync.writeToSinkAndDoMetaSync()} before
+ * the commit is finalized.</p>
+ *
+ * <p><b>Note on validator compatibility:</b> This utility uses a different instantiation
+ * mechanism than {@code SparkValidatorUtils} (used by the Spark table write path).
+ * {@code SparkValidatorUtils} expects validators implementing {@code SparkPreCommitValidator}
+ * with a {@code (HoodieSparkTable, HoodieEngineContext, HoodieWriteConfig)} constructor.
+ * Validators registered here (e.g. {@link SparkKafkaOffsetValidator}) extend
+ * {@link BasePreCommitValidator} with a {@code (TypedProperties)} constructor and
+ * are NOT compatible with {@code SparkValidatorUtils}. Do not mix them under the same
+ */
+public class SparkStreamerValidatorUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkStreamerValidatorUtils.class);
+
+  /**
+   * Run all configured pre-commit validators.
+   *
+   * @param props Configuration properties containing validator class names
+   * @param instant Commit instant time
+   * @param writeStatusRDD Write statuses from Spark write operations
+   * @param checkpointCommitMetadata Extra metadata being committed (contains checkpoint info)
+   * @param metaClient Table meta client for timeline access and previous commit lookup
+   * @throws HoodieValidationException if any validator fails with FAIL policy
+   */
+  public static void runValidators(TypedProperties props,
+                                   String instant,
+                                   JavaRDD<WriteStatus> writeStatusRDD,
+                                   Map<String, String> checkpointCommitMetadata,
+                                   HoodieTableMetaClient metaClient) {
+    String validatorClassNames = props.getString(
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue());
+
+    if (StringUtils.isNullOrEmpty(validatorClassNames)) {
+      return;
+    }
+
+    // Cache the RDD to avoid recomputation when collecting write stats (prevents a second DAG evaluation).
+    // Always unpersist in finally to prevent executor memory leaks.
+    writeStatusRDD.cache();
+    try {

Review Comment:
   🤖 nit: `allWriteStatus` reads like a singular noun rather than a collection — `writeStatuses` (matching the `List<WriteStatus>` type and the `writeStatuses` naming used elsewhere) might be slightly clearer.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkKafkaOffsetValidator.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.validator.StreamingOffsetValidator;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.util.CheckpointUtils.CheckpointFormat;
+
+/**
+ * Spark/HoodieStreamer-specific Kafka offset validator.
+ *
+ * <p>Validates that the number of records written matches the Kafka offset difference
+ * between the current and previous HoodieStreamer checkpoints. Uses the Spark Kafka
+ * checkpoint format stored with key {@code deltastreamer.checkpoint.key} in extraMetadata.</p>
+ *
+ * <p>Configuration:
+ * <ul>
+ *   <li>{@code hoodie.precommit.validators}: Include
+ *       {@code org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator}</li>
+ *   <li>{@code hoodie.precommit.validators.streaming.offset.tolerance.percentage}:
+ *       Acceptable deviation (default: 0.0 = strict)</li>
+ *   <li>{@code hoodie.precommit.validators.failure.policy}:
+ *       FAIL (default) or WARN_LOG</li>
+ * </ul></p>
+ *
+ * <p>This validator is primarily intended for append-only ingestion from Kafka via HoodieStreamer.
+ * For upsert workloads with deduplication, configure a higher tolerance or use WARN_LOG.</p>
+ *
+ * <p><b>Important:</b> This class extends {@link org.apache.hudi.client.validator.BasePreCommitValidator}
+ * and is invoked by {@link SparkStreamerValidatorUtils}, NOT by {@code SparkValidatorUtils}
+ * (which expects {@code SparkPreCommitValidator} with a different constructor signature).
+ * Listing this class in {@code hoodie.precommit.validators} while also using the standard
+ * Spark table write-path validators will cause an instantiation failure in {@code SparkValidatorUtils}.
+ * Use this validator exclusively with HoodieStreamer pipelines.</p>
+ */
+public class SparkKafkaOffsetValidator extends StreamingOffsetValidator {
+
+  /**
+   * Create a Spark Kafka offset validator.
+   *

Review Comment:
   🤖 nit: the three-line Javadoc on this constructor (`Create a Spark Kafka offset validator. / @param config Validator configuration`) restates what the class name and type already communicate — it might be worth dropping it and letting the class-level Javadoc speak for itself.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -872,6 +873,14 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Hood
           totalSuccessfulRecords);
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, 
HoodieTableType.valueOf(cfg.tableType));
 
+      // Run pre-commit streaming offset validators (if configured) before commit.

Review Comment:
   🤖 nit: the first line of this comment block (`// Run pre-commit streaming offset validators (if configured) before commit.`) restates what the code does — the four lines below it that explain *why* the ordering matters relative to `commitOnErrors` are the valuable part. Dropping the first line would tighten it up slightly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
