This is an automated email from the ASF dual-hosted git repository. xbli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d91ad73fec Optimize snapshot flow to only snapshot segments which have updates (#13285) d91ad73fec is described below commit d91ad73fecc2989cb24cdfe3cbaf0ad4da469c12 Author: Pratik Tibrewal <tibrewalpra...@uber.com> AuthorDate: Wed Jun 12 04:01:59 2024 +0530 Optimize snapshot flow to only snapshot segments which have updates (#13285) * Optimize snapshot flow to only snapshot segments which are updated since last snapshot * enable snapshotting before consumption for partial-upsert tables * end snapshotting if error found --- .../realtime/RealtimeSegmentDataManager.java | 19 +++++-- .../upsert/BasePartitionUpsertMetadataManager.java | 58 +++++++++++++++----- .../BasePartitionUpsertMetadataManagerTest.java | 62 +++++++++++++++++++++- .../apache/pinot/spi/config/table/TableConfig.java | 5 ++ 4 files changed, 128 insertions(+), 16 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 8dc7842f81..44cea7155d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -703,9 +703,22 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { // persisted. // Take upsert snapshot before starting consuming events if (_partitionUpsertMetadataManager != null) { - _partitionUpsertMetadataManager.takeSnapshot(); - // If upsertTTL is enabled, we will remove expired primary keys from upsertMetadata after taking snapshot. - _partitionUpsertMetadataManager.removeExpiredPrimaryKeys(); + if (_tableConfig.getUpsertMetadataTTL() > 0) { + // If upsertMetadataTTL is enabled, we will remove expired primary keys from upsertMetadata + // AFTER taking a snapshot. 
Taking the snapshot first is crucial to capture the final + // state of each key before it exits the TTL window. Out-of-TTL segments are skipped in + // the doAddSegment flow, and the snapshot is used to enableUpsert on the immutable out-of-TTL segment. + // If no snapshot is found, the entire segment is marked as valid and queryable. + _partitionUpsertMetadataManager.takeSnapshot(); + _partitionUpsertMetadataManager.removeExpiredPrimaryKeys(); + } else { + // We should remove deleted-keys first and then take a snapshot. This is because the deletedKeysTTL + // flow removes keys from the map and updates to remove valid doc IDs. By taking the snapshot immediately + // after this process, we save one commit cycle, ensuring that the deletion of valid doc IDs is reflected + // immediately + _partitionUpsertMetadataManager.removeExpiredPrimaryKeys(); + _partitionUpsertMetadataManager.takeSnapshot(); + } } while (!_state.isFinal()) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index adb9ac7f0e..3b72ddbb21 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -48,6 +48,7 @@ import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.io.FileUtils; import org.apache.helix.HelixManager; import org.apache.helix.model.IdealState; +import org.apache.pinot.common.Utils; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ServerGauge; @@ -105,8 +106,12 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps // Tracks all the segments managed by this 
manager (excluding EmptySegment) protected final Set<IndexSegment> _trackedSegments = ConcurrentHashMap.newKeySet(); + // This is to track all the segments where changes took place post last snapshot + // Note: we need not take any _snapshotLock while updating this set as it is only updated by the upsert thread + protected final Set<IndexSegment> _updatedSegmentsSinceLastSnapshot = ConcurrentHashMap.newKeySet(); // NOTE: We do not persist snapshot on the first consuming segment because most segments might not be loaded yet + // We only do this for Full-Upsert tables, for partial-upsert tables, we have a check allSegmentsLoaded protected volatile boolean _gotFirstConsumingSegment = false; protected final ReadWriteLock _snapshotLock; @@ -874,8 +879,10 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps if (!_enableSnapshot) { return; } - if (!_gotFirstConsumingSegment) { - _logger.info("Skip taking snapshot before getting the first consuming segment"); + if (_partialUpsertHandler == null && !_gotFirstConsumingSegment) { + // We only skip for full-Upsert tables, for partial-upsert tables, we have a check allSegmentsLoaded in + // RealtimeTableDataManager + _logger.info("Skip taking snapshot before getting the first consuming segment for full-upsert table"); return; } if (!startOperation()) { @@ -897,7 +904,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps } } - // TODO: Consider optimizing it by tracking and persisting only the changed snapshot protected void doTakeSnapshot() { int numTrackedSegments = _trackedSegments.size(); long numPrimaryKeysInSnapshot = 0L; @@ -916,19 +922,34 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps numConsumingSegments++; continue; } - ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment; - if (!immutableSegment.hasValidDocIdsSnapshotFile()) { - segmentsWithoutSnapshot.add(immutableSegment); + if 
(!_updatedSegmentsSinceLastSnapshot.contains(segment)) { + // if no updates since last snapshot then skip continue; } - immutableSegment.persistValidDocIdsSnapshot(); - numImmutableSegments++; - numPrimaryKeysInSnapshot += immutableSegment.getValidDocIds().getMutableRoaringBitmap().getCardinality(); + try { + ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment; + if (!immutableSegment.hasValidDocIdsSnapshotFile()) { + segmentsWithoutSnapshot.add(immutableSegment); + continue; + } + immutableSegment.persistValidDocIdsSnapshot(); + _updatedSegmentsSinceLastSnapshot.remove(segment); + numImmutableSegments++; + numPrimaryKeysInSnapshot += immutableSegment.getValidDocIds().getMutableRoaringBitmap().getCardinality(); + } catch (Exception e) { + _logger.warn("Caught exception while taking snapshot for segment: {}, skipping", segment.getSegmentName(), e); + Utils.rethrowException(e); + } } for (ImmutableSegmentImpl segment : segmentsWithoutSnapshot) { - segment.persistValidDocIdsSnapshot(); - numImmutableSegments++; - numPrimaryKeysInSnapshot += segment.getValidDocIds().getMutableRoaringBitmap().getCardinality(); + try { + segment.persistValidDocIdsSnapshot(); + numImmutableSegments++; + numPrimaryKeysInSnapshot += segment.getValidDocIds().getMutableRoaringBitmap().getCardinality(); + } catch (Exception e) { + _logger.warn("Caught exception while taking snapshot for segment: {}, skipping", segment.getSegmentName(), e); + Utils.rethrowException(e); + } } _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, @@ -1112,6 +1133,10 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps // refreshing is done. 
doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs); } + if (_enableSnapshot) { + _updatedSegmentsSinceLastSnapshot.add(newSegment); + _updatedSegmentsSinceLastSnapshot.add(oldSegment); + } } } @@ -1142,6 +1167,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps // Batch refresh takes WLock. Do it outside RLock for clarity. doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs); } + if (_enableSnapshot) { + _updatedSegmentsSinceLastSnapshot.add(segment); + } } } @@ -1159,6 +1187,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps // Batch refresh takes WLock. Do it outside RLock for clarity. doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs); } + if (_enableSnapshot) { + _updatedSegmentsSinceLastSnapshot.add(segment); + } } } @@ -1183,6 +1214,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps // Batch refresh takes WLock. Do it outside RLock for clarity. doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs); } + if (_enableSnapshot) { + _updatedSegmentsSinceLastSnapshot.add(segment); + } } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java index 0e97621733..b96e1fe8e4 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java @@ -209,6 +209,7 @@ public class BasePartitionUpsertMetadataManagerTest { ImmutableSegmentImpl seg01 = createImmutableSegment("seg01", segDir01, segmentsTakenSnapshot); seg01.enableUpsert(upsertMetadataManager, createValidDocIds(0, 1, 2, 3), null); upsertMetadataManager.trackSegment(seg01); + upsertMetadataManager.updatedSegmentForSnapshotting(seg01); 
// seg01 has a tmp snapshot file, but no snapshot file FileUtils.touch(new File(segDir01, V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME + "_tmp")); @@ -216,6 +217,7 @@ public class BasePartitionUpsertMetadataManagerTest { ImmutableSegmentImpl seg02 = createImmutableSegment("seg02", segDir02, segmentsTakenSnapshot); seg02.enableUpsert(upsertMetadataManager, createValidDocIds(0, 1, 2, 3, 4, 5), null); upsertMetadataManager.trackSegment(seg02); + upsertMetadataManager.updatedSegmentForSnapshotting(seg02); // seg02 has snapshot file, so its snapshot is taken first. FileUtils.touch(new File(segDir02, V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME)); @@ -223,9 +225,12 @@ public class BasePartitionUpsertMetadataManagerTest { ImmutableSegmentImpl seg03 = createImmutableSegment("seg03", segDir03, segmentsTakenSnapshot); seg03.enableUpsert(upsertMetadataManager, createValidDocIds(3, 4, 7), null); upsertMetadataManager.trackSegment(seg03); + upsertMetadataManager.updatedSegmentForSnapshotting(seg03); // The mutable segments will be skipped. 
- upsertMetadataManager.trackSegment(mock(MutableSegmentImpl.class)); + MutableSegmentImpl seg04 = mock(MutableSegmentImpl.class); + upsertMetadataManager.trackSegment(seg04); + upsertMetadataManager.updatedSegmentForSnapshotting(seg04); upsertMetadataManager.doTakeSnapshot(); assertEquals(segmentsTakenSnapshot.size(), 3); @@ -244,6 +249,57 @@ public class BasePartitionUpsertMetadataManagerTest { assertEquals(seg03.loadValidDocIdsFromSnapshot().getCardinality(), 3); } + @Test + public void testTakeSnapshotInOrderBasedOnUpdates() + throws IOException { + DummyPartitionUpsertMetadataManager upsertMetadataManager = + new DummyPartitionUpsertMetadataManager("myTable", 0, mock(UpsertContext.class)); + + List<String> segmentsTakenSnapshot = new ArrayList<>(); + + File segDir01 = new File(TEMP_DIR, "seg01"); + ImmutableSegmentImpl seg01 = createImmutableSegment("seg01", segDir01, segmentsTakenSnapshot); + seg01.enableUpsert(upsertMetadataManager, createValidDocIds(0, 1, 2, 3), null); + upsertMetadataManager.trackSegment(seg01); + upsertMetadataManager.updatedSegmentForSnapshotting(seg01); + // seg01 has a tmp snapshot file, but no snapshot file + FileUtils.touch(new File(segDir01, V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME + "_tmp")); + + File segDir02 = new File(TEMP_DIR, "seg02"); + ImmutableSegmentImpl seg02 = createImmutableSegment("seg02", segDir02, segmentsTakenSnapshot); + seg02.enableUpsert(upsertMetadataManager, createValidDocIds(0, 1, 2, 3, 4, 5), null); + upsertMetadataManager.trackSegment(seg02); + upsertMetadataManager.updatedSegmentForSnapshotting(seg02); + // seg02 has snapshot file, so its snapshot is taken first. 
+ FileUtils.touch(new File(segDir02, V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME)); + + File segDir03 = new File(TEMP_DIR, "seg03"); + ImmutableSegmentImpl seg03 = createImmutableSegment("seg03", segDir03, segmentsTakenSnapshot); + seg03.enableUpsert(upsertMetadataManager, createValidDocIds(3, 4, 7), null); + upsertMetadataManager.trackSegment(seg03); + + // The mutable segments will be skipped. + MutableSegmentImpl seg04 = mock(MutableSegmentImpl.class); + upsertMetadataManager.trackSegment(seg04); + upsertMetadataManager.updatedSegmentForSnapshotting(seg04); + + upsertMetadataManager.doTakeSnapshot(); + assertEquals(segmentsTakenSnapshot.size(), 2); + // The snapshot of seg02 was taken first, as it's the only segment with an existing snapshot. + assertEquals(segmentsTakenSnapshot.get(0), "seg02"); + // Only the two immutable segments marked as updated (seg01 and seg02) should have taken their snapshots; + // seg03 is tracked but was never marked as updated, so it is skipped. + assertTrue(segmentsTakenSnapshot.containsAll(Arrays.asList("seg01", "seg02"))); + + assertEquals(TEMP_DIR.list().length, 3); + assertTrue(segDir01.exists()); + assertEquals(seg01.loadValidDocIdsFromSnapshot().getCardinality(), 4); + assertTrue(segDir02.exists()); + assertEquals(seg02.loadValidDocIdsFromSnapshot().getCardinality(), 6); + assertTrue(segDir03.exists()); + assertNull(seg03.loadValidDocIdsFromSnapshot()); + } + @Test public void testConsistencyModeSync() throws Exception { @@ -509,6 +565,10 @@ public class BasePartitionUpsertMetadataManagerTest { _trackedSegments.add(seg); } + public void updatedSegmentForSnapshotting(IndexSegment seg) { + _updatedSegmentsSinceLastSnapshot.add(seg); + } + @Override protected long getNumPrimaryKeys() { return 0; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java index bf2dd611ef..b374855d59 100644 ---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java @@ -389,6 +389,11 @@ public class TableConfig extends BaseJsonConfig { return _upsertConfig == null ? null : _upsertConfig.getComparisonColumns(); } + @JsonIgnore + public double getUpsertMetadataTTL() { + return _upsertConfig == null ? 0 : _upsertConfig.getMetadataTTL(); + } + @JsonIgnore @Nullable public String getUpsertDeleteRecordColumn() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org