This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 12bf942c30 [upsert] Ensure consistent creation time to prevent data 
inconsistency across replicas (#16034)
12bf942c30 is described below

commit 12bf942c30d6a7bce5c10b326f0f27adf8cdeb07
Author: tarun11Mavani <35224468+tarun11mav...@users.noreply.github.com>
AuthorDate: Thu Jun 12 01:53:18 2025 +0530

    [upsert] Ensure consistent creation time to prevent data inconsistency 
across replicas (#16034)
---
 .../realtime/RealtimeSegmentDataManager.java       |  4 +--
 .../manager/realtime/RealtimeTableDataManager.java | 41 ++++++++++++++++++++--
 .../upsert/BasePartitionUpsertMetadataManager.java | 22 ++++++++++++
 ...oncurrentMapPartitionUpsertMetadataManager.java |  4 +--
 ...nUpsertMetadataManagerForConsistentDeletes.java |  4 +--
 ...ertMetadataManagerForConsistentDeletesTest.java |  5 ++-
 ...rrentMapPartitionUpsertMetadataManagerTest.java |  5 ++-
 .../spi/index/metadata/SegmentMetadataImpl.java    | 20 +++++++++++
 8 files changed, 94 insertions(+), 11 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 27406557fb..5ee7a41c45 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1257,7 +1257,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
           SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
       return false;
     }
-    _realtimeTableDataManager.replaceConsumingSegment(_segmentNameStr);
+    _realtimeTableDataManager.replaceConsumingSegment(_segmentNameStr, 
_segmentZKMetadata);
     removeSegmentFile();
     return true;
   }
@@ -1309,7 +1309,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     if (descriptor == null) {
       return false;
     }
-    _realtimeTableDataManager.replaceConsumingSegment(_segmentNameStr);
+    _realtimeTableDataManager.replaceConsumingSegment(_segmentNameStr, 
_segmentZKMetadata);
     return true;
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 004755f47c..277fd5b910 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -71,6 +71,8 @@ import 
org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.SegmentContext;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -691,7 +693,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
         segmentName, _tableNameWithType);
 
     if (isUpsertEnabled()) {
-      handleUpsert(immutableSegment);
+      handleUpsert(immutableSegment, zkMetadata);
       return;
     }
 
@@ -730,10 +732,14 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     }
   }
 
-  private void handleUpsert(ImmutableSegment immutableSegment) {
+  private void handleUpsert(ImmutableSegment immutableSegment, @Nullable 
SegmentZKMetadata zkMetadata) {
     String segmentName = immutableSegment.getSegmentName();
     _logger.info("Adding immutable segment: {} with upsert enabled", 
segmentName);
 
+    // Set the ZK creation time so that the same creation time is used to 
break comparison ties on every replica,
+    // which keeps the data consistent across replicas.
+    setZkCreationTimeIfAvailable(immutableSegment, zkMetadata);
+
     Integer partitionId = SegmentUtils.getSegmentPartitionId(segmentName, 
_tableNameWithType, _helixManager, null);
     Preconditions.checkNotNull(partitionId, "Failed to get partition id for 
segment: " + segmentName
         + " (upsert-enabled table: " + _tableNameWithType + ")");
@@ -808,6 +814,22 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     registerSegment(segmentName, segmentDataManager);
   }
 
+  /**
+   * Sets the ZK creation time in the segment metadata if available, to ensure 
consistent
+   * creation times across replicas for upsert operations.
+   */
+  private void setZkCreationTimeIfAvailable(ImmutableSegment segment, 
@Nullable SegmentZKMetadata zkMetadata) {
+    if (zkMetadata != null && zkMetadata.getCreationTime() > 0) {
+      SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
+      if (segmentMetadata instanceof SegmentMetadataImpl) {
+        SegmentMetadataImpl segmentMetadataImpl = (SegmentMetadataImpl) 
segmentMetadata;
+        segmentMetadataImpl.setZkCreationTime(zkMetadata.getCreationTime());
+        _logger.info("Set ZK creation time {} for segment: {} in upsert 
table", zkMetadata.getCreationTime(),
+            zkMetadata.getSegmentName());
+      }
+    }
+  }
+
   /**
    * Replaces the CONSUMING segment with a downloaded committed one.
    */
@@ -826,13 +848,26 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
   /**
    * Replaces the CONSUMING segment with the one sealed locally.
    */
+  @Deprecated
   public void replaceConsumingSegment(String segmentName)
       throws Exception {
+    replaceConsumingSegment(segmentName, null);
+  }
+
+  /**
+   * Replaces the CONSUMING segment with the one sealed locally.
+   * This overloaded method avoids extra ZK call when the caller already has 
SegmentZKMetadata.
+   */
+  public void replaceConsumingSegment(String segmentName, @Nullable 
SegmentZKMetadata zkMetadata)
+      throws Exception {
     _logger.info("Replacing CONSUMING segment: {} with the one sealed 
locally", segmentName);
     File indexDir = new File(_indexDir, segmentName);
     // Get a new index loading config with latest table config and schema to 
load the segment
     IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
-    addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, 
_segmentOperationsThrottler));
+    ImmutableSegment immutableSegment =
+        ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, 
_segmentOperationsThrottler);
+
+    addSegment(immutableSegment, zkMetadata);
     _logger.info("Replaced CONSUMING segment: {}", segmentName);
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 5dc2647935..3fbc30fce9 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -56,7 +56,9 @@ import org.apache.pinot.segment.local.utils.WatermarkUtils;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -1146,4 +1148,24 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     }
     return Collections.emptySet();
   }
