xiangfu0 commented on code in PR #13735:
URL: https://github.com/apache/pinot/pull/13735#discussion_r1706016936


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -3475,167 +3486,168 @@ public String startReplaceSegments(String 
tableNameWithType, List<String> segmen
       Preconditions.checkState(!segmentsForTable.contains(segment), "Segment: 
%s from 'segmentsTo' exists in table: %s",
           segment, tableNameWithType);
     }
+    List<String> segmentsToCleanUp = new ArrayList<>();
+    synchronized (getLineageUpdaterLock(tableNameWithType)) {
+      try {
+        DEFAULT_RETRY_POLICY.attempt(() -> {
+          // Fetch table config
+          TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+          Preconditions.checkState(tableConfig != null, "Failed to find table 
config for table: %s", tableNameWithType);
+
+          // Fetch the segment lineage metadata
+          ZNRecord segmentLineageZNRecord =
+              
SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, 
tableNameWithType);
+          SegmentLineage segmentLineage;
+          int expectedVersion;
+          if (segmentLineageZNRecord == null) {
+            segmentLineage = new SegmentLineage(tableNameWithType);
+            expectedVersion = -1;
+          } else {
+            segmentLineage = 
SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+            expectedVersion = segmentLineageZNRecord.getVersion();
+          }
+          // Check that the segment lineage entry id doesn't exist in the 
segment lineage
+          
Preconditions.checkState(segmentLineage.getLineageEntry(segmentLineageEntryId) 
== null,
+              "Entry id: %s already exists in the segment lineage for table: 
%s", segmentLineageEntryId,
+              tableNameWithType);
+
+          Iterator<Map.Entry<String, LineageEntry>> entryIterator =
+              segmentLineage.getLineageEntries().entrySet().iterator();
+          while (entryIterator.hasNext()) {
+            Map.Entry<String, LineageEntry> entry = entryIterator.next();
+            String entryId = entry.getKey();
+            LineageEntry lineageEntry = entry.getValue();
+
+            // If the lineage entry is in 'REVERTED' state, no need to go 
through the validation because we can regard
+            // the entry as not existing.
+            if (lineageEntry.getState() == LineageEntryState.REVERTED) {
+              // When 'forceCleanup' is enabled, proactively clean up 
'segmentsTo' since it's safe to do so.
+              if (forceCleanup) {
+                segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo());
+              }
+              continue;
+            }
 
-    try {
-      DEFAULT_RETRY_POLICY.attempt(() -> {
-        // Fetch table config
-        TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
-        Preconditions.checkState(tableConfig != null, "Failed to find table 
config for table: %s", tableNameWithType);
-
-        // Fetch the segment lineage metadata
-        ZNRecord segmentLineageZNRecord =
-            
SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, 
tableNameWithType);
-        SegmentLineage segmentLineage;
-        int expectedVersion;
-        if (segmentLineageZNRecord == null) {
-          segmentLineage = new SegmentLineage(tableNameWithType);
-          expectedVersion = -1;
-        } else {
-          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
-          expectedVersion = segmentLineageZNRecord.getVersion();
-        }
-        // Check that the segment lineage entry id doesn't exist in the 
segment lineage
-        
Preconditions.checkState(segmentLineage.getLineageEntry(segmentLineageEntryId) 
== null,
-            "Entry id: %s already exists in the segment lineage for table: 
%s", segmentLineageEntryId,
-            tableNameWithType);
+            // By here, the lineage entry is either 'IN_PROGRESS' or 
'COMPLETED'.
 
-        List<String> segmentsToCleanUp = new ArrayList<>();
-        Iterator<Map.Entry<String, LineageEntry>> entryIterator =
-            segmentLineage.getLineageEntries().entrySet().iterator();
-        while (entryIterator.hasNext()) {
-          Map.Entry<String, LineageEntry> entry = entryIterator.next();
-          String entryId = entry.getKey();
-          LineageEntry lineageEntry = entry.getValue();
-
-          // If the lineage entry is in 'REVERTED' state, no need to go 
through the validation because we can regard
-          // the entry as not existing.
-          if (lineageEntry.getState() == LineageEntryState.REVERTED) {
-            // When 'forceCleanup' is enabled, proactively clean up 
'segmentsTo' since it's safe to do so.
+            // When 'forceCleanup' is enabled, we need to proactively clean up 
at the following cases:
+            // 1. Revert the lineage entry when we find the lineage entry with 
overlapped 'segmentsFrom' or 'segmentsTo'
+            //    values. This is used to un-block the segment replacement 
protocol if the previous attempt failed in
+            //    the middle.
+            // 2. Proactively delete the oldest data snapshot to make sure 
that we only keep at most 2 data snapshots
+            //    at any time in case of REFRESH use case.
             if (forceCleanup) {
-              segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo());
-            }
-            continue;
-          }
+              if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && (
+                  !Collections.disjoint(segmentsFrom, 
lineageEntry.getSegmentsFrom()) || !Collections.disjoint(
+                      segmentsTo, lineageEntry.getSegmentsTo()))) {
+                LOGGER.info(
+                    "Detected the incomplete lineage entry with overlapped 
'segmentsFrom' or 'segmentsTo'. Deleting or "
+                        + "reverting the lineage entry to unblock the new 
segment protocol. tableNameWithType={}, "
+                        + "entryId={}, segmentsFrom={}, segmentsTo={}", 
tableNameWithType, entryId,
+                    lineageEntry.getSegmentsFrom(), 
lineageEntry.getSegmentsTo());
+
+                // Delete the 'IN_PROGRESS' entry or update it to 'REVERTED'
+                // Delete or update segmentsTo of the entry to revert to 
handle the case of rerunning the protocol:
+                // Initial state:
+                //   Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4], 
status: IN_PROGRESS}
+                // 1. Rerunning the protocol with s4 and s5, s4 should not be 
deleted to avoid race conditions of
+                // concurrent data pushes and deletions:
+                //   Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3], 
status: REVERTED}
+                //   Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s4, s5], 
status: IN_PROGRESS}
+                // 2. Rerunning the protocol with s3 and s4, we can simply 
remove the 'IN_PROGRESS' entry:
+                //   Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4], 
status: IN_PROGRESS}
+                List<String> segmentsToForEntryToRevert = new 
ArrayList<>(lineageEntry.getSegmentsTo());
+                segmentsToForEntryToRevert.removeAll(segmentsTo);
+                if (segmentsToForEntryToRevert.isEmpty()) {
+                  // Delete 'IN_PROGRESS' entry if the segmentsTo is empty
+                  entryIterator.remove();
+                } else {
+                  // Update the lineage entry to 'REVERTED'
+                  entry.setValue(new 
LineageEntry(lineageEntry.getSegmentsFrom(), segmentsToForEntryToRevert,
+                      LineageEntryState.REVERTED, System.currentTimeMillis()));
+                }
 
-          // By here, the lineage entry is either 'IN_PROGRESS' or 'COMPLETED'.
-
-          // When 'forceCleanup' is enabled, we need to proactively clean up 
at the following cases:
-          // 1. Revert the lineage entry when we find the lineage entry with 
overlapped 'segmentsFrom' or 'segmentsTo'
-          //    values. This is used to un-block the segment replacement 
protocol if the previous attempt failed in the
-          //    middle.
-          // 2. Proactively delete the oldest data snapshot to make sure that 
we only keep at most 2 data snapshots
-          //    at any time in case of REFRESH use case.
-          if (forceCleanup) {
-            if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && (
-                !Collections.disjoint(segmentsFrom, 
lineageEntry.getSegmentsFrom()) || !Collections.disjoint(segmentsTo,
-                    lineageEntry.getSegmentsTo()))) {
-              LOGGER.info(
-                  "Detected the incomplete lineage entry with overlapped 
'segmentsFrom' or 'segmentsTo'. Deleting or "
-                      + "reverting the lineage entry to unblock the new 
segment protocol. tableNameWithType={}, "
-                      + "entryId={}, segmentsFrom={}, segmentsTo={}", 
tableNameWithType, entryId,
-                  lineageEntry.getSegmentsFrom(), 
lineageEntry.getSegmentsTo());
-
-              // Delete the 'IN_PROGRESS' entry or update it to 'REVERTED'
-              // Delete or update segmentsTo of the entry to revert to handle 
the case of rerunning the protocol:
-              // Initial state:
-              //   Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4], 
status: IN_PROGRESS}
-              // 1. Rerunning the protocol with s4 and s5, s4 should not be 
deleted to avoid race conditions of
-              // concurrent data pushes and deletions:
-              //   Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3], status: 
REVERTED}
-              //   Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s4, s5], 
status: IN_PROGRESS}
-              // 2. Rerunning the protocol with s3 and s4, we can simply 
remove the 'IN_PROGRESS' entry:
-              //   Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4], 
status: IN_PROGRESS}
-              List<String> segmentsToForEntryToRevert = new 
ArrayList<>(lineageEntry.getSegmentsTo());
-              segmentsToForEntryToRevert.removeAll(segmentsTo);
-              if (segmentsToForEntryToRevert.isEmpty()) {
-                // Delete 'IN_PROGRESS' entry if the segmentsTo is empty
-                entryIterator.remove();
-              } else {
-                // Update the lineage entry to 'REVERTED'
-                entry.setValue(new 
LineageEntry(lineageEntry.getSegmentsFrom(), segmentsToForEntryToRevert,
-                    LineageEntryState.REVERTED, System.currentTimeMillis()));
+                // Add segments for proactive clean-up.
+                segmentsToCleanUp.addAll(segmentsToForEntryToRevert);
+              } else if (lineageEntry.getState() == 
LineageEntryState.COMPLETED && "REFRESH".equalsIgnoreCase(
+                  
IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig)) && 
CollectionUtils.isEqualCollection(
+                  segmentsFrom, lineageEntry.getSegmentsTo())) {
+                // This part of code assumes that we only allow at most 2 data 
snapshots at a time by proactively
+                // deleting the older snapshots (for REFRESH tables).
+                //
+                // e.g. (Seg_0, Seg_1, Seg_2) -> (Seg_3, Seg_4, Seg_5)  // 
previous lineage
+                //      (Seg_3, Seg_4, Seg_5) -> (Seg_6, Seg_7, Seg_8)  // 
current lineage to be updated
+                // -> proactively delete (Seg_0, Seg_1, Seg_2) since we want 
to keep 2 data snapshots
+                //    (Seg_3, Seg_4, Seg_5), (Seg_6, Seg_7, Seg_8) only to 
avoid the disk space waste.
+                //
+                // TODO: make the number of allowed snapshots configurable to 
allow users to keep at most N snapshots
+                //       of data. We need to traverse the lineage by N steps 
instead of 2 steps. We can build the
+                //       reverse hash map (segmentsTo -> segmentsFrom) and 
traverse up to N times before deleting.
+                LOGGER.info(
+                    "Proactively deleting the replaced segments for REFRESH 
table to avoid the excessive disk waste. "
+                        + "tableNameWithType={}, segmentsToCleanUp={}", 
tableNameWithType,
+                    lineageEntry.getSegmentsFrom());
+                segmentsToCleanUp.addAll(lineageEntry.getSegmentsFrom());
               }
