Jackie-Jiang commented on code in PR #8663:
URL: https://github.com/apache/pinot/pull/8663#discussion_r877541423


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1385,4 +1386,89 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
       }
     }
   }
+
+  /**
+   * Creates PROPERTYSTORE and IDEALSTATE entries for each partitionGroup to resume realtime table consumption
+   * @param tableName
+   */
+  public void resumeRealtimeTableConsumption(String tableName) {
+    TableConfig tableConfig = _helixResourceManager.getTableConfig(tableName, TableType.REALTIME);
+    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+
+    InstancePartitions instancePartitions =
+        InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixResourceManager.getHelixZkManager(), tableConfig,
+            InstancePartitionsType.CONSUMING);
+    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+        Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
+    IdealState idealState =
+        HelixHelper.getTableIdealState(_helixResourceManager.getHelixZkManager(), realtimeTableName);
+    int numReplicas = getNumReplicas(tableConfig, instancePartitions);
+    Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
+
+    PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+        IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
+        getPartitionGroupConsumptionStatusList(idealState, streamConfig);
+    // Read the smallest offset when a new partition is detected
+    OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
+    streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
+    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+        getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
+    streamConfig.setOffsetCriteria(originalOffsetCriteria);
+
+    int numPartitionGroups = newPartitionGroupMetadataList.size();
+    Map<Integer, SegmentZKMetadata> partitionToLatestSegment = getLatestSegmentZKMetadataMap(realtimeTableName);
+    for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
+      int partitionId = partitionGroupMetadata.getPartitionGroupId();
+      if (partitionToLatestSegment.containsKey(partitionId)
+          && idealState.getInstanceStateMap(partitionToLatestSegment.get(partitionId).getSegmentName())
+              .containsValue(SegmentStateModel.CONSUMING)) {
+        // IN_PROGRESS segment already exists, NO-OP
+        LOGGER.info("Skipping partitionGroupId {}, IN_PROGRESS segment already exists", partitionId);
+      } else {
+        // Step 1: Create PROPERTYSTORE nodes for each partitionId (IN_PROGRESS)
+        String offset = partitionGroupMetadata.getStartOffset().toString();
+        long newSegmentCreationTimeMs = System.currentTimeMillis();
+
+        SegmentZKMetadata latestSegmentMeta = partitionToLatestSegment.get(partitionId);
+        int seqNumber = latestSegmentMeta == null ? STARTING_SEQUENCE_NUMBER
+            : new LLCSegmentName(latestSegmentMeta.getSegmentName()).getSequenceNumber() + 1;
+
+        LLCSegmentName newLLCSegment = new LLCSegmentName(tableName, partitionId, seqNumber, newSegmentCreationTimeMs);
+        SegmentZKMetadata newSegmentZKMetadata = new SegmentZKMetadata(newLLCSegment.getSegmentName());
+        newSegmentZKMetadata.setCreationTime(newLLCSegment.getCreationTimeMs());
+        newSegmentZKMetadata.setStartOffset(offset);
+        newSegmentZKMetadata.setNumReplicas(numReplicas);
+        newSegmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
+        SegmentPartitionMetadata segmentPartitionMetadata =
+            getPartitionMetadataFromTableConfig(tableConfig, partitionId);
+        if (segmentPartitionMetadata != null) {
+          newSegmentZKMetadata.setPartitionMetadata(segmentPartitionMetadata);
+        }
+
+        FlushThresholdUpdater flushThresholdUpdater = _flushThresholdUpdateManager
+            .getFlushThresholdUpdater(streamConfig);
+        CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, offset, 0);
+        flushThresholdUpdater.updateFlushThreshold(streamConfig,
+            newSegmentZKMetadata, committingSegmentDescriptor, null,
+            getMaxNumPartitionsPerInstance(instancePartitions, numPartitionGroups, numReplicas),
+            newPartitionGroupMetadataList);
+
+        _helixResourceManager.getPropertyStore().set(

Review Comment:
   @sajjad-moradi It should be okay because we always follow the same sequence of operations as the segment commit protocol:
   ```
   Step 1: Update PROPERTYSTORE to change the old segment metadata status to DONE
   Step 2: Update PROPERTYSTORE to create the new segment metadata with status IN_PROGRESS
   Step 3: Update IDEALSTATES to include new segment in CONSUMING state, and change old segment to ONLINE state.
   ```
   The operation itself is idempotent: starting from any intermediate state, re-running it produces the same final result.
   
   If the operation were not idempotent, we would also need to handle synchronization across multiple controllers.
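
   To illustrate the idempotency argument, here is a minimal sketch of the three-step sequence against in-memory maps. Everything in it is an assumption made for illustration: the class `CommitSequenceSketch`, the enums, the `stepsToRun` crash simulation, and the segment names are hypothetical stand-ins, not Pinot's actual classes or the ZooKeeper-backed stores. It only shows that a partial run followed by a full retry ends in the same state as one clean run.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the commit sequence; not Pinot's real API.
class CommitSequenceSketch {
  enum Status { IN_PROGRESS, DONE }
  enum State { CONSUMING, ONLINE }

  // Stand-ins for the ZooKeeper-backed PROPERTYSTORE and IDEALSTATE.
  final Map<String, Status> propertyStore = new HashMap<>();
  final Map<String, State> idealState = new HashMap<>();

  // Runs steps 1..stepsToRun in order; a value below 3 simulates a crash partway.
  void commit(String oldSegment, String newSegment, int stepsToRun) {
    if (stepsToRun >= 1) {
      // Step 1: mark the old segment metadata DONE (absolute write, safe to repeat).
      propertyStore.put(oldSegment, Status.DONE);
    }
    if (stepsToRun >= 2) {
      // Step 2: create the new segment metadata only if it does not exist yet.
      propertyStore.putIfAbsent(newSegment, Status.IN_PROGRESS);
    }
    if (stepsToRun >= 3) {
      // Step 3: old segment goes ONLINE, new segment is added as CONSUMING.
      idealState.put(oldSegment, State.ONLINE);
      idealState.putIfAbsent(newSegment, State.CONSUMING);
    }
  }

  // Starting point: one consuming segment for a single partition.
  static CommitSequenceSketch initial() {
    CommitSequenceSketch s = new CommitSequenceSketch();
    s.propertyStore.put("seg__0__0", Status.IN_PROGRESS);
    s.idealState.put("seg__0__0", State.CONSUMING);
    return s;
  }

  boolean sameState(CommitSequenceSketch other) {
    return propertyStore.equals(other.propertyStore) && idealState.equals(other.idealState);
  }

  public static void main(String[] args) {
    CommitSequenceSketch clean = initial();
    clean.commit("seg__0__0", "seg__0__1", 3);

    // Crash after 0, 1, 2, or 3 steps, then retry the whole sequence.
    for (int crashAfter = 0; crashAfter <= 3; crashAfter++) {
      CommitSequenceSketch retried = initial();
      retried.commit("seg__0__0", "seg__0__1", crashAfter);
      retried.commit("seg__0__0", "seg__0__1", 3);
      System.out.println("crash after step " + crashAfter
          + ", matches clean run: " + clean.sameState(retried));
    }
  }
}
```

   Because each step either writes an absolute value or creates an entry only when it is missing, the retry never depends on how far the previous attempt got. That property is what the sketch relies on in place of cross-controller locking.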



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

