mcvsubbu commented on a change in pull request #6667:
URL: https://github.com/apache/incubator-pinot/pull/6667#discussion_r594750850



##########
File path: 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java
##########
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+/**
+ * A PartitionGroup is a group of partitions/shards that the same consumer 
should consume from.

Review comment:
       Do we need this class? It seems to always be used as a 
List<PartitionGroupInfo>. We could just use a Map<Integer, 
StreamPartitionMsgOffset> instead. 
   Or do you plan to add more members to the class later?
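
   To make the alternative concrete, here is a minimal sketch of collapsing the list into a map keyed by partition group id. `Info` is a hypothetical stand-in for the class under review, and a plain `String` stands in for the offset type; neither is the actual Pinot definition.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionGroupOffsets {
  /** Hypothetical stand-in for PartitionGroupInfo: an id plus a start offset. */
  public static final class Info {
    final int partitionGroupId;
    final String startOffset; // assumption: offset shown as a String for simplicity

    public Info(int partitionGroupId, String startOffset) {
      this.partitionGroupId = partitionGroupId;
      this.startOffset = startOffset;
    }
  }

  /** Collapses the list into a map from partition group id to start offset. */
  public static Map<Integer, String> toOffsetMap(List<Info> infos) {
    Map<Integer, String> offsets = new LinkedHashMap<>();
    for (Info info : infos) {
      offsets.put(info.partitionGroupId, info.startOffset);
    }
    return offsets;
  }
}
```

   The map only works if the class never needs more than these two fields, which is why the question about future members matters.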
   

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -161,6 +163,48 @@ public boolean getIsSplitCommitEnabled() {
     return _controllerConf.getAcceptSplitCommit();
   }
 
+  /**
+   * Using the ideal state and segment metadata, return a list of {@link 
PartitionGroupMetadata}
+   * for latest segment of each partition group
+   */
+  public List<PartitionGroupMetadata> 
getCurrentPartitionGroupMetadataList(IdealState idealState,

Review comment:
       ```suggestion
     public List<PartitionGroupMetadata> 
getLatestPartitionGroupMetadataList(IdealState idealState,
   ```

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -448,14 +494,39 @@ private void commitSegmentMetadataInternal(String 
realtimeTableName,
     // Refresh the Broker routing to reflect the changes in the segment ZK 
metadata
     _helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, 
committingSegmentName, false, true);
 
-    // Step-2
+    // Get current partition groups - this gives current state of latest 
segments for each partition
+    // E.g. [A - DONE], [B - IN_PROGRESS], [C - IN_PROGRESS]
+    PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+        IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
+        getCurrentPartitionGroupMetadataList(idealState, streamConfig);
+
+    // Fetches new partition groups, given current partition groups metadata.
+    // Assume stream has partitions A, B, C, all still consuming. Result will 
be A, B, C
+    // Assume A was split into D, E, but messages of A are yet to be consumed, 
result will be A, B, C
+    // Assume A was split into D, E and all messages of A are consumed, result 
will be B, C, D, E.
+    List<PartitionGroupInfo> newPartitionGroupInfoList =
+        getPartitionGroupInfoList(streamConfig, 
currentPartitionGroupMetadataList);
+    Set<Integer> newPartitionGroupSet =
+        
newPartitionGroupInfoList.stream().map(PartitionGroupInfo::getPartitionGroupId).collect(Collectors.toSet());
+    int numPartitions = newPartitionGroupInfoList.size();
+
+    // Only if committingSegment's partitionGroup is present in the 
newPartitionGroupInfoList, we create new segment metadata

Review comment:
       Could there be race conditions here? A partition may have disappeared 
(and so does not show up in the current list of partitions available for 
consumption).
   I still think it is best that the segment completion logic just creates the 
next segment for each partition that is in the ideal state _plus_ any new 
ones that it sees, with one exception, described below.
   If a server gets an "end-of-partition" message, it completes the segment 
with the end criteria set to "end of partition". For such end criteria, the 
controller should not create a new segment for that partition.
   
   This would be an intuitive design and easier to reason about, since the 
server is the only one trying to "detect" partition ends. 
   
   I am afraid there can be race conditions where the controller does not see 
the new partitions, but the server has been restarted, so it has not yet 
finished consuming the partition.
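
   A rough sketch of the rule proposed above, reduced to integer partition ids. Every name here (`NextSegmentPlanner`, `EndCriteria`, `partitionsToCreate`) is hypothetical and not an existing Pinot API; it only illustrates the decision, not how the controller would implement it.

```java
import java.util.Set;
import java.util.TreeSet;

public class NextSegmentPlanner {
  /** Hypothetical end criteria reported by the server with the completed segment. */
  public enum EndCriteria { ROW_LIMIT, TIME_LIMIT, END_OF_PARTITION }

  /**
   * Returns the partitions that should get a new consuming segment when a
   * segment of committingPartition completes.
   */
  public static Set<Integer> partitionsToCreate(Set<Integer> idealStatePartitions,
      int committingPartition, EndCriteria endCriteria, Set<Integer> streamPartitions) {
    Set<Integer> result = new TreeSet<>();
    // The committing partition continues unless the server reported its end,
    // regardless of whether the stream currently lists that partition.
    if (endCriteria != EndCriteria.END_OF_PARTITION) {
      result.add(committingPartition);
    }
    // Any partition the stream reports that the ideal state has not seen is new.
    for (Integer partition : streamPartitions) {
      if (!idealStatePartitions.contains(partition)) {
        result.add(partition);
      }
    }
    return result;
  }
}
```

   In this sketch the controller never infers a partition's end from the stream's partition list; only the server-reported end criteria stop the creation of the next segment.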

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -448,14 +494,39 @@ private void commitSegmentMetadataInternal(String 
realtimeTableName,
     // Refresh the Broker routing to reflect the changes in the segment ZK 
metadata
     _helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, 
committingSegmentName, false, true);
 
-    // Step-2
+    // Get current partition groups - this gives current state of latest 
segments for each partition
+    // E.g. [A - DONE], [B - IN_PROGRESS], [C - IN_PROGRESS]
+    PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+        IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
+        getCurrentPartitionGroupMetadataList(idealState, streamConfig);
+
+    // Fetches new partition groups, given current partition groups metadata.
+    // Assume stream has partitions A, B, C, all still consuming. Result will 
be A, B, C
+    // Assume A was split into D, E, but messages of A are yet to be consumed, 
result will be A, B, C

Review comment:
       This code runs in the controller. How does the controller know whether 
servers have consumed the messages of partition A or not? There may be multiple 
server replicas; some servers may have consumed it while others have not. 
How is that reflected here?
   
   Let me know if I am missing something, or whether it is just the comments 
that need to change.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


