sajjad-moradi commented on code in PR #8663:
URL: https://github.com/apache/pinot/pull/8663#discussion_r877558880


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1385,4 +1386,89 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
       }
     }
   }
+
+  /**
+   * Creates PROPERTYSTORE and IDEALSTATE entries for each partitionGroup to resume realtime table consumption
+   * @param tableName
+   */
+  public void resumeRealtimeTableConsumption(String tableName) {
+    TableConfig tableConfig = _helixResourceManager.getTableConfig(tableName, TableType.REALTIME);
+    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+
+    InstancePartitions instancePartitions =
+        InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixResourceManager.getHelixZkManager(), tableConfig,
+            InstancePartitionsType.CONSUMING);
+    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+        Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
+    IdealState idealState =
+        HelixHelper.getTableIdealState(_helixResourceManager.getHelixZkManager(), realtimeTableName);
+    int numReplicas = getNumReplicas(tableConfig, instancePartitions);
+    Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
+
+    PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+        IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
+        getPartitionGroupConsumptionStatusList(idealState, streamConfig);
+    // Read the smallest offset when a new partition is detected
+    OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
+    streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
+    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+        getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
+    streamConfig.setOffsetCriteria(originalOffsetCriteria);
+
+    int numPartitionGroups = newPartitionGroupMetadataList.size();
+    Map<Integer, SegmentZKMetadata> partitionToLatestSegment = getLatestSegmentZKMetadataMap(realtimeTableName);
+    for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
+      int partitionId = partitionGroupMetadata.getPartitionGroupId();
+      if (partitionToLatestSegment.containsKey(partitionId)
+          && idealState.getInstanceStateMap(partitionToLatestSegment.get(partitionId).getSegmentName())
+              .containsValue(SegmentStateModel.CONSUMING)) {
+        // IN_PROGRESS segment already exists, NO-OP
+        LOGGER.info("Skipping partitionGroupId {}, IN_PROGRESS segment already exists", partitionId);
+      } else {
+        // Step 1: Create PROPERTYSTORE nodes for each partitionId (IN_PROGRESS)
+        String offset = partitionGroupMetadata.getStartOffset().toString();
+        long newSegmentCreationTimeMs = System.currentTimeMillis();
+
+        SegmentZKMetadata latestSegmentMeta = partitionToLatestSegment.get(partitionId);
+        int seqNumber = latestSegmentMeta == null ? STARTING_SEQUENCE_NUMBER
+            : new LLCSegmentName(latestSegmentMeta.getSegmentName()).getSequenceNumber() + 1;
+
+        LLCSegmentName newLLCSegment = new LLCSegmentName(tableName, partitionId, seqNumber, newSegmentCreationTimeMs);
+        SegmentZKMetadata newSegmentZKMetadata = new SegmentZKMetadata(newLLCSegment.getSegmentName());
+        newSegmentZKMetadata.setCreationTime(newLLCSegment.getCreationTimeMs());
+        newSegmentZKMetadata.setStartOffset(offset);
+        newSegmentZKMetadata.setNumReplicas(numReplicas);
+        newSegmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
+        SegmentPartitionMetadata segmentPartitionMetadata =
+            getPartitionMetadataFromTableConfig(tableConfig, partitionId);
+        if (segmentPartitionMetadata != null) {
+          newSegmentZKMetadata.setPartitionMetadata(segmentPartitionMetadata);
+        }
+
+        FlushThresholdUpdater flushThresholdUpdater = _flushThresholdUpdateManager
+            .getFlushThresholdUpdater(streamConfig);
+        CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, offset, 0);
+        flushThresholdUpdater.updateFlushThreshold(streamConfig,
+            newSegmentZKMetadata, committingSegmentDescriptor, null,
+            getMaxNumPartitionsPerInstance(instancePartitions, numPartitionGroups, numReplicas),
+            newPartitionGroupMetadataList);
+
+        _helixResourceManager.getPropertyStore().set(

Review Comment:
   Step 2 is not idempotent. Because the new segment name includes a timestamp, every time step 2 is executed we get a different segment name, which in turn leads to a new segment ZK metadata entry:
   ```java
         long newSegmentCreationTimeMs = getCurrentTimeMs();
         LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
             committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
         createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
             committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitionGroups, numReplicas,
             newPartitionGroupMetadataList);
   ```
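
To illustrate the concern, here is a minimal, self-contained sketch. It is not Pinot code: `SegmentNameIdempotency`, `PROPERTY_STORE`, `createSegment`, and `createSegmentIfAbsent` are hypothetical names, the in-memory map stands in for the ZK property store, and the `table__partition__sequence__creationTimeMs` name format is a simplification assumed for the example (the real `LLCSegmentName` format may differ). It shows how a retry of a timestamp-based create step leaves two metadata entries for the same partition and sequence number, and one possible way to make the step idempotent by looking up an existing entry first.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SegmentNameIdempotency {
  // Hypothetical stand-in for the ZK property store: segment name -> status.
  static final Map<String, String> PROPERTY_STORE = new LinkedHashMap<>();

  // Simplified, assumed name format; the creation time is part of the name.
  static String segmentName(String table, int partitionId, int sequenceNumber, long creationTimeMs) {
    return table + "__" + partitionId + "__" + sequenceNumber + "__" + creationTimeMs;
  }

  // Non-idempotent: the name depends on the time of the call, so a retry
  // writes a second entry for the same partition and sequence number.
  static String createSegment(String table, int partitionId, int sequenceNumber, long nowMs) {
    String name = segmentName(table, partitionId, sequenceNumber, nowMs);
    PROPERTY_STORE.put(name, "IN_PROGRESS");
    return name;
  }

  // One possible idempotent variant: reuse an existing entry that matches
  // table, partition and sequence number, and only create one if none exists.
  static String createSegmentIfAbsent(String table, int partitionId, int sequenceNumber, long nowMs) {
    String prefix = table + "__" + partitionId + "__" + sequenceNumber + "__";
    for (String existing : PROPERTY_STORE.keySet()) {
      if (existing.startsWith(prefix)) {
        return existing;
      }
    }
    return createSegment(table, partitionId, sequenceNumber, nowMs);
  }

  public static void main(String[] args) {
    // The same step executed twice, one second apart (timestamps are passed in
    // explicitly so the example is deterministic).
    createSegment("myTable", 0, 5, 1000L);
    createSegment("myTable", 0, 5, 2000L);
    System.out.println("non-idempotent: " + PROPERTY_STORE.keySet());

    PROPERTY_STORE.clear();
    createSegmentIfAbsent("myTable", 0, 5, 1000L);
    createSegmentIfAbsent("myTable", 0, 5, 2000L);
    System.out.println("idempotent:     " + PROPERTY_STORE.keySet());
  }
}
```

Whether a lookup like this, or reusing the creation time from an already-written entry, fits the actual controller flow is a design decision for the PR; the sketch only demonstrates why the timestamp makes a plain retry unsafe.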



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
