mcvsubbu commented on a change in pull request #6667:
URL: https://github.com/apache/incubator-pinot/pull/6667#discussion_r606030300



##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
##########
@@ -352,12 +367,7 @@ public long getSegmentSizeBytes() {
       }
 
       public String getStreamPartitionMsgOffset() {
-        if (_streamPartitionMsgOffset != null) {
-          return _streamPartitionMsgOffset;
-        } else {
-          // TODO 5359 remove this once we are all upgraded in controllers and servers.

Review comment:
       It would be nice to add to the PR description that installations must upgrade
to at least 0.7.0 before upgrading to a release that includes this change.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -448,14 +494,35 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
     // Refresh the Broker routing to reflect the changes in the segment ZK metadata
     _helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, committingSegmentName, false, true);
 
-    // Step-2
+    // Using the latest segment of each partition group, creates a list of {@link PartitionGroupMetadata}
+    PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+        IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
+        getCurrentPartitionGroupMetadataList(idealState, streamConfig);
+
+    // Fetches new partition groups, given current partition groups metadata.
+    List<PartitionGroupInfo> newPartitionGroupInfoList =
+        getNewPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
+    Set<Integer> newPartitionGroupSet =
+        newPartitionGroupInfoList.stream().map(PartitionGroupInfo::getPartitionGroupId).collect(Collectors.toSet());
+    int numPartitions = newPartitionGroupInfoList.size();

Review comment:
       Rename to numPartitionGroups?
   Or, if you are keeping changes to a minimum, it is fine to rename it later.

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java
##########
@@ -87,11 +87,6 @@ public void setDownloadUrl(String downloadUrl) {
   public ZNRecord toZNRecord() {
     ZNRecord znRecord = super.toZNRecord();
     znRecord.setSimpleField(START_OFFSET, _startOffset);
-    if (_endOffset == null) {
-      // TODO Issue 5359 Keep this until all components have upgraded to a version that can handle _offset being null

Review comment:
       Best to mark it for release-notes and include this in the PR comments. 
Thanks

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -791,7 +851,7 @@ void updateIdealStateOnSegmentCompletion(String realtimeTableName, String commit
 
   @VisibleForTesting
   void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap,
-      @Nullable String committingSegmentName, String newSegmentName, SegmentAssignment segmentAssignment,
+      @Nullable String committingSegmentName, @Nullable String newSegmentName, SegmentAssignment segmentAssignment,

Review comment:
       Instead of making newSegmentName nullable, would it not be better to skip
calling this method altogether when newSegmentName is null?
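
   A minimal sketch of the call-site guard suggested above, under stated
assumptions: the class and method names (CallSiteGuard, addConsumingSegment,
onSegmentCommitted) are hypothetical stand-ins, not the actual Pinot code. The
point is only that the null check lives in the caller, so the callee can keep a
non-null parameter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical illustration, not Pinot code: the caller decides whether a new
// consuming segment exists, so the callee never has to accept null.
public class CallSiteGuard {
  // Records the segments that were assigned, so the behavior can be checked.
  static final List<String> ASSIGNED = new ArrayList<>();

  // Callee: newSegmentName is required to be non-null.
  static void addConsumingSegment(String newSegmentName) {
    ASSIGNED.add(Objects.requireNonNull(newSegmentName, "newSegmentName"));
  }

  // Caller: skips the call entirely when there is no new segment to create.
  static void onSegmentCommitted(String committingSegmentName, String newSegmentName) {
    if (newSegmentName != null) {
      addConsumingSegment(newSegmentName);
    }
  }
}
```

   With this shape, the @Nullable annotation stays off the callee's signature
and the "no new segment" case is visible where the decision is made.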

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
##########
@@ -115,14 +117,45 @@ public static void buildLowLevelRealtimeIdealStateFor(PinotLLCRealtimeSegmentMan
     pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
   }
 
-  public static int getPartitionCount(StreamConfig streamConfig) {
-    PartitionCountFetcher partitionCountFetcher = new PartitionCountFetcher(streamConfig);
+  /**
+   * Fetches the list of {@link PartitionGroupInfo} for the new partition groups for the stream,
+   * with the help of the {@link PartitionGroupMetadata} of the current partitionGroups.
+   *
+   * Reasons why <code>currentPartitionGroupMetadata</code> is needed:
+   *
+   * The current partition group metadata is used to determine the offsets that have been consumed for a partition group.
+   * An example of where the offsets would be used:
+   * e.g. If partition group 1 contains shardId 1, with status DONE and endOffset 150. There's 2 possibilities:
+   * 1) the stream indicates that shardId's last offset is 200.
+   * This tells Pinot that partition group 1 still has messages which haven't been consumed, and must be included in the response.
+   * 2) the stream indicates that shardId's last offset is 150,
+   * This tells Pinot that all messages of partition group 1 have been consumed, and it need not be included in the response.
+   * Thus, this call will skip a partition group when it has reached end of life and all messages from that partition group have been consumed.
+   *
+   * The current partition group metadata is also used to know about existing groupings of partitions,
+   * and accordingly make the new partition groups.
+   * e.g. Assume that partition group 1 has status IN_PROGRESS and contains shards 0,1,2
+   * and partition group 2 has status DONE and contains shards 3,4.
+   * In the above example, the currentPartitionGroupMetadataList indicates that
+   * the collection of shards in partition group 1, should remain unchanged in the response,
+   * whereas shards 3,4 can be added to new partition groups if needed.
+   *
+   * @param streamConfig the streamConfig from the tableConfig
+   * @param currentPartitionGroupMetadataList List of {@link PartitionGroupMetadata} for the current partition groups.
+   *                                          The size of this list is equal to the number of partition groups,
+   *                                          and is created using the latest segment zk metadata.
+   */
+  public static List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig,
+      List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
+    PartitionGroupInfoFetcher partitionGroupInfoFetcher =
+        new PartitionGroupInfoFetcher(streamConfig, currentPartitionGroupMetadataList);
     try {
-      RetryPolicies.noDelayRetryPolicy(3).attempt(partitionCountFetcher);
-      return partitionCountFetcher.getPartitionCount();
+      RetryPolicies.noDelayRetryPolicy(3).attempt(partitionGroupInfoFetcher);

Review comment:
       I think some (preferably variable) delay is always good instead of 
hitting the stream provider continuously.
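
   To illustrate what a variable delay between attempts could look like, here
is a standalone sketch. It deliberately does not use Pinot's RetryPolicies
API; BackoffRetry and its parameters are hypothetical names chosen for this
example. Each failed attempt is followed by a random sleep whose upper bound
grows by a scale factor, so repeated failures back off from the stream
provider instead of hitting it continuously.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not part of Pinot: retries an operation with a
// randomized, exponentially growing delay between attempts.
public class BackoffRetry {
  public static <T> T attempt(Callable<T> operation, int maxAttempts, long initialDelayMs, double scaleFactor)
      throws Exception {
    if (maxAttempts < 1 || initialDelayMs < 0 || scaleFactor < 1.0) {
      throw new IllegalArgumentException("maxAttempts >= 1, initialDelayMs >= 0, scaleFactor >= 1 required");
    }
    long maxDelayMs = initialDelayMs;
    Exception lastFailure = null;
    for (int i = 0; i < maxAttempts; i++) {
      try {
        return operation.call();
      } catch (Exception e) {
        lastFailure = e;
      }
      if (i < maxAttempts - 1) {
        // Sleep a random duration in [0, maxDelayMs], then widen the window.
        Thread.sleep(ThreadLocalRandom.current().nextLong(maxDelayMs + 1));
        maxDelayMs = (long) (maxDelayMs * scaleFactor);
      }
    }
    // All attempts failed; surface the last failure to the caller.
    throw lastFailure;
  }
}
```

   For example, BackoffRetry.attempt(fetcher, 3, 100L, 2.0) would wait up to
100 ms after the first failure and up to 200 ms after the second, where
fetcher is any Callable wrapping the stream metadata call.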

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -950,14 +1012,23 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS
             LOGGER.info("Repairing segment: {} which is DONE in segment ZK metadata, but is CONSUMING in IdealState",
                 latestSegmentName);
 
-            LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
-            String newSegmentName = newLLCSegmentName.getSegmentName();
-            CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(latestSegmentName,
-                (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
-            createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
-                committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitionGroups, numReplicas);
-            updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
-                segmentAssignment, instancePartitionsMap);
+            if (newPartitionGroupSet.contains(partitionGroupId)) {

Review comment:
       Move the log statement in 1012/1013 inside the if statement.
   

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1049,6 +1124,25 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS
     return idealState;
   }
 
+  private StreamPartitionMsgOffset getPartitionGroupStartOffset(StreamConfig streamConfig, int partitionGroupId) {

Review comment:
       Suggest renaming to getPartitionGroupEarliestOffset() or
getPartitionGroupOldestOffset(), or perhaps something with smallestOffset.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -1243,7 +1259,13 @@ public LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata,
         //       long as the partition function is not changed.
         int numPartitions = columnPartitionConfig.getNumPartitions();
         try {
-          int numStreamPartitions = _streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs=*/5000L);
+          // TODO: currentPartitionGroupMetadata should be fetched from idealState + segmentZkMetadata, so that we get back accurate partitionGroups info
+          //  However this is not an issue for Kafka, since partitionGroups never expire and every partitionGroup has a single partition
+          //  Fix this before opening support for partitioning in Kinesis
+          int numStreamPartitions = _streamMetadataProvider

Review comment:
       numPartitionGroups, but we can fix it later with the TODO.
   Or, submit a pure renaming PR and we can rubber-stamp the review.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


