swaminathanmanish commented on code in PR #15016: URL: https://github.com/apache/pinot/pull/15016#discussion_r1948893470
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -437,6 +441,56 @@ void persistSegmentZKMetadata(String realtimeTableName, SegmentZKMetadata segmen } } + private boolean addSegmentToCommittingSegmentsList(String realtimeTableName, String segmentName) { + String committingSegmentsListPath = + ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName); + Stat stat = new Stat(); + ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, AccessOption.PERSISTENT); + int expectedVersion = stat.getVersion(); + + // empty ZN record for the table + if (znRecord == null) { + znRecord = new ZNRecord(realtimeTableName); + znRecord.setListField(COMMITTING_SEGMENTS, List.of(segmentName)); + return _propertyStore.create(committingSegmentsListPath, znRecord, AccessOption.PERSISTENT); + } + + // segment already present in the list + List<String> committingSegmentList = znRecord.getListField(COMMITTING_SEGMENTS); Review Comment: You can log.info the size of committing list. Will be useful for debugging. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -611,6 +665,17 @@ protected void preProcessCommitSegmentEndMetadata() { // No-op } + private void updateCommittingSegmentsList(String realtimeTableName, + Callable<Boolean> operation) { + try { + DEFAULT_RETRY_POLICY.attempt(operation); + } catch (Exception e) { + _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L); + LOGGER.error("Failed to update committing segments list for table: {}", realtimeTableName, e); + throw new RuntimeException(e); Review Comment: Clarification - This is debug info. Can we make sure we dont fail the commit operation if committing list update fails (i.e this exception does not bubble up). Any inconsistencies will fixed in SegmentValidationManager right. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -437,6 +441,56 @@ void persistSegmentZKMetadata(String realtimeTableName, SegmentZKMetadata segmen } } + private boolean addSegmentToCommittingSegmentsList(String realtimeTableName, String segmentName) { + String committingSegmentsListPath = + ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName); + Stat stat = new Stat(); + ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, AccessOption.PERSISTENT); + int expectedVersion = stat.getVersion(); + + // empty ZN record for the table + if (znRecord == null) { + znRecord = new ZNRecord(realtimeTableName); + znRecord.setListField(COMMITTING_SEGMENTS, List.of(segmentName)); + return _propertyStore.create(committingSegmentsListPath, znRecord, AccessOption.PERSISTENT); + } + + // segment already present in the list + List<String> committingSegmentList = znRecord.getListField(COMMITTING_SEGMENTS); + if (committingSegmentList != null && committingSegmentList.contains(segmentName)) { + return true; + } + + if (committingSegmentList == null) { + committingSegmentList = List.of(segmentName); + } else { + committingSegmentList.add(segmentName); + } + znRecord.setListField(COMMITTING_SEGMENTS, committingSegmentList); + try { + return _propertyStore.set(committingSegmentsListPath, znRecord, expectedVersion, AccessOption.PERSISTENT); + } catch (ZkBadVersionException e) { + return false; + } + } + + private boolean removeSegmentFromCommittingSegmentsList(String realtimeTableName, String segmentName) { + String committingSegmentsListPath = + ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName); + Stat stat = new Stat(); + ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, AccessOption.PERSISTENT); + if (znRecord == null || znRecord.getListField(COMMITTING_SEGMENTS) == null || !znRecord.getListField( + COMMITTING_SEGMENTS).contains(segmentName)) { + return true; + } + znRecord.getListField(COMMITTING_SEGMENTS).remove(segmentName); + try { + return _propertyStore.set(committingSegmentsListPath, znRecord, stat.getVersion(), AccessOption.PERSISTENT); Review Comment: znRecord also has getVersion(). getVersion from Stat and znRecord are the same? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org