+
+  /**
+   * Returns the creation time used to break upsert comparison ties.
+   * Prefers the ZK creation time (set by the controller when creating the new 
consuming segment),
+   * which is used to ensure consistent creation time across replicas, and falls
+   * back to the local index creation time when it is not set (Long.MIN_VALUE).
+   * @return ZK creation time in milliseconds, or the index creation time if not set
+   */
+  protected long getAuthoritativeCreationTime(IndexSegment segment) {
+    SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
+    if (segmentMetadata instanceof SegmentMetadataImpl) {
+      SegmentMetadataImpl segmentMetadataImpl = (SegmentMetadataImpl) 
segmentMetadata;
+      long zkCreationTime = segmentMetadataImpl.getZkCreationTime();
+      if (zkCreationTime != Long.MIN_VALUE) {
+        return zkCreationTime;
+      }
+    }
+    // Fall back to local creation time if ZK creation time is not set
+    return segmentMetadata.getIndexCreationTime();
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 5552a6c65c..ad5058c703 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -136,8 +136,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
               // current value, but the segment has a larger sequence number 
(the segment is newer than the current
               // segment).
               if (comparisonResult > 0 || (comparisonResult == 0 && 
shouldReplaceOnComparisonTie(segmentName,
-                  currentSegmentName, 
segment.getSegmentMetadata().getIndexCreationTime(),
-                  
currentSegment.getSegmentMetadata().getIndexCreationTime()))) {
+                  currentSegmentName, getAuthoritativeCreationTime(segment),
+                  getAuthoritativeCreationTime(currentSegment)))) {
                 replaceDocId(segment, validDocIds, queryableDocIds, 
currentSegment, currentDocId, newDocId, recordInfo);
                 return new RecordLocation(segment, newDocId, 
newComparisonValue);
               } else {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
index a179a05fa7..9af3ec6c23 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
@@ -166,8 +166,8 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
               // current value, but the segment has a larger sequence number 
(the segment is newer than the current
               // segment).
               if (comparisonResult > 0 || (comparisonResult == 0 && 
shouldReplaceOnComparisonTie(segmentName,
-                  currentSegmentName, 
segment.getSegmentMetadata().getIndexCreationTime(),
-                  
currentSegment.getSegmentMetadata().getIndexCreationTime()))) {
+                  currentSegmentName, getAuthoritativeCreationTime(segment),
+                  getAuthoritativeCreationTime(currentSegment)))) {
                 replaceDocId(segment, validDocIds, queryableDocIds, 
currentSegment, currentDocId, newDocId, recordInfo);
                 return new RecordLocation(segment, newDocId, 
newComparisonValue,
                     
RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
index 31ea92cf89..6ebfd90b05 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
@@ -105,7 +105,9 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
         invocation -> 
primaryKeys.get(invocation.getArgument(0)).getValues()[0]);
     when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
     SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
-    
when(segmentMetadata.getIndexCreationTime()).thenReturn(System.currentTimeMillis());
+    long creationTimeMs = System.currentTimeMillis();
+    when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTimeMs);
+    when(segmentMetadata.getZkCreationTime()).thenReturn(creationTimeMs);
     if (primaryKeys != null) {
       when(segmentMetadata.getTotalDocs()).thenReturn(primaryKeys.size());
     }
@@ -133,6 +135,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
     SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
     when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTimeMs);
+    when(segmentMetadata.getZkCreationTime()).thenReturn(creationTimeMs);
     when(segmentMetadata.getTotalDocs()).thenReturn(primaryKeys.size());
     when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
     return segment;
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index a059dcd27c..d86e1e4609 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -821,7 +821,9 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
         invocation -> 
primaryKeys.get(invocation.getArgument(0)).getValues()[0]);
     when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
     SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
-    
when(segmentMetadata.getIndexCreationTime()).thenReturn(System.currentTimeMillis());
+    long creationTimeMs = System.currentTimeMillis();
+    when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTimeMs);
+    when(segmentMetadata.getZkCreationTime()).thenReturn(creationTimeMs);
     when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
     return segment;
   }
@@ -846,6 +848,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
     SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
     when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTimeMs);
+    when(segmentMetadata.getZkCreationTime()).thenReturn(creationTimeMs);
     when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
     return segment;
   }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
index be5800769e..4313973fd9 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
@@ -78,6 +78,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
   private final Schema _schema;
   private long _crc = Long.MIN_VALUE;
   private long _creationTime = Long.MIN_VALUE;
+  private long _zkCreationTime = Long.MIN_VALUE;  // ZooKeeper creation time 
for upsert consistency
   private String _timeColumn;
   private TimeUnit _timeUnit;
   private Duration _timeGranularity;
@@ -149,6 +150,7 @@ public class SegmentMetadataImpl implements SegmentMetadata 
{
     _segmentName = segmentName;
     _schema = schema;
     _creationTime = creationTime;
+    _zkCreationTime = creationTime;
   }
 
   /**
@@ -380,6 +382,24 @@ public class SegmentMetadataImpl implements 
SegmentMetadata {
     return _creationTime;
   }
 
+  /**
+   * Returns the ZooKeeper creation time for upsert consistency.
+   * This refers to the time set by controller while creating the consuming 
segment. It is used to ensure consistent
+   * creation time across replicas for upsert operations.
+   * @return ZK creation time in milliseconds, or Long.MIN_VALUE if not set
+   */
+  public long getZkCreationTime() {
+    return _zkCreationTime;
+  }
+
+  /**
+   * Sets the ZooKeeper creation time for upsert consistency.
+   * @param zkCreationTime ZK creation time in milliseconds
+   */
+  public void setZkCreationTime(long zkCreationTime) {
+    _zkCreationTime = zkCreationTime;
+  }
+
   @Override
   public long getLastIndexedTimestamp() {
     return Long.MIN_VALUE;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to