mcvsubbu commented on a change in pull request #6518:
URL: https://github.com/apache/incubator-pinot/pull/6518#discussion_r571326647



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -449,13 +497,40 @@ private void commitSegmentMetadataInternal(String 
realtimeTableName,
     _helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, 
committingSegmentName, false, true);
 
     // Step-2
+
+    // Example: Say we currently were consuming from 2 shards A, B. Of those, 
A is the one committing.
+
+    // Get current partition groups - this gives current state of latest 
segments for each partition [A - DONE], [B - IN_PROGRESS]
+    PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+        IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
+        getCurrentPartitionGroupMetadataList(idealState, streamConfig);
+
+    // Find new partition groups [A],[B],[C],[D] (assume A split into C D)
+    // If segment has consumed all of A, we will receive B,C,D
+    // If segment is still not reached last msg of A, we will receive A,B,C,D
+    // If there were no splits/merges we would receive A,B
+    List<PartitionGroupInfo> newPartitionGroupInfoList =
+        getPartitionGroupInfoList(streamConfig, 
currentPartitionGroupMetadataList);
+    Set<Integer> newPartitionGroupSet =
+        
newPartitionGroupInfoList.stream().map(PartitionGroupInfo::getPartitionGroupId).collect(Collectors.toSet());
+    int numPartitions = newPartitionGroupInfoList.size();
+
+    // Only if committingSegment's partitionGroup is present in the 
newPartitionGroupInfoList, we create new segment metadata
+    String newConsumingSegmentName = null;
+    String rawTableName = 
TableNameBuilder.extractRawTableName(realtimeTableName);
     long newSegmentCreationTimeMs = getCurrentTimeMs();
-    LLCSegmentName newLLCSegmentName =
-        getNextLLCSegmentName(new LLCSegmentName(committingSegmentName), 
newSegmentCreationTimeMs);
-    createNewSegmentZKMetadata(tableConfig,
-        new PartitionLevelStreamConfig(tableConfig.getTableName(), 
IngestionConfigUtils.getStreamConfigMap(tableConfig)),
-        newLLCSegmentName, newSegmentCreationTimeMs, 
committingSegmentDescriptor, committingSegmentZKMetadata,
-        instancePartitions, numPartitions, numReplicas);
+    if (newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
+      LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, 
committingSegmentPartitionGroupId,
+          committingLLCSegment.getSequenceNumber() + 1, 
newSegmentCreationTimeMs);
+      createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, 
newSegmentCreationTimeMs,
+          committingSegmentDescriptor, committingSegmentZKMetadata, 
instancePartitions, numPartitions, numReplicas);
+      newConsumingSegmentName = newLLCSegment.getSegmentName();
+    }
+
+    // TODO: Also, create new partition groups here (instead of waiting for 
the Validation Manager)
+    //  Cannot do it at the moment, because of the timestamp suffix on the 
segment name.

Review comment:
       I fixed that by adding a check in the idealstate. Only one partition 
group and seq number combination allowed.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -149,22 +151,22 @@ public boolean isFinal() {
   public class SegmentBuildDescriptor {
     final File _segmentTarFile;
     final Map<String, File> _metadataFileMap;
-    final StreamPartitionMsgOffset _offset;
+    final Checkpoint _offset;

Review comment:
       Why rename this to Checkpoint?

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentName.java
##########
@@ -63,7 +63,7 @@ public String getGroupId() {
     throw new RuntimeException("No groupId in " + getSegmentName());
   }
 
-  public int getPartitionId() {
+  public int getPartitionGroupId() {

Review comment:
       nit: Please change the exception message as well

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java
##########
@@ -87,11 +87,6 @@ public void setDownloadUrl(String downloadUrl) {
   public ZNRecord toZNRecord() {
     ZNRecord znRecord = super.toZNRecord();
     znRecord.setSimpleField(START_OFFSET, _startOffset);
-    if (_endOffset == null) {

Review comment:
       Please flag the commit for release notes and add in checkin comments 
that in order for people to use this release, all components should have been 
upgraded to the previous release (or, 0.5.0, not sure. please verify). thanks




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to