This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.14.2-prep
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/release-0.14.2-prep by this 
push:
     new 44fa4c167820 MINOR: Fix clustering row writer to avoid using timestamp 
based reads (#18476)
44fa4c167820 is described below

commit 44fa4c1678201116305825822b2c3ea4221c9641
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri Apr 10 09:31:48 2026 +0530

    MINOR: Fix clustering row writer to avoid using timestamp based reads 
(#18476)
    
    PR cherry-picks #18475
    
    This PR adds comprehensive test coverage for clustering operations when 
there's a pending ingestion in a different partition, and fixes an issue with 
row writer clustering that was incorrectly using timestamp-based reads.
    
    The read code path for clustering row writer already filters based on the 
explicit file paths we are setting in params. So, removing the TIMESTAMP_AS_OF 
in the query.
    
    
    ---------
    
    Co-authored-by: Lokesh Jain <[email protected]>
---
 .../MultipleSparkJobExecutionStrategy.java         |   2 -
 .../functional/TestSparkClusteringCornerCases.java | 286 +++++++++++++++++++++
 2 files changed, 286 insertions(+), 2 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 50d8c528594f..28f729304317 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -90,7 +90,6 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static 
org.apache.hudi.client.utils.SparkPartitionUtils.getPartitionFieldVals;
-import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
 import static 
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
 
 /**
@@ -427,7 +426,6 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
 
     HashMap<String, String> params = new HashMap<>();
     params.put("hoodie.datasource.query.type", "snapshot");
-    params.put(TIMESTAMP_AS_OF.key(), instantTime);
 
     Path[] paths;
     if (hasLogFiles) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkClusteringCornerCases.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkClusteringCornerCases.java
new file mode 100644
index 000000000000..b7ccb82bcbd4
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkClusteringCornerCases.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("functional")
+public class TestSparkClusteringCornerCases extends HoodieClientTestBase {
+
+  private static Stream<Arguments> 
testClusteringWithPendingIngestionInDifferentPartition() {
+    // tableType, enableRowWriter
+    return Stream.of(
+        Arguments.of(HoodieTableType.COPY_ON_WRITE, true),
+        Arguments.of(HoodieTableType.COPY_ON_WRITE, false),
+        Arguments.of(HoodieTableType.MERGE_ON_READ, true),
+        Arguments.of(HoodieTableType.MERGE_ON_READ, false)
+    );
+  }
+
+  /**
+   * Test clustering succeeds when there's a pending ingestion in a different 
partition.
+   * This test uses a 2MB max parquet file size and generates enough data to 
create multiple file groups.
+   * Clustering is configured to cluster only 1 file group (max groups = 1, 
max bytes per group = 2MB).
+   *
+   * Scenario:
+   * 1. Ingest data into partition1 with 5000 records (complete) - creates 
multiple file groups
+   * 2. Schedule clustering on partition1 (but don't execute yet) - will 
cluster only 1 file group
+   * 3. Start ingestion into partition2 (but don't complete it - leave it 
inflight)
+   * 4. Execute the clustering that was scheduled earlier (clustering instant 
time < pending ingestion instant)
+   * 5. Clustering should succeed because there's no partition overlap
+   * 6. Complete the pending ingestion in partition2
+   *
+   * The test is parameterized with table type (COW/MOR) and row writer 
enabled/disabled.
+   */
+  @ParameterizedTest
+  @MethodSource
+  void testClusteringWithPendingIngestionInDifferentPartition(HoodieTableType 
tableType, boolean enableRowWriter) throws IOException {
+    // Use specific partition data generators
+    HoodieTestDataGenerator dataGenPartition1 = new HoodieTestDataGenerator(
+        new String[] {HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH});
+    HoodieTestDataGenerator dataGenPartition2 = new HoodieTestDataGenerator(
+        new String[] {HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH});
+
+    Properties props = getPropertiesForKeyGen(true);
+
+    // Set max parquet file size to 2MB to create multiple file groups
+    props.setProperty("hoodie.parquet.max.file.size", String.valueOf(2 * 1024 
* 1024));
+
+    // Configure clustering to cluster only 1 file group
+    // Set max groups to 1 and target file size to ensure only 1 file group is 
clustered
+    props.setProperty("hoodie.clustering.plan.strategy.max.num.groups", "1");
+    props.setProperty("hoodie.clustering.plan.strategy.target.file.max.bytes", 
String.valueOf(2 * 1024 * 1024));
+    props.setProperty("hoodie.clustering.plan.strategy.max.bytes.per.group", 
String.valueOf(1 * 1024 * 1024));
+
+    HoodieWriteConfig.Builder configBuilder = getConfigBuilder()
+        .withProperties(props)
+        .withAutoCommit(false);
+
+    // Configure row writer based on parameter
+    if (enableRowWriter) {
+      props.setProperty("hoodie.datasource.write.row.writer.enable", "true");
+    } else {
+      props.setProperty("hoodie.datasource.write.row.writer.enable", "false");
+    }
+    configBuilder.withProperties(props);
+
+    HoodieWriteConfig hoodieWriteConfig = configBuilder.build();
+    initMetaClient(tableType, props);
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      // Step 1: Ingest data into partition1 and complete it
+      // Generate more records to create multiple file groups with 2MB max 
file size
+      String firstInstant = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> recordsPartition1 = 
dataGenPartition1.generateInserts(firstInstant, 5000);
+      writeData(client, firstInstant, recordsPartition1, true);
+
+      // Verify we have 1 completed commit
+      metaClient.reloadActiveTimeline();
+      assertEquals(1, 
metaClient.getActiveTimeline().filterCompletedInstants().countInstants());
+
+      // Verify data was written to exactly one partition (partition1) with 
multiple file groups
+      HoodieInstant hoodieInstant = 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+      HoodieCommitMetadata commitMetadata = 
HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
 HoodieCommitMetadata.class);
+      assertEquals(1, commitMetadata.getPartitionToWriteStats().size(),
+          "Data should have been written to exactly 1 partition");
+      Assertions.assertTrue(commitMetadata.getPartitionToWriteStats()
+              
.containsKey(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH),
+          "Data should have been written to partition1");
+      long numFileGroupsBeforeClustering = 
commitMetadata.getPartitionToWriteStats()
+          .get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+          .stream()
+          .map(stat -> stat.getFileId())
+          .distinct()
+          .count();
+      Assertions.assertTrue(numFileGroupsBeforeClustering > 1,
+          "Should have created multiple file groups in partition1 with 2MB max 
file size, but got: " + numFileGroupsBeforeClustering);
+
+      // Step 2: Schedule clustering on partition1 (but don't execute yet)
+      String clusteringInstant = HoodieActiveTimeline.createNewInstantTime();
+      client.scheduleClusteringAtInstant(clusteringInstant, Option.empty());
+
+      // Verify clustering is scheduled (pending)
+      metaClient.reloadActiveTimeline();
+      List<HoodieInstant> pendingClusteringInstants = 
metaClient.getActiveTimeline()
+          .filterPendingReplaceTimeline()
+          .getInstants();
+      assertEquals(1, pendingClusteringInstants.size(), "Should have exactly 
one pending clustering instant");
+      assertEquals(clusteringInstant, 
pendingClusteringInstants.get(0).getTimestamp());
+
+      // Step 3: Start ingestion into partition2 but do not complete it
+      // The ingestion instant will be greater than the clustering instant
+      String secondInstant = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> recordsPartition2 = 
dataGenPartition2.generateInserts(secondInstant, 100);
+      JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(recordsPartition2, 2);
+
+      client.startCommitWithTime(secondInstant);
+      List<WriteStatus> writeStatuses = client.insert(recordsRDD, 
secondInstant).collect();
+      assertNoWriteErrors(writeStatuses);
+      // Note: NOT calling client.commit() to leave the commit in inflight 
state
+
+      // Verify we have one pending commit (ingestion)
+      metaClient.reloadActiveTimeline();
+      List<HoodieInstant> inflightCommits = metaClient.getActiveTimeline()
+          .filterPendingExcludingCompaction()
+          .getInstants();
+      // Should have 2 pending instants: 1 clustering (replace) + 1 commit 
(ingestion)
+      assertEquals(2, inflightCommits.size(), "Should have 2 pending instants 
(1 clustering + 1 ingestion)");
+
+      // Verify the clustering instant is less than the pending ingestion 
instant
+      Assertions.assertTrue(Long.parseLong(clusteringInstant) < 
Long.parseLong(secondInstant),
+          "Clustering instant should be less than pending ingestion instant");
+
+      // Step 4: Execute the clustering that was scheduled earlier
+      // This should succeed even though there's a pending ingestion in a 
different partition
+      client.cluster(clusteringInstant, true);
+
+      // Verify clustering succeeded
+      metaClient.reloadActiveTimeline();
+      List<HoodieInstant> completedClustering = metaClient.getActiveTimeline()
+          .getCompletedReplaceTimeline()
+          .getInstants();
+      assertEquals(1, completedClustering.size(), "Should have exactly one 
completed clustering commit");
+      assertEquals(clusteringInstant, 
completedClustering.get(0).getTimestamp());
+
+      // Verify the pending ingestion commit is still there
+      metaClient.reloadActiveTimeline();
+      List<HoodieInstant> stillPendingCommits = metaClient.getActiveTimeline()
+          .filterPendingExcludingCompaction()
+          .getInstants();
+      assertEquals(1, stillPendingCommits.size(), "Pending ingestion commit 
should still exist after clustering");
+      assertEquals(secondInstant, stillPendingCommits.get(0).getTimestamp());
+
+      // Step 5: Complete the pending ingestion in partition2
+      client.commit(secondInstant, jsc.parallelize(writeStatuses, 1));
+
+      // Verify both commits are now complete
+      metaClient.reloadActiveTimeline();
+      assertEquals(0, 
metaClient.getActiveTimeline().filterPendingExcludingCompaction().countInstants(),
+          "No pending commits should remain");
+
+      // Verify we have the expected timeline: 2 commits + 1 replace 
(clustering)
+      assertEquals(3, 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants(),
+          "Should have 2 completed commit instants");
+      assertEquals(1, 
metaClient.getActiveTimeline().getCompletedReplaceTimeline().countInstants(),
+          "Should have 1 completed clustering instant");
+
+      // Verify clustering metadata shows data was written only for partition1
+      HoodieReplaceCommitMetadata replaceCommitMetadata = 
HoodieReplaceCommitMetadata.fromBytes(
+          
metaClient.getActiveTimeline().getInstantDetails(metaClient.getActiveTimeline().getLastClusterCommit().get()).get(),
+          HoodieReplaceCommitMetadata.class
+      );
+      assertEquals(1, replaceCommitMetadata.getPartitionToWriteStats().size(),
+          "Clustering should have written to only 1 partition");
+      Assertions.assertTrue(replaceCommitMetadata.getPartitionToWriteStats()
+              
.containsKey(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH),
+          "Clustering should have written to partition1");
+
+      // Verify clustering only touched 1 file group (as configured by 
max.num.groups=1)
+      long numFileGroupsClustered = 
replaceCommitMetadata.getPartitionToReplaceFileIds()
+          .values()
+          .stream()
+          .mapToLong(List::size)
+          .sum();
+      assertEquals(1, numFileGroupsClustered,
+          "Clustering should have clustered exactly 1 file group (as 
configured), but clustered: " + numFileGroupsClustered);
+
+      // Verify that there were more file groups available than what was 
clustered
+      Assertions.assertTrue(numFileGroupsBeforeClustering > 
numFileGroupsClustered,
+          "Should have had more file groups (" + numFileGroupsBeforeClustering 
+ ") than what was clustered (" + numFileGroupsClustered + ")");
+
+      // Verify exact record count match between the replaced file group and 
clustering output
+      // Get the replaced fileId from the clustering metadata
+      String replacedFileId = 
replaceCommitMetadata.getPartitionToReplaceFileIds()
+          .get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+          .get(0);
+
+      // Find the exact record count for this fileId from the first commit
+      long expectedRecordsInReplacedFileGroup = 
commitMetadata.getPartitionToWriteStats()
+          .get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+          .stream()
+          .filter(stat -> stat.getFileId().equals(replacedFileId))
+          .mapToLong(stat -> stat.getNumInserts() + stat.getNumUpdateWrites())
+          .sum();
+
+      // Get the actual records processed by clustering
+      long recordsProcessedByClustering = 
replaceCommitMetadata.getPartitionToWriteStats()
+          .get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+          .stream()
+          .mapToLong(stat -> stat.getNumInserts() + stat.getNumUpdateWrites())
+          .sum();
+
+      // Verify exact match
+      assertEquals(expectedRecordsInReplacedFileGroup, 
recordsProcessedByClustering,
+          "Clustering should have processed exactly the same number of records 
as in the replaced file group");
+      Assertions.assertTrue(recordsProcessedByClustering > 0,
+          "Clustering should have processed some records, but got: " + 
recordsProcessedByClustering);
+
+      // Verify that there were more total records than what was clustered
+      long totalRecordsInTable = 
commitMetadata.getPartitionToWriteStats().values().stream()
+          .flatMap(List::stream)
+          .mapToLong(stat -> stat.getNumInserts() + stat.getNumUpdateWrites())
+          .sum();
+      Assertions.assertTrue(recordsProcessedByClustering < totalRecordsInTable,
+          "Clustering should have processed fewer records (" + 
recordsProcessedByClustering
+              + ") than total records in the table (" + totalRecordsInTable + 
")");
+    }
+  }
+
+  @Override
+  protected HoodieTableType getTableType() {
+    return HoodieTableType.MERGE_ON_READ;
+  }
+
+  private List<HoodieRecord> writeData(SparkRDDWriteClient client, String 
instant, List<HoodieRecord> recordList, boolean doCommitExplicitly) {
+    JavaRDD records = jsc.parallelize(recordList, 2);
+    client.startCommitWithTime(instant);
+    List<WriteStatus> writeStatuses = client.upsert(records, 
instant).collect();
+    assertNoWriteErrors(writeStatuses);
+    if (doCommitExplicitly) {
+      client.commit(instant, jsc.parallelize(writeStatuses));
+    }
+    return recordList;
+  }
+}
\ No newline at end of file

Reply via email to