yihua commented on code in PR #18405: URL: https://github.com/apache/hudi/pull/18405#discussion_r3036299834
########## hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java: ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.streamer.validator; + +import org.apache.hudi.client.validator.ValidationContext; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; + +import java.util.List; + +/** + * Spark/HoodieStreamer implementation of {@link ValidationContext}. + * + * <p>Constructed from data available in {@code StreamSync.writeToSinkAndDoMetaSync()} + * before the commit is finalized. 
Provides validators with access to commit metadata, + * write statistics, and previous commit information for streaming offset validation.</p> + * + * <p>Unlike Flink's implementation, Spark can optionally provide active timeline access + * via {@link HoodieTableMetaClient} for richer validation patterns.</p> + */ +public class SparkValidationContext implements ValidationContext { + + private final String instantTime; + private final Option<HoodieCommitMetadata> commitMetadata; + private final Option<List<HoodieWriteStat>> writeStats; + private final Option<HoodieCommitMetadata> previousCommitMetadata; + private final HoodieTableMetaClient metaClient; + + /** + * Create a Spark validation context with full timeline access. + * + * @param instantTime Current commit instant time + * @param commitMetadata Current commit metadata (with extraMetadata including checkpoints) + * @param writeStats Write statistics from write operations + * @param previousCommitMetadata Metadata from the previous completed commit + * @param metaClient Table meta client for timeline access (may be null for testing) + */ + public SparkValidationContext(String instantTime, + Option<HoodieCommitMetadata> commitMetadata, + Option<List<HoodieWriteStat>> writeStats, + Option<HoodieCommitMetadata> previousCommitMetadata, + HoodieTableMetaClient metaClient) { + this.instantTime = instantTime; + this.commitMetadata = commitMetadata; + this.writeStats = writeStats; + this.previousCommitMetadata = previousCommitMetadata; + this.metaClient = metaClient; + } + + /** + * Create a Spark validation context without timeline access (for testing). 
+ * + * @param instantTime Current commit instant time + * @param commitMetadata Current commit metadata (with extraMetadata including checkpoints) + * @param writeStats Write statistics from write operations + * @param previousCommitMetadata Metadata from the previous completed commit + */ + public SparkValidationContext(String instantTime, + Option<HoodieCommitMetadata> commitMetadata, + Option<List<HoodieWriteStat>> writeStats, + Option<HoodieCommitMetadata> previousCommitMetadata) { + this(instantTime, commitMetadata, writeStats, previousCommitMetadata, null); + } Review Comment: _β οΈ Potential issue_ | _π Major_ <details> <summary>π§© Analysis chain</summary> π Script executed: ```shell git ls-files | grep -E "(SparkValidationContext|ValidationContext)" | head -20 ``` Repository: yihua/hudi Length of output: 559 --- π Script executed: ```shell find . -name "SparkValidationContext.java" -o -name "ValidationContext.java" ``` Repository: yihua/hudi Length of output: 241 --- π Script executed: ```shell cat -n hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java ``` Repository: yihua/hudi Length of output: 6346 --- π Script executed: ```shell # Search for ValidationContext interface definition find . 
-name "ValidationContext.java" -type f ``` Repository: yihua/hudi Length of output: 137 --- π Script executed: ```shell # Find all references to getActiveTimeline and getPreviousCommitInstant rg "getActiveTimeline|getPreviousCommitInstant" --type java -B 2 -A 2 ``` Repository: yihua/hudi Length of output: 50367 --- π Script executed: ```shell cat -n hudi-common/src/main/java/org/apache/hudi/client/validator/ValidationContext.java ``` Repository: yihua/hudi Length of output: 8437 --- π Script executed: ```shell # Search for usages of ValidationContext in streaming validators rg "ValidationContext" hudi-utilities/src -A 3 -B 1 | head -100 ``` Repository: yihua/hudi Length of output: 12271 --- π Script executed: ```shell # Check if there are other implementations of ValidationContext rg "implements ValidationContext" --type java ``` Repository: yihua/hudi Length of output: 583 --- π Script executed: ```shell # Search for uses of SparkValidationContext in the actual streaming validator find . 
-name "StreamingOffsetValidator.java" -o -name "*StreamingValidator*" | head -5 ``` Repository: yihua/hudi Length of output: 163 --- π Script executed: ```shell cat -n hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java ``` Repository: yihua/hudi Length of output: 11783 --- π Script executed: ```shell # Check FlinkValidationContext to see if it has similar issues cat -n hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/validator/FlinkValidationContext.java ``` Repository: yihua/hudi Length of output: 5248 --- π Script executed: ```shell # Check TestSparkValidationContext to see what's being tested cat -n hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkValidationContext.java ``` Repository: yihua/hudi Length of output: 6625 --- </details> **Clarify API contract boundaries for `getActiveTimeline()` and `getPreviousCommitInstant()`.** Both `getActiveTimeline()` and `getPreviousCommitInstant()` throw `UnsupportedOperationException` in this implementation, following the same pattern as `FlinkValidationContext`. This is by designβvalidators must use overridden convenience methods (`isFirstCommit()`) and the `previousCommitMetadata` field instead. However, the public 4-arg constructor (lines 79β84) claims to be "for testing" but has no `@VisibleForTesting` annotation, allowing production code to instantiate a partially initialized context. Either enforce the testing-only scope via annotations or document clearly that `getActiveTimeline()` will always throw when `metaClient` is absent. Current validators like `StreamingOffsetValidator` correctly avoid these methods, but the interface makes the constraint implicit rather than explicit. <details> <summary>π€ Prompt for AI Agents</summary> ``` Verify each finding against the current code and only fix it if needed. 
In `@hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java` around lines 79 - 84, The public 4-arg SparkValidationContext constructor currently allows production code to create a context without a MetaClient even though getActiveTimeline() and getPreviousCommitInstant() always throw; update the API contract by marking that constructor as testing-only (annotate the constructor SparkValidationContext(String, Option<HoodieCommitMetadata>, Option<List<HoodieWriteStat>>, Option<HoodieCommitMetadata>) with `@VisibleForTesting`) and add a short JavaDoc on the constructor explaining that getActiveTimeline() and getPreviousCommitInstant() will throw UnsupportedOperationException unless metaClient is provided and validators should use isFirstCommit() and previousCommitMetadata instead; alternatively, make the constructor non-public (private/protected) if you want to prevent external use in productionβapply the chosen change to SparkValidationContext only. ``` </details> β *CodeRabbit* ([original](https://github.com/yihua/hudi/pull/15#discussion_r3036299758)) (source:comment#3036299758) ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.streamer.validator; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.validator.BasePreCommitValidator; +import org.apache.hudi.client.validator.ValidationContext; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.config.HoodiePreCommitValidatorConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieValidationException; + +import org.apache.spark.api.java.JavaRDD; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Utility for running pre-commit validators in the HoodieStreamer commit flow. + * + * <p>Instantiates and executes validators configured via + * {@code hoodie.precommit.validators}. 
Each validator must extend + * {@link BasePreCommitValidator} and have a constructor that accepts + * {@link TypedProperties}.</p> + * + * <p>Called from {@code StreamSync.writeToSinkAndDoMetaSync()} before + * the commit is finalized.</p> + * + * <p><b>Note on validator compatibility:</b> This utility uses a different instantiation + * mechanism than {@code SparkValidatorUtils} (used by the Spark table write path). + * {@code SparkValidatorUtils} expects validators implementing {@code SparkPreCommitValidator} + * with a {@code (HoodieSparkTable, HoodieEngineContext, HoodieWriteConfig)} constructor. + * Validators registered here (e.g. {@link SparkKafkaOffsetValidator}) extend + * {@link BasePreCommitValidator} with a {@code (TypedProperties)} constructor and + * are NOT compatible with {@code SparkValidatorUtils}. Do not mix them under the same + * {@code hoodie.precommit.validators} config if both paths are active.</p> + */ +public class SparkStreamerValidatorUtils { + + private static final Logger LOG = LoggerFactory.getLogger(SparkStreamerValidatorUtils.class); + + /** + * Run all configured pre-commit validators. 
+ * + * @param props Configuration properties containing validator class names + * @param instant Commit instant time + * @param writeStatusRDD Write statuses from Spark write operations + * @param checkpointCommitMetadata Extra metadata being committed (contains checkpoint info) + * @param metaClient Table meta client for timeline access and previous commit lookup + * @throws HoodieValidationException if any validator fails with FAIL policy + */ + public static void runValidators(TypedProperties props, + String instant, + JavaRDD<WriteStatus> writeStatusRDD, + Map<String, String> checkpointCommitMetadata, + HoodieTableMetaClient metaClient) { + String validatorClassNames = props.getString( + HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(), + HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue()); + + if (StringUtils.isNullOrEmpty(validatorClassNames)) { + return; + } + + // Cache the RDD to avoid recomputation when collecting write stats (prevents a second DAG evaluation). + // Always unpersist in finally to prevent executor memory leaks. + writeStatusRDD.cache(); + try { + List<WriteStatus> allWriteStatus = writeStatusRDD.collect(); + HoodieCommitMetadata currentMetadata = buildCommitMetadata(allWriteStatus, checkpointCommitMetadata); + List<HoodieWriteStat> writeStats = allWriteStatus.stream() + .map(WriteStatus::getStat) + .collect(Collectors.toList()); Review Comment: _β οΈ Potential issue_ | _π Major_ **Filter null `WriteStatus#getStat()` before building validation context.** `buildCommitMetadata(...)` already guards null stats, but `writeStats` currently does not. A null stat can propagate and fail later during validator aggregation. 
<details> <summary>π οΈ Suggested fix</summary> ```diff import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; @@ List<HoodieWriteStat> writeStats = allWriteStatus.stream() .map(WriteStatus::getStat) + .filter(Objects::nonNull) .collect(Collectors.toList()); ``` </details> <!-- suggestion_start --> <details> <summary>π Committable suggestion</summary> > βΌοΈ **IMPORTANT** > Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements. ```suggestion List<HoodieWriteStat> writeStats = allWriteStatus.stream() .map(WriteStatus::getStat) .filter(Objects::nonNull) .collect(Collectors.toList()); ``` </details> <!-- suggestion_end --> <details> <summary>π€ Prompt for AI Agents</summary> ``` Verify each finding against the current code and only fix it if needed. In `@hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java` around lines 101 - 103, The code collects writeStats from allWriteStatus without filtering nulls from WriteStatus.getStat(), which can later break validator aggregation; update the stream that builds writeStats (starting from the allWriteStatus variable) to exclude null stats before collecting (e.g., filter entries where WriteStatus::getStat != null) so writeStats contains only non-null HoodieWriteStat instances used by buildCommitMetadata and downstream validators like those in SparkStreamerValidatorUtils. 
``` </details> β *CodeRabbit* ([original](https://github.com/yihua/hudi/pull/15#discussion_r3036299756)) (source:comment#3036299756) ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java: ########## + + // Load previous commit metadata from timeline + Option<HoodieCommitMetadata> previousCommitMetadata = loadPreviousCommitMetadata(metaClient); + + ValidationContext context = new SparkValidationContext( + instant, + Option.of(currentMetadata), + Option.of(writeStats), + previousCommitMetadata, + metaClient); + + // Instantiate and run each validator + List<String> classNames = Arrays.stream(validatorClassNames.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); + + for (String className : classNames) { + try { + BasePreCommitValidator validator = (BasePreCommitValidator) + ReflectionUtils.loadClass(className, new Class<?>[] {TypedProperties.class}, props); + LOG.info("Running pre-commit validator: {} for instant: {}", className, instant); + validator.validateWithMetadata(context); + LOG.info("Pre-commit validator {} passed for instant: {}", className, instant); + } catch (HoodieValidationException e) { + LOG.error("Pre-commit validator {} failed for instant: {}", className, instant, e); + throw e; + } catch (Exception e) { + LOG.error("Failed to instantiate or run validator: {}", className, e); + throw new HoodieValidationException( + "Failed to run pre-commit validator: " + className, e); + } + } + } finally { + writeStatusRDD.unpersist(); + } + } + + /** + * Build a pre-commit snapshot of {@link HoodieCommitMetadata} from write statuses and extra metadata. + * + * <p>This is intentionally a partial/preview object used only for validation β it contains + * write stats and checkpoint extra-metadata, but omits fields that are not available before the + * commit (e.g. schema, operation type). 
Validators should treat this as a read-only snapshot + * of what will be committed, not a fully-constructed commit record.</p> + */ + private static HoodieCommitMetadata buildCommitMetadata( + List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) { + HoodieCommitMetadata metadata = new HoodieCommitMetadata(); + + // Add write stats + for (WriteStatus status : writeStatuses) { + HoodieWriteStat stat = status.getStat(); + if (stat != null) { + metadata.addWriteStat(stat.getPartitionPath(), stat); + } + } + + // Add extra metadata (includes checkpoint info like deltastreamer.checkpoint.key) + if (extraMetadata != null) { + extraMetadata.forEach(metadata::addMetadata); + } + + return metadata; + } + + /** + * Load the previous completed commit metadata from the timeline. + */ + private static Option<HoodieCommitMetadata> loadPreviousCommitMetadata(HoodieTableMetaClient metaClient) { + try { + HoodieTimeline completedTimeline = metaClient.reloadActiveTimeline() + .getWriteTimeline() + .filterCompletedInstants(); + Option<HoodieInstant> lastInstant = completedTimeline.lastInstant(); + if (lastInstant.isPresent()) { + return Option.of(completedTimeline.readCommitMetadata(lastInstant.get())); + } + } catch (IOException e) { + throw new HoodieIOException("Failed to load previous commit metadata", e); + } catch (Exception e) { + LOG.warn("Failed to load previous commit metadata, skipping previous commit comparison", e); + } Review Comment: _β οΈ Potential issue_ | _π Major_ **Avoid fail-open behavior when previous commit metadata loading throws unexpected exceptions.** This block currently degrades to warn-and-skip comparison on any non-`IOException` exception. Since `StreamSync` invokes this pre-commit as a hard guard before timeline finalization, unexpected metadata-load failures should fail validation instead of silently bypassing it. 
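To make the fail-open vs. fail-closed distinction concrete, here is a small standalone sketch. Plain JDK types are used as stand-ins (`Optional` for Hudi's `Option`, a wrapped `RuntimeException` for `HoodieValidationException`; these substitutions are assumptions, not the real classes):

```java
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.Callable;

public class MetadataLoadSketch {
    // Fail-open (current behavior): any unexpected failure degrades to
    // "no previous commit", so downstream validation is silently skipped.
    static Optional<String> loadFailOpen(Callable<String> loader) {
        try {
            return Optional.ofNullable(loader.call());
        } catch (Exception e) {
            return Optional.empty();
        }
    }

    // Fail-closed (suggested): the expected IOException path gets its own wrapper,
    // and any other unexpected failure propagates and aborts the commit.
    static Optional<String> loadFailClosed(Callable<String> loader) {
        try {
            return Optional.ofNullable(loader.call());
        } catch (IOException e) {
            throw new RuntimeException("Failed to load previous commit metadata", e);
        } catch (Exception e) {
            throw new RuntimeException("Unexpected failure loading previous commit metadata", e);
        }
    }

    public static void main(String[] args) {
        // A corrupted-metadata style failure vanishes under the fail-open variant.
        System.out.println(loadFailOpen(() -> { throw new IllegalStateException("corrupt"); }));
    }
}
```

With the fail-open variant, a corrupted commit file looks identical to a legitimate first commit; the fail-closed variant surfaces the difference at the call site.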
<details> <summary>π‘οΈ Suggested fix</summary> ```diff } catch (IOException e) { throw new HoodieIOException("Failed to load previous commit metadata", e); } catch (Exception e) { - LOG.warn("Failed to load previous commit metadata, skipping previous commit comparison", e); + throw new HoodieValidationException( + "Failed to load previous commit metadata for pre-commit validation", e); } return Option.empty(); } ``` </details> <!-- suggestion_start --> <details> <summary>π Committable suggestion</summary> > βΌοΈ **IMPORTANT** > Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements. ```suggestion } catch (Exception e) { throw new HoodieValidationException( "Failed to load previous commit metadata for pre-commit validation", e); } ``` </details> <!-- suggestion_end --> <details> <summary>π€ Prompt for AI Agents</summary> ``` Verify each finding against the current code and only fix it if needed. In `@hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java` around lines 184 - 186, The current catch in SparkStreamerValidatorUtils (around the previous-commit metadata load in the validator pre-commit) swallows all Exceptions and only logs a warning; change the error handling to only catch IOExceptions and proceed with the warn-and-skip behavior for those, but let any other unexpected Exception propagate (rethrow) so validation fails hard. Concretely, replace the broad catch(Exception e) used in the previous-commit metadata load with a catch(IOException ioe) { LOG.warn(...) } and add a separate catch(Exception e) that rethrows (or wraps and throws) to ensure StreamSync's pre-commit does not silently bypass on unexpected errors. 
``` </details> β *CodeRabbit* ([original](https://github.com/yihua/hudi/pull/15#discussion_r3036299757)) (source:comment#3036299757) ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java: ########## + /** + * Load the previous completed commit metadata from the timeline. + */ + private static Option<HoodieCommitMetadata> loadPreviousCommitMetadata(HoodieTableMetaClient metaClient) { + try { + HoodieTimeline completedTimeline = metaClient.reloadActiveTimeline() + .getWriteTimeline() + .filterCompletedInstants(); + Option<HoodieInstant> lastInstant = completedTimeline.lastInstant(); + if (lastInstant.isPresent()) { + return Option.of(completedTimeline.readCommitMetadata(lastInstant.get())); + } + } catch (IOException e) { + throw new HoodieIOException("Failed to load previous commit metadata", e); + } catch (Exception e) { + LOG.warn("Failed to load previous commit metadata, skipping previous commit comparison", e); + } + return Option.empty(); Review Comment: <a href="#"><img alt="P1" src="https://greptile-static-assets.s3.amazonaws.com/badges/p1.svg?v=7" align="top"></a> **Non-`IOException` exceptions silently swallowed, causing silent first-commit fallback** `IOException` is correctly re-thrown as `HoodieIOException`, but any other exception (e.g., a `RuntimeException` from corrupted metadata, a `ClassCastException` from deserialization, etc.) is only logged at WARN level and the method returns `Option.empty()`. 
This means a genuine error during metadata loading makes the validator treat the commit as the first one, so it skips all offset comparisons and silently lets the commit proceed with no validation, exactly the opposite of the intended safety net. The catch-all for `Exception` should either be removed (letting the exception propagate and surface the failure) or explicitly listed as a narrower set of safe-to-ignore exceptions: ```java } catch (IOException e) { throw new HoodieIOException("Failed to load previous commit metadata", e); } // Remove the catch (Exception e) block; let unexpected runtime failures propagate. // Return Option.empty() only in the no-previous-commit case (already handled above). ``` – *Greptile* ([original](https://github.com/yihua/hudi/pull/15#discussion_r3036295993)) (source:comment#3036295993) ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.utilities.streamer.validator; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.validator.BasePreCommitValidator; +import org.apache.hudi.client.validator.ValidationContext; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.config.HoodiePreCommitValidatorConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieValidationException; + +import org.apache.spark.api.java.JavaRDD; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Utility for running pre-commit validators in the HoodieStreamer commit flow. + * + * <p>Instantiates and executes validators configured via + * {@code hoodie.precommit.validators}. Each validator must extend + * {@link BasePreCommitValidator} and have a constructor that accepts + * {@link TypedProperties}.</p> + * + * <p>Called from {@code StreamSync.writeToSinkAndDoMetaSync()} before + * the commit is finalized.</p> + * + * <p><b>Note on validator compatibility:</b> This utility uses a different instantiation + * mechanism than {@code SparkValidatorUtils} (used by the Spark table write path). + * {@code SparkValidatorUtils} expects validators implementing {@code SparkPreCommitValidator} + * with a {@code (HoodieSparkTable, HoodieEngineContext, HoodieWriteConfig)} constructor. + * Validators registered here (e.g. 
{@link SparkKafkaOffsetValidator}) extend + * {@link BasePreCommitValidator} with a {@code (TypedProperties)} constructor and + * are NOT compatible with {@code SparkValidatorUtils}. Do not mix them under the same + * {@code hoodie.precommit.validators} config if both paths are active.</p> + */ +public class SparkStreamerValidatorUtils { + + private static final Logger LOG = LoggerFactory.getLogger(SparkStreamerValidatorUtils.class); + + /** + * Run all configured pre-commit validators. + * + * @param props Configuration properties containing validator class names + * @param instant Commit instant time + * @param writeStatusRDD Write statuses from Spark write operations + * @param checkpointCommitMetadata Extra metadata being committed (contains checkpoint info) + * @param metaClient Table meta client for timeline access and previous commit lookup + * @throws HoodieValidationException if any validator fails with FAIL policy + */ + public static void runValidators(TypedProperties props, + String instant, + JavaRDD<WriteStatus> writeStatusRDD, + Map<String, String> checkpointCommitMetadata, + HoodieTableMetaClient metaClient) { + String validatorClassNames = props.getString( + HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(), + HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue()); + + if (StringUtils.isNullOrEmpty(validatorClassNames)) { + return; + } + + // Cache the RDD to avoid recomputation when collecting write stats (prevents a second DAG evaluation). + // Always unpersist in finally to prevent executor memory leaks. 
+ writeStatusRDD.cache(); + try { + List<WriteStatus> allWriteStatus = writeStatusRDD.collect(); + HoodieCommitMetadata currentMetadata = buildCommitMetadata(allWriteStatus, checkpointCommitMetadata); + List<HoodieWriteStat> writeStats = allWriteStatus.stream() + .map(WriteStatus::getStat) + .collect(Collectors.toList()); + + // Load previous commit metadata from timeline + Option<HoodieCommitMetadata> previousCommitMetadata = loadPreviousCommitMetadata(metaClient); + + ValidationContext context = new SparkValidationContext( + instant, + Option.of(currentMetadata), + Option.of(writeStats), + previousCommitMetadata, + metaClient); + + // Instantiate and run each validator + List<String> classNames = Arrays.stream(validatorClassNames.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); + + for (String className : classNames) { + try { + BasePreCommitValidator validator = (BasePreCommitValidator) + ReflectionUtils.loadClass(className, new Class<?>[] {TypedProperties.class}, props); + LOG.info("Running pre-commit validator: {} for instant: {}", className, instant); + validator.validateWithMetadata(context); + LOG.info("Pre-commit validator {} passed for instant: {}", className, instant); + } catch (HoodieValidationException e) { + LOG.error("Pre-commit validator {} failed for instant: {}", className, instant, e); + throw e; + } catch (Exception e) { + LOG.error("Failed to instantiate or run validator: {}", className, e); + throw new HoodieValidationException( + "Failed to run pre-commit validator: " + className, e); + } + } + } finally { + writeStatusRDD.unpersist(); + } Review Comment: <a href="#"><img alt="P1" src="https://greptile-static-assets.s3.amazonaws.com/badges/p1.svg?v=7" align="top"></a> **RDD unpersisted before `writeClient.commit()` triggers its own evaluation** The comment on line 95 says the cache is added to "prevent a second DAG evaluation," but the `finally` block calls `unpersist()` before 
`writeClient.commit(instantTime, writeStatusRDD, ...)` executes. Hudi's commit path invokes the `WriteStatusValidator` against the write statuses collected from the same `writeStatusRDD`; once the cache is evicted, Spark must re-execute the entire write DAG to produce those statuses again, re-triggering the Spark write side effects. The cache-then-immediately-unpersist pattern protects nothing when the second evaluation occurs after `runValidators` returns. Either the caller should manage the RDD lifecycle, or `runValidators` should accept a pre-collected `List<WriteStatus>` instead of a `JavaRDD<WriteStatus>`: ```java // Option A: accept an already-materialized list so the caller keeps // the RDD lifecycle entirely outside this utility. public static void runValidators(TypedProperties props, String instant, List<WriteStatus> allWriteStatus, Map<String, String> checkpointCommitMetadata, HoodieTableMetaClient metaClient) { // no cache/unpersist needed; caller already holds the list HoodieCommitMetadata currentMetadata = buildCommitMetadata(allWriteStatus, checkpointCommitMetadata); ... } ``` And in `StreamSync`, the caller would first collect (once) and pass the list: ```java List<WriteStatus> allWriteStatuses = writeStatusRDD.collect(); SparkStreamerValidatorUtils.runValidators(props, instantTime, allWriteStatuses, ...); writeClient.commit(instantTime, writeStatusRDD, ...); // RDD still live in caller scope ``` – *Greptile* ([original](https://github.com/yihua/hudi/pull/15#discussion_r3036295985)) (source:comment#3036295985) ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java: ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.streamer.validator; + +import org.apache.hudi.client.validator.ValidationContext; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; + +import java.util.List; + +/** + * Spark/HoodieStreamer implementation of {@link ValidationContext}. + * + * <p>Constructed from data available in {@code StreamSync.writeToSinkAndDoMetaSync()} + * before the commit is finalized. 
Provides validators with access to commit metadata, + * write statistics, and previous commit information for streaming offset validation.</p> + * + * <p>Unlike Flink's implementation, Spark can optionally provide active timeline access + * via {@link HoodieTableMetaClient} for richer validation patterns.</p> + */ +public class SparkValidationContext implements ValidationContext { + + private final String instantTime; + private final Option<HoodieCommitMetadata> commitMetadata; + private final Option<List<HoodieWriteStat>> writeStats; + private final Option<HoodieCommitMetadata> previousCommitMetadata; + private final HoodieTableMetaClient metaClient; + + /** + * Create a Spark validation context with full timeline access. + * + * @param instantTime Current commit instant time + * @param commitMetadata Current commit metadata (with extraMetadata including checkpoints) + * @param writeStats Write statistics from write operations + * @param previousCommitMetadata Metadata from the previous completed commit + * @param metaClient Table meta client for timeline access (may be null for testing) + */ + public SparkValidationContext(String instantTime, + Option<HoodieCommitMetadata> commitMetadata, + Option<List<HoodieWriteStat>> writeStats, + Option<HoodieCommitMetadata> previousCommitMetadata, + HoodieTableMetaClient metaClient) { + this.instantTime = instantTime; + this.commitMetadata = commitMetadata; + this.writeStats = writeStats; + this.previousCommitMetadata = previousCommitMetadata; + this.metaClient = metaClient; + } + + /** + * Create a Spark validation context without timeline access (for testing). 
+ * + * @param instantTime Current commit instant time + * @param commitMetadata Current commit metadata (with extraMetadata including checkpoints) + * @param writeStats Write statistics from write operations + * @param previousCommitMetadata Metadata from the previous completed commit + */ + public SparkValidationContext(String instantTime, + Option<HoodieCommitMetadata> commitMetadata, + Option<List<HoodieWriteStat>> writeStats, + Option<HoodieCommitMetadata> previousCommitMetadata) { + this(instantTime, commitMetadata, writeStats, previousCommitMetadata, null); + } + + @Override + public String getInstantTime() { + return instantTime; + } + + @Override + public Option<HoodieCommitMetadata> getCommitMetadata() { + return commitMetadata; + } + + @Override + public Option<List<HoodieWriteStat>> getWriteStats() { + return writeStats; + } + + /** + * Get the active timeline. Available when metaClient is provided. + * + * @throws UnsupportedOperationException if metaClient was not provided + */ + @Override + public HoodieActiveTimeline getActiveTimeline() { + if (metaClient == null) { + throw new UnsupportedOperationException( + "Active timeline is not available without HoodieTableMetaClient."); + } + return metaClient.getActiveTimeline(); + } + + /** + * Not directly supported. Use {@link #isFirstCommit()} or + * {@link #getPreviousCommitMetadata()} instead. + * + * @throws UnsupportedOperationException always + */ + @Override + public Option<HoodieInstant> getPreviousCommitInstant() { + throw new UnsupportedOperationException( + "getPreviousCommitInstant() is not available in HoodieStreamer pre-commit validation context. 
" + + "Use isFirstCommit() or getPreviousCommitMetadata() instead."); + } Review Comment: <a href="#"><img alt="P2" src="https://greptile-static-assets.s3.amazonaws.com/badges/p2.svg?v=7" align="top"></a> **`getPreviousCommitInstant()` always throws, but the `ValidationContext` default `isFirstCommit()` delegates to it** The `ValidationContext` interface defines: ```java default boolean isFirstCommit() { return !getPreviousCommitInstant().isPresent(); } ``` `SparkValidationContext` correctly overrides `isFirstCommit()` so this specific call is safe. However, `getPreviousCommitInstant()` is part of the public contract of `ValidationContext`. Any existing or future validator that calls `context.getPreviousCommitInstant()` directly will receive an unexpected `UnsupportedOperationException` at runtime, not a compilation error. A safer design is to return `Option.empty()` (signalling "no previous instant available") instead of throwing, which is consistent with what the override of `isFirstCommit()` communicates semantically: ```java @Override public Option<HoodieInstant> getPreviousCommitInstant() { // Previous instant is not tracked in the HoodieStreamer validation context; // use isFirstCommit() or getPreviousCommitMetadata() instead. 
return Option.empty(); } ``` – *Greptile* ([original](https://github.com/yihua/hudi/pull/15#discussion_r3036296007)) (source:comment#3036296007) ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java: ########## @@ -84,6 +84,22 @@ public static void runValidators(HoodieWriteConfig config, Dataset<Row> beforeState = getRecordsFromCommittedFiles(sqlContext, partitionsModified, table, afterState.schema()); Stream<SparkPreCommitValidator> validators = Arrays.stream(config.getPreCommitValidators().split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .filter(validatorClass -> { + try { + Class<?> clazz = Class.forName(validatorClass); + if (!SparkPreCommitValidator.class.isAssignableFrom(clazz)) { + LOG.warn("Skipping validator {}; it does not implement SparkPreCommitValidator. " + + "If this is a streaming offset validator (e.g. SparkKafkaOffsetValidator), " + + "it will be invoked by SparkStreamerValidatorUtils instead.", validatorClass); + return false; + } + return true; + } catch (ClassNotFoundException e) { + throw new HoodieValidationException("Cannot find validator class: " + validatorClass, e); + } + }) Review Comment: <a href="#"><img alt="P2" src="https://greptile-static-assets.s3.amazonaws.com/badges/p2.svg?v=7" align="top"></a> **Double class loading per validator entry** The filter lambda calls `Class.forName(validatorClass)` to do the assignability check, and then the immediately following `.map(...)` calls `ReflectionUtils.loadClass(validatorClass, ...)` which loads (and instantiates) the class a second time. Every registered validator is therefore looked up in the class loader twice. Consider collecting into an intermediate structure that carries the loaded `Class<?>` so the second load is avoided, or do the assignability check inside the existing `loadClass` + cast step where the `ClassCastException` would already surface the incompatible type. 
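A minimal, self-contained sketch of that single-load pattern with plain JDK reflection. `SingleLoadDemo`, `Validator`, and `MyValidator` are hypothetical stand-ins for the Hudi types, and a no-arg constructor stands in for the `(TypedProperties)` constructor; this is not the actual Hudi API:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SingleLoadDemo {

  /** Stand-in for SparkPreCommitValidator. */
  public interface Validator {
    String name();
  }

  /** Stand-in for a concrete registered validator class. */
  public static class MyValidator implements Validator {
    @Override
    public String name() {
      return "MyValidator";
    }
  }

  /**
   * Load each configured class exactly once, drop entries that are not
   * Validators, and instantiate from the already-loaded Class object.
   */
  public static List<Validator> loadValidators(String csv) {
    return Arrays.stream(csv.split(","))
        .map(String::trim)
        .filter(s -> !s.isEmpty())
        .map(name -> {
          try {
            return Class.forName(name); // single class-loader lookup per entry
          } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Cannot find validator class: " + name, e);
          }
        })
        .filter(Validator.class::isAssignableFrom) // assignability check reuses the loaded Class<?>
        .map(clazz -> {
          try {
            return (Validator) clazz.getDeclaredConstructor().newInstance();
          } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + clazz.getName(), e);
          }
        })
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Validator> validators =
        loadValidators("SingleLoadDemo$MyValidator, java.lang.String");
    // java.lang.String is filtered on the loaded Class<?>; only MyValidator survives.
    System.out.println(validators.size() + ":" + validators.get(0).name()); // 1:MyValidator
  }
}
```

Because the assignability filter and the instantiation step share the same `Class<?>` instance, each class name goes through the class loader once, which is the intermediate-structure variant suggested above.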
– *Greptile* ([original](https://github.com/yihua/hudi/pull/15#discussion_r3036296014)) (source:comment#3036296014)
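To make the P1 exception-handling suggestion above concrete, here is a minimal, self-contained sketch of the narrowed catch: wrap `IOException` in a domain exception and let everything else propagate. `NarrowCatchDemo`, `DemoIOException`, and `MetadataReader` are hypothetical stand-ins, not Hudi's actual classes:

```java
import java.io.IOException;
import java.util.Optional;

public class NarrowCatchDemo {

  /** Stand-in for HoodieIOException. */
  public static class DemoIOException extends RuntimeException {
    public DemoIOException(String msg, Throwable cause) {
      super(msg, cause);
    }
  }

  /** Stand-in for the timeline read that may fail. Returns null when no previous commit exists. */
  public interface MetadataReader {
    String readLastCommit() throws IOException;
  }

  /**
   * IOException is wrapped and rethrown; any RuntimeException (corrupted
   * metadata, a deserialization bug, ...) propagates instead of being
   * swallowed into an empty result.
   */
  public static Optional<String> loadPrevious(MetadataReader reader) {
    try {
      return Optional.ofNullable(reader.readLastCommit());
    } catch (IOException e) {
      throw new DemoIOException("Failed to load previous commit metadata", e);
    }
    // Deliberately no catch (Exception e): an unexpected runtime failure here
    // must surface, not silently downgrade the run to "first commit".
  }

  public static void main(String[] args) {
    // No previous commit: the only legitimate empty case.
    System.out.println(loadPrevious(() -> null).isPresent()); // false

    // A RuntimeException from the reader propagates to the caller.
    try {
      loadPrevious(() -> { throw new IllegalStateException("corrupt metadata"); });
    } catch (IllegalStateException e) {
      System.out.println("propagated: " + e.getMessage()); // propagated: corrupt metadata
    }
  }
}
```

With this shape, `Optional.empty()` can only mean "no previous commit", so validators cannot mistake a genuine metadata-loading failure for a first commit.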
