This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d91ad73fec Optimize snapshot flow to only snapshot segments which have updates (#13285)
d91ad73fec is described below

commit d91ad73fecc2989cb24cdfe3cbaf0ad4da469c12
Author: Pratik Tibrewal <tibrewalpra...@uber.com>
AuthorDate: Wed Jun 12 04:01:59 2024 +0530

    Optimize snapshot flow to only snapshot segments which have updates (#13285)
    
    * Optimize snapshot flow to only snapshot segments which are updated since last snapshot
    
    * enable snapshotting before consumption for partial-upsert tables
    
    * end snapshotting if error found
---
 .../realtime/RealtimeSegmentDataManager.java       | 19 +++++--
 .../upsert/BasePartitionUpsertMetadataManager.java | 58 +++++++++++++++-----
 .../BasePartitionUpsertMetadataManagerTest.java    | 62 +++++++++++++++++++++-
 .../apache/pinot/spi/config/table/TableConfig.java |  5 ++
 4 files changed, 128 insertions(+), 16 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 8dc7842f81..44cea7155d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -703,9 +703,22 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
         //   persisted.
         // Take upsert snapshot before starting consuming events
         if (_partitionUpsertMetadataManager != null) {
-          _partitionUpsertMetadataManager.takeSnapshot();
-          // If upsertTTL is enabled, we will remove expired primary keys from upsertMetadata after taking snapshot.
-          _partitionUpsertMetadataManager.removeExpiredPrimaryKeys();
+          if (_tableConfig.getUpsertMetadataTTL() > 0) {
+            // If upsertMetadataTTL is enabled, we will remove expired primary keys from upsertMetadata
+            // AFTER taking a snapshot. Taking the snapshot first is crucial to capture the final
+            // state of each key before it exits the TTL window. Out-of-TTL segments are skipped in
+            // the doAddSegment flow, and the snapshot is used to enableUpsert on the immutable out-of-TTL segment.
+            // If no snapshot is found, the entire segment is marked as valid and queryable.
+            _partitionUpsertMetadataManager.takeSnapshot();
+            _partitionUpsertMetadataManager.removeExpiredPrimaryKeys();
+          } else {
+            // We should remove deleted-keys first and then take a snapshot. This is because the deletedKeysTTL
+            // flow removes keys from the map and updates to remove valid doc IDs. By taking the snapshot immediately
+            // after this process, we save one commit cycle, ensuring that the deletion of valid doc IDs is reflected
+            // immediately
+            _partitionUpsertMetadataManager.removeExpiredPrimaryKeys();
+            _partitionUpsertMetadataManager.takeSnapshot();
+          }
         }
 
         while (!_state.isFinal()) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index adb9ac7f0e..3b72ddbb21 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -48,6 +48,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerGauge;
@@ -105,8 +106,12 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
 
   // Tracks all the segments managed by this manager (excluding EmptySegment)
   protected final Set<IndexSegment> _trackedSegments = ConcurrentHashMap.newKeySet();
+  // This is to track all the segments where changes took place post last snapshot
+  // Note: we need not take any _snapshotLock while updating this set as it is only updated by the upsert thread
+  protected final Set<IndexSegment> _updatedSegmentsSinceLastSnapshot = ConcurrentHashMap.newKeySet();
 
   // NOTE: We do not persist snapshot on the first consuming segment because most segments might not be loaded yet
+  // We only do this for Full-Upsert tables, for partial-upsert tables, we have a check allSegmentsLoaded
   protected volatile boolean _gotFirstConsumingSegment = false;
   protected final ReadWriteLock _snapshotLock;
 
@@ -874,8 +879,10 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     if (!_enableSnapshot) {
       return;
     }
-    if (!_gotFirstConsumingSegment) {
-      _logger.info("Skip taking snapshot before getting the first consuming segment");
+    if (_partialUpsertHandler == null && !_gotFirstConsumingSegment) {
+      // We only skip for full-Upsert tables, for partial-upsert tables, we have a check allSegmentsLoaded in
+      // RealtimeTableDataManager
+      _logger.info("Skip taking snapshot before getting the first consuming segment for full-upsert table");
       return;
     }
     if (!startOperation()) {
@@ -897,7 +904,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     }
   }
 
-  // TODO: Consider optimizing it by tracking and persisting only the changed snapshot
   protected void doTakeSnapshot() {
     int numTrackedSegments = _trackedSegments.size();
     long numPrimaryKeysInSnapshot = 0L;
@@ -916,19 +922,34 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
         numConsumingSegments++;
         continue;
       }
-      ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment;
-      if (!immutableSegment.hasValidDocIdsSnapshotFile()) {
-        segmentsWithoutSnapshot.add(immutableSegment);
+      if (!_updatedSegmentsSinceLastSnapshot.contains(segment)) {
+        // if no updates since last snapshot then skip
         continue;
       }
-      immutableSegment.persistValidDocIdsSnapshot();
-      numImmutableSegments++;
-      numPrimaryKeysInSnapshot += immutableSegment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
+      try {
+        ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment;
+        if (!immutableSegment.hasValidDocIdsSnapshotFile()) {
+          segmentsWithoutSnapshot.add(immutableSegment);
+          continue;
+        }
+        immutableSegment.persistValidDocIdsSnapshot();
+        _updatedSegmentsSinceLastSnapshot.remove(segment);
+        numImmutableSegments++;
+        numPrimaryKeysInSnapshot += immutableSegment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
+      } catch (Exception e) {
+        _logger.warn("Caught exception while taking snapshot for segment: {}, skipping", segment.getSegmentName(), e);
+        Utils.rethrowException(e);
+      }
     }
     for (ImmutableSegmentImpl segment : segmentsWithoutSnapshot) {
-      segment.persistValidDocIdsSnapshot();
-      numImmutableSegments++;
-      numPrimaryKeysInSnapshot += segment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
+      try {
+        segment.persistValidDocIdsSnapshot();
+        numImmutableSegments++;
+        numPrimaryKeysInSnapshot += segment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
+      } catch (Exception e) {
+        _logger.warn("Caught exception while taking snapshot for segment: {}, skipping", segment.getSegmentName(), e);
+        Utils.rethrowException(e);
+      }
     }
 
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
@@ -1112,6 +1133,10 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
         // refreshing is done.
         doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
       }
+      if (_enableSnapshot) {
+        _updatedSegmentsSinceLastSnapshot.add(newSegment);
+        _updatedSegmentsSinceLastSnapshot.add(oldSegment);
+      }
     }
   }
 
