Jackie-Jiang commented on code in PR #8663:
URL: https://github.com/apache/pinot/pull/8663#discussion_r875297466
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1069,6 +1073,10 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS
       // and restart consumption from the same offset (if possible) or a newer offset (if realtime stream does
       // not have the same offset).
       // In latter case, report data loss.
+      // 4. There are no segments in the table's idealstate in CONSUMING state,

Review Comment:
   The new scenario is very similar to scenario 1, so we can keep them together: first check whether `latestSegmentZKMetadata` is `DONE`, then check whether `instanceStateMap` contains a `CONSUMING` segment.

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1147,13 +1155,34 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS
         } else {
           if (newPartitionGroupSet.contains(partitionGroupId)) {
             // If we get here, that means in IdealState, the latest segment has no CONSUMING replicas, but has
-            // replicas
-            // not OFFLINE. That is an unexpected state which cannot be fixed by the validation manager currently. In
-            // that case, we need to either extend this part to handle the state, or prevent segments from getting
-            // into
-            // such state.
-            LOGGER
-                .error("Got unexpected instance state map: {} for segment: {}", instanceStateMap, latestSegmentName);
+            // replicas not OFFLINE. Create a new IN_PROGRESS segment in PROPERTYSTORE,
+            // add it as CONSUMING segment to IDEALSTATE.
+            // Currently, only enabled via /resumeConsumption API
+
+            if (enforcedByAdmin) {
+              LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
+              StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getEndOffset());
+              StreamPartitionMsgOffset partitionGroupSmallestOffset =
+                  getPartitionGroupSmallestOffset(streamConfig, partitionGroupId);
+
+              // Start offset must be higher than the start offset of the stream
+              if (partitionGroupSmallestOffset.compareTo(startOffset) > 0) {
+                LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset,
+                    partitionGroupSmallestOffset, partitionGroupId, realtimeTableName);
+                _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
+                startOffset = partitionGroupSmallestOffset;
+              }
+              CommittingSegmentDescriptor committingSegmentDescriptor =
+                  new CommittingSegmentDescriptor(null, startOffset.toString(), 0);
+              createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
+                  committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions,
+                  numReplicas, newPartitionGroupMetadataList);
+              String newSegmentName = newLLCSegmentName.getSegmentName();
+              updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
+                  instancePartitionsMap);

Review Comment:
   Can we extract some common logic to a separate helper method?
   It is almost identical to the all-`OFFLINE` scenario, except that it uses the end offset of the latest segment instead of its start offset.

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -862,7 +863,9 @@ private Map<Integer, SegmentZKMetadata> getLatestSegmentZKMetadataMap(String rea
    * TODO: We need to find a place to detect and update a gauge for nonConsumingPartitionsCount for a table, and
    * reset it to 0 at the end of validateLLC
    */
-  public void ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig) {
+  // TODO(saurabh) : We can reduce critical section by having tableName level locks?
+  public synchronized void ensureAllPartitionsConsuming(TableConfig tableConfig,

Review Comment:
   Let's update the javadoc for this method to include the new scenario:
   ```
   If the consuming segment is deleted:
   Check whether there are segments in the PROPERTYSTORE with status DONE, but no new segment in status IN_PROGRESS,
   and the state for the latest segment in the IDEALSTATE is ONLINE
   ```
   (Note that this is very similar to the failure between step 1 and step 2; the only difference is the state in the ideal state.)

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -862,7 +863,9 @@ private Map<Integer, SegmentZKMetadata> getLatestSegmentZKMetadataMap(String rea
    * TODO: We need to find a place to detect and update a gauge for nonConsumingPartitionsCount for a table, and
    * reset it to 0 at the end of validateLLC
    */
-  public void ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig) {
+  // TODO(saurabh) : We can reduce critical section by having tableName level locks?
+  public synchronized void ensureAllPartitionsConsuming(TableConfig tableConfig,

Review Comment:
   It should work without synchronization because it performs a check-and-write.

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -862,7 +863,9 @@ private Map<Integer, SegmentZKMetadata> getLatestSegmentZKMetadataMap(String rea
    * TODO: We need to find a place to detect and update a gauge for nonConsumingPartitionsCount for a table, and
    * reset it to 0 at the end of validateLLC
    */
-  public void ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig) {
+  // TODO(saurabh) : We can reduce critical section by having tableName level locks?
+  public synchronized void ensureAllPartitionsConsuming(TableConfig tableConfig,
+      PartitionLevelStreamConfig streamConfig, boolean enforcedByAdmin) {

Review Comment:
   Rename the flag to `recreateDeletedConsumingSegment`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
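The first review comment suggests folding the new scenario 4 into scenario 1. The sketch below is a minimal model of that check order. It uses simplified stand-ins: `PartitionCheckSketch`, `ZkStatus`, `Action`, `decide`, and string-valued instance states are hypothetical and are not Pinot APIs. The real code works with `SegmentZKMetadata` and the Helix instance state map. The sketch first checks whether the latest segment is `DONE`, then checks whether any replica is still `CONSUMING`. It only recreates a deleted consuming segment when the flag proposed in the last review comment (`recreateDeletedConsumingSegment`) is set. Treating scenario 1 as "DONE in ZK, still CONSUMING in the IDEALSTATE" is an assumption based on the review's javadoc note.

```java
import java.util.Map;

// Hypothetical, simplified model of the per-partition check; names are illustrative, not Pinot APIs.
final class PartitionCheckSketch {
  /** Simplified status of the latest segment's ZK metadata in the PROPERTYSTORE. */
  enum ZkStatus { IN_PROGRESS, DONE }

  /** What the validation pass would do for the partition. */
  enum Action {
    REPAIR_INTERRUPTED_COMMIT,  // scenario 1 (assumed): DONE in ZK, still CONSUMING in the IDEALSTATE
    RECREATE_CONSUMING_SEGMENT, // scenario 4: DONE in ZK, ONLINE in the IDEALSTATE, flag set
    LOG_UNEXPECTED_STATE,       // scenario 4 without the flag: only report the state
    CHECK_OTHER_SCENARIOS       // everything else falls through to the remaining branches
  }

  static Action decide(ZkStatus latestStatus, Map<String, String> instanceStateMap,
      boolean recreateDeletedConsumingSegment) {
    // First check: has the latest segment been committed?
    if (latestStatus != ZkStatus.DONE) {
      return Action.CHECK_OTHER_SCENARIOS;
    }
    // Second check: does any replica of the latest segment still report CONSUMING?
    if (instanceStateMap.containsValue("CONSUMING")) {
      return Action.REPAIR_INTERRUPTED_COMMIT;
    }
    // DONE and ONLINE with no CONSUMING replica: the consuming segment was deleted.
    if (instanceStateMap.containsValue("ONLINE")) {
      return recreateDeletedConsumingSegment ? Action.RECREATE_CONSUMING_SEGMENT
          : Action.LOG_UNEXPECTED_STATE;
    }
    return Action.CHECK_OTHER_SCENARIOS;
  }
}
```

For example, `decide(ZkStatus.DONE, Map.of("Server_1", "ONLINE"), true)` returns `RECREATE_CONSUMING_SEGMENT`. The same call with the flag set to `false` returns `LOG_UNEXPECTED_STATE`.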
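The second review comment asks for a shared helper. The new branch differs from the all-`OFFLINE` branch only in which offset of the latest segment it resumes from. The sketch below is a minimal version of the offset-selection part of such a helper, built on these assumptions:

- Offsets are modeled as `long` values, where the diff uses `StreamPartitionMsgOffset`.
- Data loss is recorded in a list instead of calling `LOGGER.error` and `_controllerMetrics.addMeteredTableValue`.
- `NewConsumingSegmentSketch`, `ResumeFrom`, and `computeStartOffset` are hypothetical names.

After computing the offset, both branches would continue with the calls already shown in the diff (`createNewSegmentZKMetadata` and `updateInstanceStatesForNewConsumingSegment`).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a shared offset-selection step; not Pinot's API.
final class NewConsumingSegmentSketch {
  /** Which offset of the latest segment the new CONSUMING segment starts from. */
  enum ResumeFrom {
    LATEST_START_OFFSET, // all replicas OFFLINE: restart from the latest segment's start offset
    LATEST_END_OFFSET    // consuming segment deleted: continue right after the DONE segment
  }

  /** Data-loss events; the code in the diff logs an error and marks a controller meter instead. */
  final List<String> dataLossEvents = new ArrayList<>();

  long computeStartOffset(ResumeFrom resumeFrom, long latestStartOffset, long latestEndOffset,
      long smallestAvailableOffset, int partitionGroupId) {
    long startOffset =
        resumeFrom == ResumeFrom.LATEST_START_OFFSET ? latestStartOffset : latestEndOffset;
    // If the stream no longer retains startOffset, resume from the smallest retained offset
    // and record the gap as data loss (mirrors the compareTo check in the diff).
    if (smallestAvailableOffset > startOffset) {
      dataLossEvents.add("partition " + partitionGroupId + ": offsets [" + startOffset + ", "
          + smallestAvailableOffset + ") lost");
      startOffset = smallestAvailableOffset;
    }
    return startOffset;
  }
}
```

Suppose the latest segment covers offsets 100 to 200 and the stream still retains offset 50. The all-`OFFLINE` branch would resume at 100 and the new branch at 200. If retention had advanced to 250, both branches would resume at 250 and record data loss.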
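The fourth review comment argues that `synchronized` is unnecessary because the update is a check-and-write. The sketch below is a minimal model of that pattern. It assumes the IdealState write is versioned and is rejected if the record changed after it was read; ZooKeeper's versioned `setData` behaves this way. Here an `AtomicReference` holding an immutable snapshot stands in for the znode. `VersionedStateSketch`, `Snapshot`, `writeIfUnchanged`, and `update` are hypothetical names. A writer that loses the race re-reads and re-runs its check. As a result, a check such as "is there already a CONSUMING segment?" is always evaluated against the state that is actually written.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a versioned record such as an IdealState znode.
final class VersionedStateSketch {
  /** Immutable view of the record at one version. */
  static final class Snapshot {
    final int version;
    final Map<String, String> segmentStates;

    Snapshot(int version, Map<String, String> segmentStates) {
      this.version = version;
      this.segmentStates = Map.copyOf(segmentStates);
    }
  }

  private final AtomicReference<Snapshot> current =
      new AtomicReference<>(new Snapshot(0, Map.of()));

  Snapshot read() {
    return current.get();
  }

  /** Writes only if no other writer has replaced {@code expected} since it was read. */
  boolean writeIfUnchanged(Snapshot expected, Map<String, String> newStates) {
    return current.compareAndSet(expected, new Snapshot(expected.version + 1, newStates));
  }

  /** Read-check-write loop: retries on conflict instead of holding a method-level lock. */
  Map<String, String> update(UnaryOperator<Map<String, String>> updater) {
    while (true) {
      Snapshot before = read();
      // The updater runs its check against a private copy of the state it read.
      Map<String, String> after = updater.apply(new HashMap<>(before.segmentStates));
      if (writeIfUnchanged(before, after)) {
        return after;
      }
      // Another writer won the race; loop to re-read and re-run the check on fresh state.
    }
  }
}
```

In this sketch, `writeIfUnchanged` returns `false` for a write prepared from a stale snapshot. `update` then retries that write against the newer state instead of overwriting it.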