-
-              // Add segments for proactive clean-up.
-              segmentsToCleanUp.addAll(segmentsToForEntryToRevert);
-            } else if (lineageEntry.getState() == LineageEntryState.COMPLETED 
&& "REFRESH".equalsIgnoreCase(
-                
IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig)) && 
CollectionUtils.isEqualCollection(
-                segmentsFrom, lineageEntry.getSegmentsTo())) {
-              // This part of code assumes that we only allow at most 2 data 
snapshots at a time by proactively
-              // deleting the older snapshots (for REFRESH tables).
-              //
-              // e.g. (Seg_0, Seg_1, Seg_2) -> (Seg_3, Seg_4, Seg_5)  // 
previous lineage
-              //      (Seg_3, Seg_4, Seg_5) -> (Seg_6, Seg_7, Seg_8)  // 
current lineage to be updated
-              // -> proactively delete (Seg_0, Seg_1, Seg_2) since we want to 
keep 2 data snapshots
-              //    (Seg_3, Seg_4, Seg_5), (Seg_6, Seg_7, Seg_8) only to avoid 
the disk space waste.
-              //
-              // TODO: make the number of allowed snapshots configurable to 
allow users to keep at most N snapshots
-              //       of data. We need to traverse the lineage by N steps 
instead of 2 steps. We can build the reverse
-              //       hash map (segmentsTo -> segmentsFrom) and traverse up 
to N times before deleting.
-              //
-              LOGGER.info(
-                  "Proactively deleting the replaced segments for REFRESH 
table to avoid the excessive disk waste. "
-                      + "tableNameWithType={}, segmentsToCleanUp={}", 
tableNameWithType,
-                  lineageEntry.getSegmentsFrom());
-              segmentsToCleanUp.addAll(lineageEntry.getSegmentsFrom());
-            }
-          } else {
-            // Check that any segment from 'segmentsFrom' does not appear 
twice.
-            if (!segmentsFrom.isEmpty()) {
-              Set<String> segmentsFromInLineageEntry = new 
HashSet<>(lineageEntry.getSegmentsFrom());
-              if (!segmentsFromInLineageEntry.isEmpty()) {
-                for (String segment : segmentsFrom) {
-                  
Preconditions.checkState(!segmentsFromInLineageEntry.contains(segment),
-                      "Segment: %s from 'segmentsFrom' exists in table: %s, 
entry id: %s as 'segmentsFrom'"
-                          + " (replacing a replaced segment)", segment, 
tableNameWithType, entryId);
+            } else {
+              // Check that any segment from 'segmentsFrom' does not appear 
twice.
+              if (!segmentsFrom.isEmpty()) {
+                Set<String> segmentsFromInLineageEntry = new 
HashSet<>(lineageEntry.getSegmentsFrom());
+                if (!segmentsFromInLineageEntry.isEmpty()) {
+                  for (String segment : segmentsFrom) {
+                    
Preconditions.checkState(!segmentsFromInLineageEntry.contains(segment),
+                        "Segment: %s from 'segmentsFrom' exists in table: %s, 
entry id: %s as 'segmentsFrom'"
+                            + " (replacing a replaced segment)", segment, 
tableNameWithType, entryId);
+                  }
                 }
               }
-            }
 
-            if (!segmentsTo.isEmpty()) {
-              Set<String> segmentsToInLineageEntry = new 
HashSet<>(lineageEntry.getSegmentsTo());
-              if (!segmentsToInLineageEntry.isEmpty()) {
-                for (String segment : segmentsTo) {
-                  
Preconditions.checkState(!segmentsToInLineageEntry.contains(segment),
-                      "Segment: %s from 'segmentsTo' exists in table: %s, 
entry id: %s as 'segmentTo'"
-                          + " (name conflict)", segment, tableNameWithType, 
entryId);
+              if (!segmentsTo.isEmpty()) {
+                Set<String> segmentsToInLineageEntry = new 
HashSet<>(lineageEntry.getSegmentsTo());
+                if (!segmentsToInLineageEntry.isEmpty()) {
+                  for (String segment : segmentsTo) {
+                    
Preconditions.checkState(!segmentsToInLineageEntry.contains(segment),
+                        "Segment: %s from 'segmentsTo' exists in table: %s, 
entry id: %s as 'segmentTo'"
+                            + " (name conflict)", segment, tableNameWithType, 
entryId);
+                  }
                 }
               }
             }
           }
-        }
 
-        // Update lineage entry
-        segmentLineage.addLineageEntry(segmentLineageEntryId,
-            new LineageEntry(segmentsFrom, segmentsTo, 
LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
-
-        _lineageManager.updateLineageForStartReplaceSegments(tableConfig, 
segmentLineageEntryId, customMap,
-            segmentLineage);
-        // Write back to the lineage entry to the property store
-        if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, 
segmentLineage, expectedVersion)) {
-          // Trigger the proactive segment clean up if needed. Once the 
lineage is updated in the property store, it
-          // is safe to physically delete segments.
-          if (!segmentsToCleanUp.isEmpty()) {
-            LOGGER.info("Cleaning up the segments while startReplaceSegments: 
{}", segmentsToCleanUp);
-            deleteSegments(tableNameWithType, segmentsToCleanUp);
+          // Update lineage entry
+          segmentLineage.addLineageEntry(segmentLineageEntryId,
+              new LineageEntry(segmentsFrom, segmentsTo, 
LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
+
+          _lineageManager.updateLineageForStartReplaceSegments(tableConfig, 
segmentLineageEntryId, customMap,
+              segmentLineage);
+          // Write back to the lineage entry to the property store
+          if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, 
segmentLineage, expectedVersion)) {
+            return true;
+          } else {
+            LOGGER.warn("Failed to write segment lineage for table: {}", 
tableNameWithType);
+            return false;
           }
-          return true;
-        } else {
-          LOGGER.warn("Failed to write segment lineage for table: {}", 
tableNameWithType);
-          return false;
-        }
-      });
-    } catch (Exception e) {
-      String errorMsg = String.format("Failed to update the segment lineage 
during startReplaceSegments. "
-          + "(tableName = %s, segmentsFrom = %s, segmentsTo = %s)", 
tableNameWithType, segmentsFrom, segmentsTo);
-      LOGGER.error(errorMsg, e);
-      throw new RuntimeException(errorMsg, e);
+        });
+      } catch (Exception e) {
+        String errorMsg = String.format("Failed to update the segment lineage 
during startReplaceSegments. "
+            + "(tableName = %s, segmentsFrom = %s, segmentsTo = %s)", 
tableNameWithType, segmentsFrom, segmentsTo);
+        LOGGER.error(errorMsg, e);
+        throw new RuntimeException(errorMsg, e);
+      }
+    }
+
+    // Trigger the proactive segment clean up if needed. Once the lineage is 
updated in the property store, it
+    // is safe to physically delete segments.
+    if (!segmentsToCleanUp.isEmpty()) {
+      LOGGER.info("Cleaning up the segments while startReplaceSegments: {}", 
segmentsToCleanUp);
+      deleteSegments(tableNameWithType, segmentsToCleanUp);
     }
 
     // Only successful attempt can reach here