@@ -1142,6 +1167,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
         // Batch refresh takes WLock. Do it outside RLock for clarity.
         doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
       }
+      if (_enableSnapshot) {
+        _updatedSegmentsSinceLastSnapshot.add(segment);
+      }
     }
   }
 
@@ -1159,6 +1187,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
         // Batch refresh takes WLock. Do it outside RLock for clarity.
         doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
       }
+      if (_enableSnapshot) {
+        _updatedSegmentsSinceLastSnapshot.add(segment);
+      }
     }
   }
 
@@ -1183,6 +1214,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
         // Batch refresh takes WLock. Do it outside RLock for clarity.
         doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
       }
+      if (_enableSnapshot) {
+        _updatedSegmentsSinceLastSnapshot.add(segment);
+      }
     }
   }
 
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
index 0e97621733..b96e1fe8e4 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
@@ -209,6 +209,7 @@ public class BasePartitionUpsertMetadataManagerTest {
     ImmutableSegmentImpl seg01 = createImmutableSegment("seg01", segDir01, segmentsTakenSnapshot);
     seg01.enableUpsert(upsertMetadataManager, createValidDocIds(0, 1, 2, 3), null);
     upsertMetadataManager.trackSegment(seg01);
+    upsertMetadataManager.updatedSegmentForSnapshotting(seg01);
     // seg01 has a tmp snapshot file, but no snapshot file
     FileUtils.touch(new File(segDir01, V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME + "_tmp"));
 
@@ -216,6 +217,7 @@ public class BasePartitionUpsertMetadataManagerTest {
     ImmutableSegmentImpl seg02 = createImmutableSegment("seg02", segDir02, segmentsTakenSnapshot);
     seg02.enableUpsert(upsertMetadataManager, createValidDocIds(0, 1, 2, 3, 4, 5), null);
     upsertMetadataManager.trackSegment(seg02);
+    upsertMetadataManager.updatedSegmentForSnapshotting(seg02);
     // seg02 has snapshot file, so its snapshot is taken first.
     FileUtils.touch(new File(segDir02, V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME));
 
@@ -223,9 +225,12 @@ public class BasePartitionUpsertMetadataManagerTest {
     ImmutableSegmentImpl seg03 = createImmutableSegment("seg03", segDir03, segmentsTakenSnapshot);
     seg03.enableUpsert(upsertMetadataManager, createValidDocIds(3, 4, 7), null);
     upsertMetadataManager.trackSegment(seg03);
+    upsertMetadataManager.updatedSegmentForSnapshotting(seg03);
 
     // The mutable segments will be skipped.
-    upsertMetadataManager.trackSegment(mock(MutableSegmentImpl.class));
+    MutableSegmentImpl seg04 = mock(MutableSegmentImpl.class);
+    upsertMetadataManager.trackSegment(seg04);
+    upsertMetadataManager.updatedSegmentForSnapshotting(seg04);
 
     upsertMetadataManager.doTakeSnapshot();
     assertEquals(segmentsTakenSnapshot.size(), 3);
@@ -244,6 +249,57 @@ public class BasePartitionUpsertMetadataManagerTest {
     assertEquals(seg03.loadValidDocIdsFromSnapshot().getCardinality(), 3);
   }
 
+  @Test
+  public void testTakeSnapshotInOrderBasedOnUpdates()
+      throws IOException {
+    DummyPartitionUpsertMetadataManager upsertMetadataManager =
+        new DummyPartitionUpsertMetadataManager("myTable", 0, mock(UpsertContext.class));
+
+    List<String> segmentsTakenSnapshot = new ArrayList<>();
+
+    File segDir01 = new File(TEMP_DIR, "seg01");
+    ImmutableSegmentImpl seg01 = createImmutableSegment("seg01", segDir01, segmentsTakenSnapshot);
+    seg01.enableUpsert(upsertMetadataManager, createValidDocIds(0, 1, 2, 3), null);
+    upsertMetadataManager.trackSegment(seg01);
+    upsertMetadataManager.updatedSegmentForSnapshotting(seg01);
+    // seg01 has a tmp snapshot file, but no snapshot file
+    FileUtils.touch(new File(segDir01, V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME + "_tmp"));
+
+    File segDir02 = new File(TEMP_DIR, "seg02");
+    ImmutableSegmentImpl seg02 = createImmutableSegment("seg02", segDir02, segmentsTakenSnapshot);
+    seg02.enableUpsert(upsertMetadataManager, createValidDocIds(0, 1, 2, 3, 4, 5), null);
+    upsertMetadataManager.trackSegment(seg02);
+    upsertMetadataManager.updatedSegmentForSnapshotting(seg02);
+    // seg02 has snapshot file, so its snapshot is taken first.
+    FileUtils.touch(new File(segDir02, V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME));
+
+    File segDir03 = new File(TEMP_DIR, "seg03");
+    ImmutableSegmentImpl seg03 = createImmutableSegment("seg03", segDir03, segmentsTakenSnapshot);
+    seg03.enableUpsert(upsertMetadataManager, createValidDocIds(3, 4, 7), null);
+    upsertMetadataManager.trackSegment(seg03);
+
+    // The mutable segments will be skipped.
+    MutableSegmentImpl seg04 = mock(MutableSegmentImpl.class);
+    upsertMetadataManager.trackSegment(seg04);
+    upsertMetadataManager.updatedSegmentForSnapshotting(seg04);
+
+    upsertMetadataManager.doTakeSnapshot();
+    assertEquals(segmentsTakenSnapshot.size(), 2);
+    // The snapshot of seg02 was taken firstly, as it's the only segment with existing snapshot.
+    assertEquals(segmentsTakenSnapshot.get(0), "seg02");
+    // Set is used to track segments internally, so we can't assert the order of the other segments deterministically,
+    // but all 3 segments should have taken their snapshots.
+    assertTrue(segmentsTakenSnapshot.containsAll(Arrays.asList("seg01", "seg02")));
+
+    assertEquals(TEMP_DIR.list().length, 3);
+    assertTrue(segDir01.exists());
+    assertEquals(seg01.loadValidDocIdsFromSnapshot().getCardinality(), 4);
+    assertTrue(segDir02.exists());
+    assertEquals(seg02.loadValidDocIdsFromSnapshot().getCardinality(), 6);
+    assertTrue(segDir03.exists());
+    assertNull(seg03.loadValidDocIdsFromSnapshot());
+  }
+
   @Test
   public void testConsistencyModeSync()
       throws Exception {
@@ -509,6 +565,10 @@ public class BasePartitionUpsertMetadataManagerTest {
       _trackedSegments.add(seg);
     }
 
+    public void updatedSegmentForSnapshotting(IndexSegment seg) {
+      _updatedSegmentsSinceLastSnapshot.add(seg);
+    }
+
     @Override
     protected long getNumPrimaryKeys() {
       return 0;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index bf2dd611ef..b374855d59 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -389,6 +389,11 @@ public class TableConfig extends BaseJsonConfig {
     return _upsertConfig == null ? null : _upsertConfig.getComparisonColumns();
   }
 
+  @JsonIgnore
+  public double getUpsertMetadataTTL() {
+    return _upsertConfig == null ? 0 : _upsertConfig.getMetadataTTL();
+  }
+
   @JsonIgnore
   @Nullable
   public String getUpsertDeleteRecordColumn() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to