swaminathanmanish commented on code in PR #15016:
URL: https://github.com/apache/pinot/pull/15016#discussion_r1948893470


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -437,6 +441,56 @@ void persistSegmentZKMetadata(String realtimeTableName, SegmentZKMetadata segmen
     }
   }
 
+  private boolean addSegmentToCommittingSegmentsList(String realtimeTableName, String segmentName) {
+    String committingSegmentsListPath =
+        ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
+    Stat stat = new Stat();
+    ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, AccessOption.PERSISTENT);
+    int expectedVersion = stat.getVersion();
+
+    // empty ZN record for the table
+    if (znRecord == null) {
+      znRecord = new ZNRecord(realtimeTableName);
+      znRecord.setListField(COMMITTING_SEGMENTS, List.of(segmentName));
+      return _propertyStore.create(committingSegmentsListPath, znRecord, AccessOption.PERSISTENT);
+    }
+
+    // segment already present in the list
+    List<String> committingSegmentList = znRecord.getListField(COMMITTING_SEGMENTS);

Review Comment:
   Consider logging the size of the committing segments list here at INFO level. It will be useful for debugging.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -611,6 +665,17 @@ protected void preProcessCommitSegmentEndMetadata() {
     // No-op
   }
 
+  private void updateCommittingSegmentsList(String realtimeTableName,
+      Callable<Boolean> operation) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(operation);
+    } catch (Exception e) {
+      _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
+      LOGGER.error("Failed to update committing segments list for table: {}", realtimeTableName, e);
+      throw new RuntimeException(e);

Review Comment:
   Clarification: this list is debug info. Can we make sure the commit operation does not fail if the committing list update fails (i.e., this exception does not bubble up)? Any inconsistencies will be fixed by SegmentValidationManager, right?
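
   A minimal, self-contained sketch of the best-effort behavior asked for above: retry the update, record a failure, and return a flag instead of rethrowing. The class `BestEffortUpdate`, the method `tryUpdate`, and the `FAILURES` counter are hypothetical stand-ins (the counter plays the role of a failure meter); the sketch does not use Pinot's retry policy or metrics classes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

public class BestEffortUpdate {
  // Hypothetical stand-in for a failure meter; not a Pinot class.
  public static final AtomicLong FAILURES = new AtomicLong();

  /**
   * Runs the operation up to maxAttempts times.
   * Returns true on the first successful attempt; never throws, so a
   * failing debug-metadata update cannot fail the caller's commit path.
   */
  public static boolean tryUpdate(String tableName, Callable<Boolean> operation, int maxAttempts) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        if (Boolean.TRUE.equals(operation.call())) {
          return true;
        }
      } catch (Exception e) {
        // Log and keep going; the exception is deliberately not rethrown.
        System.err.println("Attempt " + attempt + " failed for table " + tableName + ": " + e);
      }
    }
    // All attempts exhausted: record the failure and report it via the return value.
    FAILURES.incrementAndGet();
    return false;
  }

  public static void main(String[] args) {
    boolean ok = tryUpdate("myTable_REALTIME", () -> {
      throw new IllegalStateException("simulated ZooKeeper failure");
    }, 3);
    System.out.println("update succeeded: " + ok + ", failures recorded: " + FAILURES.get());
  }
}
```

   With this shape the caller can ignore the return value on the commit path and leave cleanup to a periodic validation task.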



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -437,6 +441,56 @@ void persistSegmentZKMetadata(String realtimeTableName, SegmentZKMetadata segmen
     }
   }
 
+  private boolean addSegmentToCommittingSegmentsList(String realtimeTableName, String segmentName) {
+    String committingSegmentsListPath =
+        ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
+    Stat stat = new Stat();
+    ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, AccessOption.PERSISTENT);
+    int expectedVersion = stat.getVersion();
+
+    // empty ZN record for the table
+    if (znRecord == null) {
+      znRecord = new ZNRecord(realtimeTableName);
+      znRecord.setListField(COMMITTING_SEGMENTS, List.of(segmentName));
+      return _propertyStore.create(committingSegmentsListPath, znRecord, AccessOption.PERSISTENT);
+    }
+
+    // segment already present in the list
+    List<String> committingSegmentList = znRecord.getListField(COMMITTING_SEGMENTS);
+    if (committingSegmentList != null && committingSegmentList.contains(segmentName)) {
+      return true;
+    }
+
+    if (committingSegmentList == null) {
+      committingSegmentList = List.of(segmentName);
+    } else {
+      committingSegmentList.add(segmentName);
+    }
+    znRecord.setListField(COMMITTING_SEGMENTS, committingSegmentList);
+    try {
+      return _propertyStore.set(committingSegmentsListPath, znRecord, expectedVersion, AccessOption.PERSISTENT);
+    } catch (ZkBadVersionException e) {
+      return false;
+    }
+  }
+
+  private boolean removeSegmentFromCommittingSegmentsList(String realtimeTableName, String segmentName) {
+    String committingSegmentsListPath =
+        ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
+    Stat stat = new Stat();
+    ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, AccessOption.PERSISTENT);
+    if (znRecord == null || znRecord.getListField(COMMITTING_SEGMENTS) == null || !znRecord.getListField(
+        COMMITTING_SEGMENTS).contains(segmentName)) {
+      return true;
+    }
+    znRecord.getListField(COMMITTING_SEGMENTS).remove(segmentName);
+    try {
+      return _propertyStore.set(committingSegmentsListPath, znRecord, stat.getVersion(), AccessOption.PERSISTENT);

Review Comment:
   ZNRecord also has getVersion(). Do Stat.getVersion() and ZNRecord.getVersion() return the same value here?
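
   For context on why the version matters in the code above, here is a small in-memory model of a version-checked write: the write succeeds only if the expected version is the one observed at read time. `VersionedListStore`, `Snapshot`, and `addSegment` are hypothetical names for illustration only. The sketch does not model Helix or ZooKeeper, and it does not settle whether ZNRecord.getVersion() is populated with the same value as Stat.getVersion().

```java
import java.util.ArrayList;
import java.util.List;

public class VersionedListStore {
  private List<String> _segments = new ArrayList<>();
  private int _version = 0;

  /** Data plus the version it was read at (the role Stat plays in the diff). */
  public static final class Snapshot {
    public final List<String> segments;
    public final int version;

    Snapshot(List<String> segments, int version) {
      this.segments = segments;
      this.version = version;
    }
  }

  /** Returns a private copy of the list together with the current version. */
  public synchronized Snapshot read() {
    return new Snapshot(new ArrayList<>(_segments), _version);
  }

  /** Writes only if no other write happened since expectedVersion was read. */
  public synchronized boolean set(List<String> segments, int expectedVersion) {
    if (expectedVersion != _version) {
      return false; // stale read: the caller should re-read and retry
    }
    _segments = new ArrayList<>(segments);
    _version++;
    return true;
  }

  /** Read-modify-write: adding a segment that is already present is a no-op. */
  public static boolean addSegment(VersionedListStore store, String segmentName) {
    Snapshot snapshot = store.read();
    if (snapshot.segments.contains(segmentName)) {
      return true;
    }
    snapshot.segments.add(segmentName);
    return store.set(snapshot.segments, snapshot.version);
  }

  public static void main(String[] args) {
    VersionedListStore store = new VersionedListStore();
    addSegment(store, "seg_0");
    Snapshot stale = store.read();   // version captured here
    addSegment(store, "seg_1");      // concurrent writer bumps the version
    stale.segments.add("seg_2");
    System.out.println("stale write accepted: " + store.set(stale.segments, stale.version));
  }
}
```

   Whichever accessor supplies the expected version, it has to come from the same read that produced the record being modified.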



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

