Jackie-Jiang commented on code in PR #8663:
URL: https://github.com/apache/pinot/pull/8663#discussion_r875297466


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1069,6 +1073,10 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS
     //       and restart consumption from the same offset (if possible) or a newer offset (if realtime stream does
     //       not have the same offset).
     //       In latter case, report data loss.
+    // 4. There are no segments in the table's idealstate in CONSUMING state,

Review Comment:
   The new scenario is very similar to scenario 1, so we can keep them together:
   we first check whether `latestSegmentZKMetadata` is `DONE`, then check whether `instanceStateMap` contains a `CONSUMING` segment.
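
   A minimal sketch of the combined flow this suggests, reusing the variable names from the diff above; the `Status.DONE` and `SegmentStateModel.CONSUMING` references are assumptions about the surrounding code, not lines from this PR:
   ```java
   // Sketch only: fold the new scenario into scenario 1 by branching on the ZK metadata
   // status first, then on whether any replica of the latest segment is CONSUMING.
   if (latestSegmentZKMetadata.getStatus() == Status.DONE) {
     if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
       // Scenario 1: the segment is committed in the property store, but the IdealState
       // still has CONSUMING replicas (failure between the two update steps).
     } else {
       // New scenario: the segment is committed and no replica is CONSUMING (e.g. the
       // consuming segment was deleted); recreate the consuming segment when requested.
     }
   }
   ```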



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1147,13 +1155,34 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS
           } else {
             if (newPartitionGroupSet.contains(partitionGroupId)) {
               // If we get here, that means in IdealState, the latest segment has no CONSUMING replicas, but has
-              // replicas
-              // not OFFLINE. That is an unexpected state which cannot be fixed by the validation manager currently. In
-              // that case, we need to either extend this part to handle the state, or prevent segments from getting
-              // into
-              // such state.
-              LOGGER
-                  .error("Got unexpected instance state map: {} for segment: {}", instanceStateMap, latestSegmentName);
+              // replicas not OFFLINE. Create a new IN_PROGRESS segment in PROPERTYSTORE,
+              // add it as CONSUMING segment to IDEALSTATE. Currently, only enabled via /resumeConsumption API
+
+              if (enforcedByAdmin) {
+                LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
+                StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getEndOffset());
+                StreamPartitionMsgOffset partitionGroupSmallestOffset =
+                    getPartitionGroupSmallestOffset(streamConfig, partitionGroupId);
+
+                // Start offset must be higher than the start offset of the stream
+                if (partitionGroupSmallestOffset.compareTo(startOffset) > 0) {
+                  LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset,
+                      partitionGroupSmallestOffset, partitionGroupId, realtimeTableName);
+                  _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
+                  startOffset = partitionGroupSmallestOffset;
+                }
+                CommittingSegmentDescriptor committingSegmentDescriptor =
+                    new CommittingSegmentDescriptor(null, startOffset.toString(), 0);
+                createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
+                    committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions,
+                    numReplicas, newPartitionGroupMetadataList);
+                String newSegmentName = newLLCSegmentName.getSegmentName();
+                updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
+                    instancePartitionsMap);

Review Comment:
   Can we extract the common logic into a separate helper method? This branch is almost identical to the all-`OFFLINE` scenario, except that it uses the end offset of the latest segment instead of the start offset.
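
   A minimal sketch of what such a helper might look like, assuming a hypothetical name `recreateConsumingSegment` and that the resume offset is passed in by the caller; every call in the body is taken from the diff above, but the exact parameter list in the real class may differ:
   ```java
   // Hypothetical helper shared by the all-OFFLINE branch and the new deleted-CONSUMING branch.
   // The two call sites would differ only in the startOffset they pass in: the start offset of
   // the latest segment for the former, its end offset for the latter.
   private void recreateConsumingSegment(String realtimeTableName, TableConfig tableConfig,
       PartitionLevelStreamConfig streamConfig, SegmentZKMetadata latestSegmentZKMetadata,
       LLCSegmentName latestLLCSegmentName, StreamPartitionMsgOffset startOffset, long currentTimeMs,
       int partitionGroupId, List<PartitionGroupMetadata> newPartitionGroupMetadataList,
       InstancePartitions instancePartitions, int numPartitions, int numReplicas,
       Map<String, Map<String, String>> instanceStatesMap, SegmentAssignment segmentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
     // Clamp the requested offset to what the stream still retains, reporting data loss if needed
     StreamPartitionMsgOffset partitionGroupSmallestOffset =
         getPartitionGroupSmallestOffset(streamConfig, partitionGroupId);
     if (partitionGroupSmallestOffset.compareTo(startOffset) > 0) {
       LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset,
           partitionGroupSmallestOffset, partitionGroupId, realtimeTableName);
       _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
       startOffset = partitionGroupSmallestOffset;
     }
     // Create the new IN_PROGRESS segment in the property store and add it as CONSUMING to the IdealState
     LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
     CommittingSegmentDescriptor committingSegmentDescriptor =
         new CommittingSegmentDescriptor(null, startOffset.toString(), 0);
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
         committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas,
         newPartitionGroupMetadataList);
     updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newLLCSegmentName.getSegmentName(),
         segmentAssignment, instancePartitionsMap);
   }
   ```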



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -862,7 +863,9 @@ private Map<Integer, SegmentZKMetadata> getLatestSegmentZKMetadataMap(String rea
    * TODO: We need to find a place to detect and update a gauge for nonConsumingPartitionsCount for a table, and
    * reset it to 0 at the end of validateLLC
    */
-  public void ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig) {
+  // TODO(saurabh) : We can reduce critical section by having tableName level locks?
+  public synchronized void ensureAllPartitionsConsuming(TableConfig tableConfig,

Review Comment:
   Let's update the javadoc for this method to include the new scenario:
   ```
   If the consuming segment is deleted:
   Check whether there are segments in the PROPERTYSTORE with status DONE, but no new segment in status IN_PROGRESS, and the state for the latest segment in the IDEALSTATE is ONLINE
   ```
   (Note that this is very similar to the failure between step-1 and step-2; the only difference is the state in the IdealState)
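
   For reference, a rough sketch of how this could slot into the javadoc (the surrounding wording and placement are assumptions, not the current javadoc of `ensureAllPartitionsConsuming`):
   ```java
   /**
    * ...
    * If the consuming segment is deleted (e.g. by an admin):
    *   Check whether there are segments in the PROPERTYSTORE with status DONE, but no new segment in status
    *   IN_PROGRESS, and the state for the latest segment in the IDEALSTATE is ONLINE.
    *   (Very similar to the failure between step-1 and step-2; only the state in the IDEALSTATE differs.)
    * ...
    */
   ```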



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -862,7 +863,9 @@ private Map<Integer, SegmentZKMetadata> getLatestSegmentZKMetadataMap(String rea
    * TODO: We need to find a place to detect and update a gauge for nonConsumingPartitionsCount for a table, and
    * reset it to 0 at the end of validateLLC
    */
-  public void ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig) {
+  // TODO(saurabh) : We can reduce critical section by having tableName level locks?
+  public synchronized void ensureAllPartitionsConsuming(TableConfig tableConfig,

Review Comment:
   It should work without synchronization because it is performing a check and write
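
   If finer-grained locking is ever wanted instead of the method-level `synchronized`, the TODO in the diff about tableName-level locks could be sketched roughly as below; the lock map and its field name are illustrative assumptions (using `java.util.concurrent.ConcurrentHashMap`), not code from the PR:
   ```java
   // Hypothetical per-table locks inside PinotLLCRealtimeSegmentManager, as an alternative to
   // synchronizing the whole method; this only sketches the TODO from the diff.
   private final Map<String, Object> _ensureConsumingLocks = new ConcurrentHashMap<>();

   public void ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
       boolean enforcedByAdmin) {
     String realtimeTableName = tableConfig.getTableName();
     // Scope the critical section to a single table instead of the whole segment manager
     synchronized (_ensureConsumingLocks.computeIfAbsent(realtimeTableName, k -> new Object())) {
       // ... existing check-and-write logic for this table ...
     }
   }
   ```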



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -862,7 +863,9 @@ private Map<Integer, SegmentZKMetadata> getLatestSegmentZKMetadataMap(String rea
    * TODO: We need to find a place to detect and update a gauge for nonConsumingPartitionsCount for a table, and
    * reset it to 0 at the end of validateLLC
    */
-  public void ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig) {
+  // TODO(saurabh) : We can reduce critical section by having tableName level locks?
+  public synchronized void ensureAllPartitionsConsuming(TableConfig tableConfig,
+      PartitionLevelStreamConfig streamConfig, boolean enforcedByAdmin) {

Review Comment:
   Rename the flag to `recreateDeletedConsumingSegment`
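
   With the suggested rename, the declaration from the diff would read roughly as follows (body omitted; whether it stays `synchronized` is discussed in the other comment):
   ```java
   public synchronized void ensureAllPartitionsConsuming(TableConfig tableConfig,
       PartitionLevelStreamConfig streamConfig, boolean recreateDeletedConsumingSegment) {
     // ...
   }
   ```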



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
