9aman commented on code in PR #15016: URL: https://github.com/apache/pinot/pull/15016#discussion_r1948940832
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -437,6 +441,56 @@ void persistSegmentZKMetadata(String realtimeTableName, SegmentZKMetadata segmen } } + private boolean addSegmentToCommittingSegmentsList(String realtimeTableName, String segmentName) { + String committingSegmentsListPath = + ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName); + Stat stat = new Stat(); + ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, AccessOption.PERSISTENT); + int expectedVersion = stat.getVersion(); + + // empty ZN record for the table + if (znRecord == null) { + znRecord = new ZNRecord(realtimeTableName); + znRecord.setListField(COMMITTING_SEGMENTS, List.of(segmentName)); + return _propertyStore.create(committingSegmentsListPath, znRecord, AccessOption.PERSISTENT); + } + + // segment already present in the list + List<String> committingSegmentList = znRecord.getListField(COMMITTING_SEGMENTS); + if (committingSegmentList != null && committingSegmentList.contains(segmentName)) { + return true; + } + + if (committingSegmentList == null) { + committingSegmentList = List.of(segmentName); + } else { + committingSegmentList.add(segmentName); + } + znRecord.setListField(COMMITTING_SEGMENTS, committingSegmentList); + try { + return _propertyStore.set(committingSegmentsListPath, znRecord, expectedVersion, AccessOption.PERSISTENT); + } catch (ZkBadVersionException e) { + return false; + } + } + + private boolean removeSegmentFromCommittingSegmentsList(String realtimeTableName, String segmentName) { + String committingSegmentsListPath = + ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName); + Stat stat = new Stat(); + ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, AccessOption.PERSISTENT); + if (znRecord == null || znRecord.getListField(COMMITTING_SEGMENTS) == null || 
!znRecord.getListField( + COMMITTING_SEGMENTS).contains(segmentName)) { + return true; + } + znRecord.getListField(COMMITTING_SEGMENTS).remove(segmentName); + try { + return _propertyStore.set(committingSegmentsListPath, znRecord, stat.getVersion(), AccessOption.PERSISTENT); Review Comment: Referring to other implementations, we use stat to get the version before we make an update. ``` private SegmentZKMetadata updateCommittingSegmentZKMetadata(String realtimeTableName, CommittingSegmentDescriptor committingSegmentDescriptor) { Stat stat = new Stat(); ... ... ... persistSegmentZKMetadata(realtimeTableName, committingSegmentZKMetadata, stat.getVersion()); return committingSegmentZKMetadata; } @VisibleForTesting void persistSegmentZKMetadata(String realtimeTableName, SegmentZKMetadata segmentZKMetadata, int expectedVersion) { String segmentName = segmentZKMetadata.getSegmentName(); LOGGER.info("Persisting segment ZK metadata for segment: {}", segmentName); try { Preconditions.checkState( _propertyStore.set(ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, segmentName), segmentZKMetadata.toZNRecord(), expectedVersion, AccessOption.PERSISTENT), "Failed to persist segment ZK metadata for segment: %s of table: %s", segmentName, realtimeTableName); } catch (Exception e) { _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L); throw e; } } ``` I am not sure whether they are the same or not. This needs to be checked. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org