-    LOGGER.info("startReplaceSegments is successfully processed. 
(tableNameWithType = {}, segmentsFrom = {}, "
-            + "segmentsTo = {}, segmentLineageEntryId = {})", 
tableNameWithType, segmentsFrom, segmentsTo,
-        segmentLineageEntryId);
+    LOGGER.info("startReplaceSegments is successfully processed in {} ms. 
(tableNameWithType = {}, segmentsFrom = {}, "

Review Comment:
   Let's also log the number of attempts, or even the time taken by each attempt?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -3475,167 +3486,168 @@ public String startReplaceSegments(String 
tableNameWithType, List<String> segmen
       Preconditions.checkState(!segmentsForTable.contains(segment), "Segment: 
%s from 'segmentsTo' exists in table: %s",
           segment, tableNameWithType);
     }
+    List<String> segmentsToCleanUp = new ArrayList<>();
+    synchronized (getLineageUpdaterLock(tableNameWithType)) {
+      try {
+        DEFAULT_RETRY_POLICY.attempt(() -> {
+          // Fetch table config
+          TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+          Preconditions.checkState(tableConfig != null, "Failed to find table 
config for table: %s", tableNameWithType);
+
+          // Fetch the segment lineage metadata
+          ZNRecord segmentLineageZNRecord =
+              
SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, 
tableNameWithType);
+          SegmentLineage segmentLineage;
+          int expectedVersion;
+          if (segmentLineageZNRecord == null) {
+            segmentLineage = new SegmentLineage(tableNameWithType);
+            expectedVersion = -1;
+          } else {
+            segmentLineage = 
SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+            expectedVersion = segmentLineageZNRecord.getVersion();
+          }
+          // Check that the segment lineage entry id doesn't exist in the 
segment lineage
+          
Preconditions.checkState(segmentLineage.getLineageEntry(segmentLineageEntryId) 
== null,
+              "Entry id: %s already exists in the segment lineage for table: 
%s", segmentLineageEntryId,
+              tableNameWithType);
+
+          Iterator<Map.Entry<String, LineageEntry>> entryIterator =
+              segmentLineage.getLineageEntries().entrySet().iterator();
+          while (entryIterator.hasNext()) {
+            Map.Entry<String, LineageEntry> entry = entryIterator.next();
+            String entryId = entry.getKey();
+            LineageEntry lineageEntry = entry.getValue();
+
+            // If the lineage entry is in 'REVERTED' state, no need to go 
through the validation because we can regard
+            // the entry as not existing.
+            if (lineageEntry.getState() == LineageEntryState.REVERTED) {
+              // When 'forceCleanup' is enabled, proactively clean up 
'segmentsTo' since it's safe to do so.
+              if (forceCleanup) {
+                segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo());
+              }
+              continue;
+            }
 
-    try {
-      DEFAULT_RETRY_POLICY.attempt(() -> {
-        // Fetch table config
-        TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
-        Preconditions.checkState(tableConfig != null, "Failed to find table 
config for table: %s", tableNameWithType);
-
-        // Fetch the segment lineage metadata
-        ZNRecord segmentLineageZNRecord =
-            
SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, 
tableNameWithType);
-        SegmentLineage segmentLineage;
-        int expectedVersion;
-        if (segmentLineageZNRecord == null) {
-          segmentLineage = new SegmentLineage(tableNameWithType);
-          expectedVersion = -1;
-        } else {
-          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
-          expectedVersion = segmentLineageZNRecord.getVersion();
-        }
-        // Check that the segment lineage entry id doesn't exist in the 
segment lineage
-        
Preconditions.checkState(segmentLineage.getLineageEntry(segmentLineageEntryId) 
== null,
-            "Entry id: %s already exists in the segment lineage for table: 
%s", segmentLineageEntryId,
-            tableNameWithType);
+            // By here, the lineage entry is either 'IN_PROGRESS' or 
'COMPLETED'.
 
-        List<String> segmentsToCleanUp = new ArrayList<>();
-        Iterator<Map.Entry<String, LineageEntry>> entryIterator =
-            segmentLineage.getLineageEntries().entrySet().iterator();
-        while (entryIterator.hasNext()) {
-          Map.Entry<String, LineageEntry> entry = entryIterator.next();
-          String entryId = entry.getKey();
-          LineageEntry lineageEntry = entry.getValue();
-
-          // If the lineage entry is in 'REVERTED' state, no need to go 
through the validation because we can regard
-          // the entry as not existing.
-          if (lineageEntry.getState() == LineageEntryState.REVERTED) {
-            // When 'forceCleanup' is enabled, proactively clean up 
'segmentsTo' since it's safe to do so.
+            // When 'forceCleanup' is enabled, we need to proactively clean up 
at the following cases:
+            // 1. Revert the lineage entry when we find the lineage entry with 
overlapped 'segmentsFrom' or 'segmentsTo'
+            //    values. This is used to un-block the segment replacement 
protocol if the previous attempt failed in
+            //    the middle.
+            // 2. Proactively delete the oldest data snapshot to make sure 
that we only keep at most 2 data snapshots
+            //    at any time in case of REFRESH use case.
             if (forceCleanup) {
-              segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo());
-            }
-            continue;
-          }
+              if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && (
+                  !Collections.disjoint(segmentsFrom, 
lineageEntry.getSegmentsFrom()) || !Collections.disjoint(
+                      segmentsTo, lineageEntry.getSegmentsTo()))) {
+                LOGGER.info(
+                    "Detected the incomplete lineage entry with overlapped 
'segmentsFrom' or 'segmentsTo'. Deleting or "
+                        + "reverting the lineage entry to unblock the new 
segment protocol. tableNameWithType={}, "
+                        + "entryId={}, segmentsFrom={}, segmentsTo={}", 
tableNameWithType, entryId,
+                    lineageEntry.getSegmentsFrom(), 
lineageEntry.getSegmentsTo());
+
+                // Delete the 'IN_PROGRESS' entry or update it to 'REVERTED'
+                // Delete or update segmentsTo of the entry to revert to 
handle the case of rerunning the protocol:
+                // Initial state:
+                //   Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4], 
status: IN_PROGRESS}
+                // 1. Rerunning the protocol with s4 and s5, s4 should not be 
deleted to avoid race conditions of
+                // concurrent data pushes and deletions:
+                //   Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3], 
status: REVERTED}
+                //   Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s4, s5], 
status: IN_PROGRESS}
+                // 2. Rerunning the protocol with s3 and s4, we can simply 
remove the 'IN_PROGRESS' entry:
+                //   Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4], 
status: IN_PROGRESS}
+                List<String> segmentsToForEntryToRevert = new 
ArrayList<>(lineageEntry.getSegmentsTo());
+                segmentsToForEntryToRevert.removeAll(segmentsTo);
+                if (segmentsToForEntryToRevert.isEmpty()) {
+                  // Delete 'IN_PROGRESS' entry if the segmentsTo is empty
+                  entryIterator.remove();
+                } else {
+                  // Update the lineage entry to 'REVERTED'
+                  entry.setValue(new 
LineageEntry(lineageEntry.getSegmentsFrom(), segmentsToForEntryToRevert,
+                      LineageEntryState.REVERTED, System.currentTimeMillis()));
+                }
 
-          // By here, the lineage entry is either 'IN_PROGRESS' or 'COMPLETED'.
-
-          // When 'forceCleanup' is enabled, we need to proactively clean up 
at the following cases:
-          // 1. Revert the lineage entry when we find the lineage entry with 
overlapped 'segmentsFrom' or 'segmentsTo'
-          //    values. This is used to un-block the segment replacement 
protocol if the previous attempt failed in the
-          //    middle.
-          // 2. Proactively delete the oldest data snapshot to make sure that 
we only keep at most 2 data snapshots
-          //    at any time in case of REFRESH use case.
-          if (forceCleanup) {
-            if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && (
-                !Collections.disjoint(segmentsFrom, 
lineageEntry.getSegmentsFrom()) || !Collections.disjoint(segmentsTo,
-                    lineageEntry.getSegmentsTo()))) {
-              LOGGER.info(
-                  "Detected the incomplete lineage entry with overlapped 
'segmentsFrom' or 'segmentsTo'. Deleting or "
-                      + "reverting the lineage entry to unblock the new 
segment protocol. tableNameWithType={}, "
-                      + "entryId={}, segmentsFrom={}, segmentsTo={}", 
tableNameWithType, entryId,
-                  lineageEntry.getSegmentsFrom(), 
lineageEntry.getSegmentsTo());
-
-              // Delete the 'IN_PROGRESS' entry or update it to 'REVERTED'
-              // Delete or update segmentsTo of the entry to revert to handle 
the case of rerunning the protocol:
-              // Initial state:
-              //   Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4], 
status: IN_PROGRESS}
-              // 1. Rerunning the protocol with s4 and s5, s4 should not be 
deleted to avoid race conditions of
-              // concurrent data pushes and deletions:
-              //   Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3], status: 
REVERTED}
-              //   Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s4, s5], 
status: IN_PROGRESS}
-              // 2. Rerunning the protocol with s3 and s4, we can simply 
remove the 'IN_PROGRESS' entry:
-              //   Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4], 
status: IN_PROGRESS}
-              List<String> segmentsToForEntryToRevert = new 
ArrayList<>(lineageEntry.getSegmentsTo());
-              segmentsToForEntryToRevert.removeAll(segmentsTo);
-              if (segmentsToForEntryToRevert.isEmpty()) {
-                // Delete 'IN_PROGRESS' entry if the segmentsTo is empty
-                entryIterator.remove();
-              } else {
-                // Update the lineage entry to 'REVERTED'
-                entry.setValue(new 
LineageEntry(lineageEntry.getSegmentsFrom(), segmentsToForEntryToRevert,
-                    LineageEntryState.REVERTED, System.currentTimeMillis()));
+                // Add segments for proactive clean-up.
+                segmentsToCleanUp.addAll(segmentsToForEntryToRevert);
+              } else if (lineageEntry.getState() == 
LineageEntryState.COMPLETED && "REFRESH".equalsIgnoreCase(
+                  
IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig)) && 
CollectionUtils.isEqualCollection(
+                  segmentsFrom, lineageEntry.getSegmentsTo())) {
+                // This part of code assumes that we only allow at most 2 data 
snapshots at a time by proactively
+                // deleting the older snapshots (for REFRESH tables).
+                //
+                // e.g. (Seg_0, Seg_1, Seg_2) -> (Seg_3, Seg_4, Seg_5)  // 
previous lineage
+                //      (Seg_3, Seg_4, Seg_5) -> (Seg_6, Seg_7, Seg_8)  // 
current lineage to be updated
+                // -> proactively delete (Seg_0, Seg_1, Seg_2) since we want 
to keep 2 data snapshots
+                //    (Seg_3, Seg_4, Seg_5), (Seg_6, Seg_7, Seg_8) only to 
avoid the disk space waste.
+                //
+                // TODO: make the number of allowed snapshots configurable to 
allow users to keep at most N snapshots
+                //       of data. We need to traverse the lineage by N steps 
instead of 2 steps. We can build the
+                //       reverse hash map (segmentsTo -> segmentsFrom) and 
traverse up to N times before deleting.
+                LOGGER.info(
+                    "Proactively deleting the replaced segments for REFRESH 
table to avoid the excessive disk waste. "
+                        + "tableNameWithType={}, segmentsToCleanUp={}", 
tableNameWithType,
+                    lineageEntry.getSegmentsFrom());
+                segmentsToCleanUp.addAll(lineageEntry.getSegmentsFrom());
               }
-
-              // Add segments for proactive clean-up.
-              segmentsToCleanUp.addAll(segmentsToForEntryToRevert);
-            } else if (lineageEntry.getState() == LineageEntryState.COMPLETED 
&& "REFRESH".equalsIgnoreCase(
-                
IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig)) && 
CollectionUtils.isEqualCollection(
-                segmentsFrom, lineageEntry.getSegmentsTo())) {
-              // This part of code assumes that we only allow at most 2 data 
snapshots at a time by proactively
-              // deleting the older snapshots (for REFRESH tables).
-              //
-              // e.g. (Seg_0, Seg_1, Seg_2) -> (Seg_3, Seg_4, Seg_5)  // 
previous lineage
-              //      (Seg_3, Seg_4, Seg_5) -> (Seg_6, Seg_7, Seg_8)  // 
current lineage to be updated
-              // -> proactively delete (Seg_0, Seg_1, Seg_2) since we want to 
keep 2 data snapshots
-              //    (Seg_3, Seg_4, Seg_5), (Seg_6, Seg_7, Seg_8) only to avoid 
the disk space waste.
-              //
-              // TODO: make the number of allowed snapshots configurable to 
allow users to keep at most N snapshots
-              //       of data. We need to traverse the lineage by N steps 
instead of 2 steps. We can build the reverse
-              //       hash map (segmentsTo -> segmentsFrom) and traverse up 
to N times before deleting.
-              //
-              LOGGER.info(
-                  "Proactively deleting the replaced segments for REFRESH 
table to avoid the excessive disk waste. "
-                      + "tableNameWithType={}, segmentsToCleanUp={}", 
tableNameWithType,
-                  lineageEntry.getSegmentsFrom());
-              segmentsToCleanUp.addAll(lineageEntry.getSegmentsFrom());
-            }
-          } else {
-            // Check that any segment from 'segmentsFrom' does not appear 
twice.
-            if (!segmentsFrom.isEmpty()) {
-              Set<String> segmentsFromInLineageEntry = new 
HashSet<>(lineageEntry.getSegmentsFrom());
-              if (!segmentsFromInLineageEntry.isEmpty()) {
-                for (String segment : segmentsFrom) {
-                  
Preconditions.checkState(!segmentsFromInLineageEntry.contains(segment),
-                      "Segment: %s from 'segmentsFrom' exists in table: %s, 
entry id: %s as 'segmentsFrom'"
-                          + " (replacing a replaced segment)", segment, 
tableNameWithType, entryId);
+            } else {
+              // Check that any segment from 'segmentsFrom' does not appear 
twice.
+              if (!segmentsFrom.isEmpty()) {
+                Set<String> segmentsFromInLineageEntry = new 
HashSet<>(lineageEntry.getSegmentsFrom());
+                if (!segmentsFromInLineageEntry.isEmpty()) {
+                  for (String segment : segmentsFrom) {
+                    
Preconditions.checkState(!segmentsFromInLineageEntry.contains(segment),
+                        "Segment: %s from 'segmentsFrom' exists in table: %s, 
entry id: %s as 'segmentsFrom'"
+                            + " (replacing a replaced segment)", segment, 
tableNameWithType, entryId);
+                  }
                 }
               }
-            }
 
-            if (!segmentsTo.isEmpty()) {
-              Set<String> segmentsToInLineageEntry = new 
HashSet<>(lineageEntry.getSegmentsTo());
-              if (!segmentsToInLineageEntry.isEmpty()) {
-                for (String segment : segmentsTo) {
-                  
Preconditions.checkState(!segmentsToInLineageEntry.contains(segment),
-                      "Segment: %s from 'segmentsTo' exists in table: %s, 
entry id: %s as 'segmentTo'"
-                          + " (name conflict)", segment, tableNameWithType, 
entryId);
+              if (!segmentsTo.isEmpty()) {
+                Set<String> segmentsToInLineageEntry = new 
HashSet<>(lineageEntry.getSegmentsTo());
+                if (!segmentsToInLineageEntry.isEmpty()) {
+                  for (String segment : segmentsTo) {
+                    
Preconditions.checkState(!segmentsToInLineageEntry.contains(segment),
+                        "Segment: %s from 'segmentsTo' exists in table: %s, 
entry id: %s as 'segmentTo'"
+                            + " (name conflict)", segment, tableNameWithType, 
entryId);
+                  }
                 }
               }
             }
           }
-        }
 
-        // Update lineage entry
-        segmentLineage.addLineageEntry(segmentLineageEntryId,
-            new LineageEntry(segmentsFrom, segmentsTo, 
LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
-
-        _lineageManager.updateLineageForStartReplaceSegments(tableConfig, 
segmentLineageEntryId, customMap,
-            segmentLineage);
-        // Write back to the lineage entry to the property store
-        if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, 
segmentLineage, expectedVersion)) {
-          // Trigger the proactive segment clean up if needed. Once the 
lineage is updated in the property store, it
-          // is safe to physically delete segments.
-          if (!segmentsToCleanUp.isEmpty()) {
-            LOGGER.info("Cleaning up the segments while startReplaceSegments: 
{}", segmentsToCleanUp);
-            deleteSegments(tableNameWithType, segmentsToCleanUp);
+          // Update lineage entry
+          segmentLineage.addLineageEntry(segmentLineageEntryId,
+              new LineageEntry(segmentsFrom, segmentsTo, 
LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
+
+          _lineageManager.updateLineageForStartReplaceSegments(tableConfig, 
segmentLineageEntryId, customMap,
+              segmentLineage);
+          // Write back to the lineage entry to the property store
+          if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, 
segmentLineage, expectedVersion)) {

Review Comment:
   Can we log the time taken by each lineage processing/compute pass to get a sense of 
the internal lineage reprocessing time?
   
   If it turns out to take a long time, then we should think about introducing a table-level 
distributed lock. 



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -3862,49 +3876,50 @@ public void revertReplaceSegments(String 
tableNameWithType, String segmentLineag
    * @param lineageUpdateType
    * @param customMap
    */
-  private boolean writeLineageEntryWithTightLoop(TableConfig tableConfig, 
String lineageEntryId,
+  private boolean writeLineageEntryWithLock(TableConfig tableConfig, String 
lineageEntryId,
       LineageEntry lineageEntryToUpdate, LineageEntry lineageEntryToMatch, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
       LineageUpdateType lineageUpdateType, Map<String, String> customMap) {
-    for (int i = 0; i < DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY; i++) {
-      // Fetch the segment lineage
-      ZNRecord segmentLineageToUpdateZNRecord =
-          SegmentLineageAccessHelper.getSegmentLineageZNRecord(propertyStore, 
tableConfig.getTableName());
-      int expectedVersion = segmentLineageToUpdateZNRecord.getVersion();
-      SegmentLineage segmentLineageToUpdate = 
SegmentLineage.fromZNRecord(segmentLineageToUpdateZNRecord);
-      LineageEntry currentLineageEntry = 
segmentLineageToUpdate.getLineageEntry(lineageEntryId);
-
-      // If the lineage entry doesn't match with the previously fetched 
lineage, we need to fail the request.
-      if (!currentLineageEntry.equals(lineageEntryToMatch)) {
-        String errorMsg = String.format(
-            "Aborting the to update lineage entry since we find that the entry 
has been modified for table %s, "
-                + "entry id: %s", tableConfig.getTableName(), lineageEntryId);
-        LOGGER.error(errorMsg);
-        throw new RuntimeException(errorMsg);
-      }
+    String tableNameWithType = tableConfig.getTableName();
+      synchronized (getLineageUpdaterLock(tableNameWithType)) {
+        // Fetch the segment lineage
+        ZNRecord segmentLineageToUpdateZNRecord =
+            
SegmentLineageAccessHelper.getSegmentLineageZNRecord(propertyStore, 
tableConfig.getTableName());
+        int expectedVersion = segmentLineageToUpdateZNRecord.getVersion();
+        SegmentLineage segmentLineageToUpdate = 
SegmentLineage.fromZNRecord(segmentLineageToUpdateZNRecord);
+        LineageEntry currentLineageEntry = 
segmentLineageToUpdate.getLineageEntry(lineageEntryId);
+
+        // If the lineage entry doesn't match with the previously fetched 
lineage, we need to fail the request.
+        if (!currentLineageEntry.equals(lineageEntryToMatch)) {
+          String errorMsg = String.format(
+              "Aborting the to update lineage entry since we find that the 
entry has been modified for table %s, "
+                  + "entry id: %s", tableConfig.getTableName(), 
lineageEntryId);
+          LOGGER.error(errorMsg);
+          throw new RuntimeException(errorMsg);
+        }
 
-      // Update lineage entry
-      segmentLineageToUpdate.updateLineageEntry(lineageEntryId, 
lineageEntryToUpdate);
-      switch (lineageUpdateType) {
-        case START:
-          _lineageManager.updateLineageForStartReplaceSegments(tableConfig, 
lineageEntryId, customMap,
-              segmentLineageToUpdate);
-          break;
-        case END:
-          _lineageManager.updateLineageForEndReplaceSegments(tableConfig, 
lineageEntryId, customMap,
-              segmentLineageToUpdate);
-          break;
-        case REVERT:
-          _lineageManager.updateLineageForRevertReplaceSegments(tableConfig, 
lineageEntryId, customMap,
-              segmentLineageToUpdate);
-          break;
-        default:
-      }
+        // Update lineage entry
+        segmentLineageToUpdate.updateLineageEntry(lineageEntryId, 
lineageEntryToUpdate);
+        switch (lineageUpdateType) {
+          case START:

Review Comment:
   I think this `case START` won't be hit?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -191,7 +191,8 @@ public class PinotHelixResourceManager {
   private static final RetryPolicy DEFAULT_RETRY_POLICY = 
RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
   private static final int DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY = 10;

Review Comment:
   DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY is not used anymore.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -3862,49 +3876,50 @@ public void revertReplaceSegments(String 
tableNameWithType, String segmentLineag
    * @param lineageUpdateType
    * @param customMap
    */
-  private boolean writeLineageEntryWithTightLoop(TableConfig tableConfig, 
String lineageEntryId,
+  private boolean writeLineageEntryWithLock(TableConfig tableConfig, String 
lineageEntryId,
       LineageEntry lineageEntryToUpdate, LineageEntry lineageEntryToMatch, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
       LineageUpdateType lineageUpdateType, Map<String, String> customMap) {
-    for (int i = 0; i < DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY; i++) {
-      // Fetch the segment lineage
-      ZNRecord segmentLineageToUpdateZNRecord =
-          SegmentLineageAccessHelper.getSegmentLineageZNRecord(propertyStore, 
tableConfig.getTableName());
-      int expectedVersion = segmentLineageToUpdateZNRecord.getVersion();
-      SegmentLineage segmentLineageToUpdate = 
SegmentLineage.fromZNRecord(segmentLineageToUpdateZNRecord);
-      LineageEntry currentLineageEntry = 
segmentLineageToUpdate.getLineageEntry(lineageEntryId);
-
-      // If the lineage entry doesn't match with the previously fetched 
lineage, we need to fail the request.
-      if (!currentLineageEntry.equals(lineageEntryToMatch)) {
-        String errorMsg = String.format(
-            "Aborting the to update lineage entry since we find that the entry 
has been modified for table %s, "
-                + "entry id: %s", tableConfig.getTableName(), lineageEntryId);
-        LOGGER.error(errorMsg);
-        throw new RuntimeException(errorMsg);
-      }
+    String tableNameWithType = tableConfig.getTableName();

Review Comment:
   Considering the distributed updates from other controllers, I feel it's still 
better to keep a smaller number of retries here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to