This is an automated email from the ASF dual-hosted git repository. ankitsultana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 12bf942c30 [upsert] Ensure consistent creation time to prevent data inconsistency across replicas (#16034) 12bf942c30 is described below commit 12bf942c30d6a7bce5c10b326f0f27adf8cdeb07 Author: tarun11Mavani <35224468+tarun11mav...@users.noreply.github.com> AuthorDate: Thu Jun 12 01:53:18 2025 +0530 [upsert] Ensure consistent creation time to prevent data inconsistency across replicas (#16034) --- .../realtime/RealtimeSegmentDataManager.java | 4 +-- .../manager/realtime/RealtimeTableDataManager.java | 41 ++++++++++++++++++++-- .../upsert/BasePartitionUpsertMetadataManager.java | 22 ++++++++++++ ...oncurrentMapPartitionUpsertMetadataManager.java | 4 +-- ...nUpsertMetadataManagerForConsistentDeletes.java | 4 +-- ...ertMetadataManagerForConsistentDeletesTest.java | 5 ++- ...rrentMapPartitionUpsertMetadataManagerTest.java | 5 ++- .../spi/index/metadata/SegmentMetadataImpl.java | 20 +++++++++++ 8 files changed, 94 insertions(+), 11 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 27406557fb..5ee7a41c45 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -1257,7 +1257,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); return false; } - _realtimeTableDataManager.replaceConsumingSegment(_segmentNameStr); + _realtimeTableDataManager.replaceConsumingSegment(_segmentNameStr, _segmentZKMetadata); removeSegmentFile(); return true; } @@ -1309,7 +1309,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { if (descriptor == null) { return false; } - 
_realtimeTableDataManager.replaceConsumingSegment(_segmentNameStr); + _realtimeTableDataManager.replaceConsumingSegment(_segmentNameStr, _segmentZKMetadata); return true; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 004755f47c..277fd5b910 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -71,6 +71,8 @@ import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils; import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.SegmentContext; +import org.apache.pinot.segment.spi.SegmentMetadata; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.UpsertConfig; @@ -691,7 +693,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { segmentName, _tableNameWithType); if (isUpsertEnabled()) { - handleUpsert(immutableSegment); + handleUpsert(immutableSegment, zkMetadata); return; } @@ -730,10 +732,14 @@ public class RealtimeTableDataManager extends BaseTableDataManager { } } - private void handleUpsert(ImmutableSegment immutableSegment) { + private void handleUpsert(ImmutableSegment immutableSegment, @Nullable SegmentZKMetadata zkMetadata) { String segmentName = immutableSegment.getSegmentName(); _logger.info("Adding immutable segment: {} with upsert enabled", segmentName); + // Set the ZK creation time so that same creation time can be used to break the comparison ties across replicas, + // to ensure data consistency of replica. 
+ setZkCreationTimeIfAvailable(immutableSegment, zkMetadata); + Integer partitionId = SegmentUtils.getSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, null); Preconditions.checkNotNull(partitionId, "Failed to get partition id for segment: " + segmentName + " (upsert-enabled table: " + _tableNameWithType + ")"); @@ -808,6 +814,22 @@ public class RealtimeTableDataManager extends BaseTableDataManager { registerSegment(segmentName, segmentDataManager); } + /** + * Sets the ZK creation time in the segment metadata if available, to ensure consistent + * creation times across replicas for upsert operations. + */ + private void setZkCreationTimeIfAvailable(ImmutableSegment segment, @Nullable SegmentZKMetadata zkMetadata) { + if (zkMetadata != null && zkMetadata.getCreationTime() > 0) { + SegmentMetadata segmentMetadata = segment.getSegmentMetadata(); + if (segmentMetadata instanceof SegmentMetadataImpl) { + SegmentMetadataImpl segmentMetadataImpl = (SegmentMetadataImpl) segmentMetadata; + segmentMetadataImpl.setZkCreationTime(zkMetadata.getCreationTime()); + _logger.info("Set ZK creation time {} for segment: {} in upsert table", zkMetadata.getCreationTime(), + zkMetadata.getSegmentName()); + } + } + } + /** * Replaces the CONSUMING segment with a downloaded committed one. */ @@ -826,13 +848,26 @@ public class RealtimeTableDataManager extends BaseTableDataManager { /** * Replaces the CONSUMING segment with the one sealed locally. */ + @Deprecated public void replaceConsumingSegment(String segmentName) throws Exception { + replaceConsumingSegment(segmentName, null); + } + + /** + * Replaces the CONSUMING segment with the one sealed locally. + * This overloaded method avoids extra ZK call when the caller already has SegmentZKMetadata. 
+ */ + public void replaceConsumingSegment(String segmentName, @Nullable SegmentZKMetadata zkMetadata) + throws Exception { _logger.info("Replacing CONSUMING segment: {} with the one sealed locally", segmentName); File indexDir = new File(_indexDir, segmentName); // Get a new index loading config with latest table config and schema to load the segment IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig(); - addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, _segmentOperationsThrottler)); + ImmutableSegment immutableSegment = + ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, _segmentOperationsThrottler); + + addSegment(immutableSegment, zkMetadata); _logger.info("Replaced CONSUMING segment: {}", segmentName); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index 5dc2647935..3fbc30fce9 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -56,7 +56,9 @@ import org.apache.pinot.segment.local.utils.WatermarkUtils; import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.MutableSegment; +import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap; import org.apache.pinot.spi.config.table.HashFunction; import org.apache.pinot.spi.config.table.TableConfig; @@ -1146,4 +1148,24 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps } return 
Collections.emptySet(); } + + /** + * Returns the ZooKeeper creation time for upsert consistency. + * This refers to the time set by the controller when creating new consuming segment. + * This is used to ensure consistent creation time across replicas for upsert + * operations. + * @return ZK creation time in milliseconds, or Long.MIN_VALUE if not set + */ + protected long getAuthoritativeCreationTime(IndexSegment segment) { + SegmentMetadata segmentMetadata = segment.getSegmentMetadata(); + if (segmentMetadata instanceof SegmentMetadataImpl) { + SegmentMetadataImpl segmentMetadataImpl = (SegmentMetadataImpl) segmentMetadata; + long zkCreationTime = segmentMetadataImpl.getZkCreationTime(); + if (zkCreationTime != Long.MIN_VALUE) { + return zkCreationTime; + } + } + // Fall back to local creation time if ZK creation time is not set + return segmentMetadata.getIndexCreationTime(); + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index 5552a6c65c..ad5058c703 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -136,8 +136,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp // current value, but the segment has a larger sequence number (the segment is newer than the current // segment). 
if (comparisonResult > 0 || (comparisonResult == 0 && shouldReplaceOnComparisonTie(segmentName, - currentSegmentName, segment.getSegmentMetadata().getIndexCreationTime(), - currentSegment.getSegmentMetadata().getIndexCreationTime()))) { + currentSegmentName, getAuthoritativeCreationTime(segment), + getAuthoritativeCreationTime(currentSegment)))) { replaceDocId(segment, validDocIds, queryableDocIds, currentSegment, currentDocId, newDocId, recordInfo); return new RecordLocation(segment, newDocId, newComparisonValue); } else { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java index a179a05fa7..9af3ec6c23 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java @@ -166,8 +166,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes // current value, but the segment has a larger sequence number (the segment is newer than the current // segment). 
if (comparisonResult > 0 || (comparisonResult == 0 && shouldReplaceOnComparisonTie(segmentName, - currentSegmentName, segment.getSegmentMetadata().getIndexCreationTime(), - currentSegment.getSegmentMetadata().getIndexCreationTime()))) { + currentSegmentName, getAuthoritativeCreationTime(segment), + getAuthoritativeCreationTime(currentSegment)))) { replaceDocId(segment, validDocIds, queryableDocIds, currentSegment, currentDocId, newDocId, recordInfo); return new RecordLocation(segment, newDocId, newComparisonValue, RecordLocation.incrementSegmentCount(currentDistinctSegmentCount)); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java index 31ea92cf89..6ebfd90b05 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java @@ -105,7 +105,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest invocation -> primaryKeys.get(invocation.getArgument(0)).getValues()[0]); when(dataSource.getForwardIndex()).thenReturn(forwardIndex); SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class); - when(segmentMetadata.getIndexCreationTime()).thenReturn(System.currentTimeMillis()); + long creationTimeMs = System.currentTimeMillis(); + when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTimeMs); + when(segmentMetadata.getZkCreationTime()).thenReturn(creationTimeMs); if (primaryKeys != null) { when(segmentMetadata.getTotalDocs()).thenReturn(primaryKeys.size()); } @@ -133,6 +135,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest when(dataSource.getForwardIndex()).thenReturn(forwardIndex); SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class); when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTimeMs); + when(segmentMetadata.getZkCreationTime()).thenReturn(creationTimeMs); when(segmentMetadata.getTotalDocs()).thenReturn(primaryKeys.size()); when(segment.getSegmentMetadata()).thenReturn(segmentMetadata); return segment; diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java index a059dcd27c..d86e1e4609 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java @@ -821,7 +821,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { invocation -> primaryKeys.get(invocation.getArgument(0)).getValues()[0]); when(dataSource.getForwardIndex()).thenReturn(forwardIndex); SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class); - when(segmentMetadata.getIndexCreationTime()).thenReturn(System.currentTimeMillis()); + long creationTimeMs = System.currentTimeMillis(); + when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTimeMs); + when(segmentMetadata.getZkCreationTime()).thenReturn(creationTimeMs); when(segment.getSegmentMetadata()).thenReturn(segmentMetadata); return segment; } @@ -846,6 +848,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { when(dataSource.getForwardIndex()).thenReturn(forwardIndex); SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class); 
when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTimeMs); + when(segmentMetadata.getZkCreationTime()).thenReturn(creationTimeMs); when(segment.getSegmentMetadata()).thenReturn(segmentMetadata); return segment; } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java index be5800769e..4313973fd9 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java @@ -78,6 +78,7 @@ public class SegmentMetadataImpl implements SegmentMetadata { private final Schema _schema; private long _crc = Long.MIN_VALUE; private long _creationTime = Long.MIN_VALUE; + private long _zkCreationTime = Long.MIN_VALUE; // ZooKeeper creation time for upsert consistency private String _timeColumn; private TimeUnit _timeUnit; private Duration _timeGranularity; @@ -149,6 +150,7 @@ public class SegmentMetadataImpl implements SegmentMetadata { _segmentName = segmentName; _schema = schema; _creationTime = creationTime; + _zkCreationTime = creationTime; } /** @@ -380,6 +382,24 @@ public class SegmentMetadataImpl implements SegmentMetadata { return _creationTime; } + /** + * Returns the ZooKeeper creation time for upsert consistency. + * This refers to the time set by controller while creating the consuming segment. It is used to ensure consistent + * creation time across replicas for upsert operations. + * @return ZK creation time in milliseconds, or Long.MIN_VALUE if not set + */ + public long getZkCreationTime() { + return _zkCreationTime; + } + + /** + * Sets the ZooKeeper creation time for upsert consistency. 
+ * @param zkCreationTime ZK creation time in milliseconds + */ + public void setZkCreationTime(long zkCreationTime) { + _zkCreationTime = zkCreationTime; + } + @Override public long getLastIndexedTimestamp() { return Long.MIN_VALUE; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org