mcvsubbu commented on a change in pull request #6667:
URL: https://github.com/apache/incubator-pinot/pull/6667#discussion_r606030300

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
##########
@@ -352,12 +367,7 @@ public long getSegmentSizeBytes() {
     }
     public String getStreamPartitionMsgOffset() {
-      if (_streamPartitionMsgOffset != null) {
-        return _streamPartitionMsgOffset;
-      } else {
-        // TODO 5359 remove this once we are all upgraded in controllers and servers.

Review comment:
   It would be nice to note in the PR description that installations must upgrade to at least 0.7.0 before upgrading to a release that has this change.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -448,14 +494,35 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
     // Refresh the Broker routing to reflect the changes in the segment ZK metadata
     _helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, committingSegmentName, false, true);
-    // Step-2
+    // Using the latest segment of each partition group, creates a list of {@link PartitionGroupMetadata}
+    PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+        IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
+        getCurrentPartitionGroupMetadataList(idealState, streamConfig);
+
+    // Fetches new partition groups, given current partition groups metadata.
+    List<PartitionGroupInfo> newPartitionGroupInfoList =
+        getNewPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
+    Set<Integer> newPartitionGroupSet =
+        newPartitionGroupInfoList.stream().map(PartitionGroupInfo::getPartitionGroupId).collect(Collectors.toSet());
+    int numPartitions = newPartitionGroupInfoList.size();

Review comment:
   Rename to numPartitionGroups? Or, if you are keeping changes to a minimum, it is fine to change it later.

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java
##########
@@ -87,11 +87,6 @@ public void setDownloadUrl(String downloadUrl) {
   public ZNRecord toZNRecord() {
     ZNRecord znRecord = super.toZNRecord();
     znRecord.setSimpleField(START_OFFSET, _startOffset);
-    if (_endOffset == null) {
-      // TODO Issue 5359 Keep this until all components have upgraded to a version that can handle _offset being null

Review comment:
   Best to mark it for release notes and include this in the PR comments. Thanks.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -791,7 +851,7 @@ void updateIdealStateOnSegmentCompletion(String realtimeTableName, String commit
   @VisibleForTesting
   void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap,
-      @Nullable String committingSegmentName, String newSegmentName, SegmentAssignment segmentAssignment,
+      @Nullable String committingSegmentName, @Nullable String newSegmentName, SegmentAssignment segmentAssignment,

Review comment:
   Instead of making newSegmentName nullable, would it not be better to NOT call this method at all if newSegmentName is null?
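A minimal sketch of the caller-side alternative suggested in the comment on newSegmentName. Everything here is hypothetical and simplified (a single server instance, plain strings for states, no SegmentAssignment); only the null-handling pattern is meant to carry over. The method keeps a non-null newSegmentName, and the caller branches: when a partition group has ended there is no new segment, so it only moves the committed segment to ONLINE.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical, simplified stand-in for the controller code in the diff: one server instance,
 * plain strings for states, no SegmentAssignment. Only the null-handling pattern is the point.
 */
final class ConsumingSegmentStates {
  static final String ONLINE = "ONLINE";
  static final String CONSUMING = "CONSUMING";

  private ConsumingSegmentStates() {
  }

  /** Keeps newSegmentName non-null: callers must not invoke this when there is no new segment. */
  static void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap,
      String committingSegmentName, String newSegmentName, String instance) {
    Objects.requireNonNull(newSegmentName, "newSegmentName");
    if (committingSegmentName != null) {
      markOnline(instanceStatesMap, committingSegmentName, instance);
    }
    instanceStatesMap.computeIfAbsent(newSegmentName, k -> new HashMap<>()).put(instance, CONSUMING);
  }

  /** Caller-side guard: when the partition group has ended, only move the committed segment to ONLINE. */
  static void onSegmentCommitted(Map<String, Map<String, String>> instanceStatesMap, String committingSegmentName,
      String newSegmentName, String instance) {
    if (newSegmentName != null) {
      updateInstanceStatesForNewConsumingSegment(instanceStatesMap, committingSegmentName, newSegmentName, instance);
    } else {
      markOnline(instanceStatesMap, committingSegmentName, instance);
    }
  }

  private static void markOnline(Map<String, Map<String, String>> instanceStatesMap, String segmentName,
      String instance) {
    instanceStatesMap.computeIfAbsent(segmentName, k -> new HashMap<>()).put(instance, ONLINE);
  }
}
```

The design choice is that the nullability lives in one branch at the call site, so the update method's contract stays simple and a stray null fails fast instead of silently skipping the CONSUMING entry.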

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
##########
@@ -115,14 +117,45 @@ public static void buildLowLevelRealtimeIdealStateFor(PinotLLCRealtimeSegmentMan
     pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
   }
-  public static int getPartitionCount(StreamConfig streamConfig) {
-    PartitionCountFetcher partitionCountFetcher = new PartitionCountFetcher(streamConfig);
+  /**
+   * Fetches the list of {@link PartitionGroupInfo} for the new partition groups for the stream,
+   * with the help of the {@link PartitionGroupMetadata} of the current partitionGroups.
+   *
+   * Reasons why <code>currentPartitionGroupMetadata</code> is needed:
+   *
+   * The current partition group metadata is used to determine the offsets that have been consumed for a partition group.
+   * An example of where the offsets would be used:
+   * e.g. If partition group 1 contains shardId 1, with status DONE and endOffset 150. There's 2 possibilities:
+   * 1) the stream indicates that shardId's last offset is 200.
+   * This tells Pinot that partition group 1 still has messages which haven't been consumed, and must be included in the response.
+   * 2) the stream indicates that shardId's last offset is 150,
+   * This tells Pinot that all messages of partition group 1 have been consumed, and it need not be included in the response.
+   * Thus, this call will skip a partition group when it has reached end of life and all messages from that partition group have been consumed.
+   *
+   * The current partition group metadata is also used to know about existing groupings of partitions,
+   * and accordingly make the new partition groups.
+   * e.g. Assume that partition group 1 has status IN_PROGRESS and contains shards 0,1,2
+   * and partition group 2 has status DONE and contains shards 3,4.
+   * In the above example, the currentPartitionGroupMetadataList indicates that
+   * the collection of shards in partition group 1, should remain unchanged in the response,
+   * whereas shards 3,4 can be added to new partition groups if needed.
+   *
+   * @param streamConfig the streamConfig from the tableConfig
+   * @param currentPartitionGroupMetadataList List of {@link PartitionGroupMetadata} for the current partition groups.
+   * The size of this list is equal to the number of partition groups,
+   * and is created using the latest segment zk metadata.
+   */
+  public static List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig,
+      List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
+    PartitionGroupInfoFetcher partitionGroupInfoFetcher =
+        new PartitionGroupInfoFetcher(streamConfig, currentPartitionGroupMetadataList);
     try {
-      RetryPolicies.noDelayRetryPolicy(3).attempt(partitionCountFetcher);
-      return partitionCountFetcher.getPartitionCount();
+      RetryPolicies.noDelayRetryPolicy(3).attempt(partitionGroupInfoFetcher);

Review comment:
   I think some (preferably variable) delay between retries is always better than hitting the stream provider continuously.
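To illustrate the "variable delay" suggestion, here is a standalone sketch of retrying with randomized exponential backoff. `BackoffRetry` is a hypothetical helper, not Pinot's `RetryPolicy` API, and it treats a thrown exception as a failed attempt. If the `RetryPolicies` class in use offers an exponential-backoff factory (recent Pinot versions have `exponentialBackoffRetryPolicy`), swapping it in for `noDelayRetryPolicy(3)` would likely be the smaller change.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper, not Pinot's RetryPolicy: retries a Callable with randomized exponential backoff. */
final class BackoffRetry {
  private BackoffRetry() {
  }

  /**
   * Runs task up to maxAttempts times and returns its first successful result. After each failure except the
   * last, sleeps a random duration in [0, delayMs] ("full jitter"), then multiplies delayMs by scaleFactor.
   * Rethrows the last exception if every attempt fails.
   */
  static <T> T attempt(Callable<T> task, int maxAttempts, long initialDelayMs, double scaleFactor)
      throws Exception {
    if (maxAttempts < 1 || initialDelayMs < 0 || scaleFactor < 1.0) {
      throw new IllegalArgumentException("invalid retry parameters");
    }
    long delayMs = initialDelayMs;
    Exception lastFailure = null;
    for (int attemptNumber = 1; attemptNumber <= maxAttempts; attemptNumber++) {
      try {
        return task.call();
      } catch (Exception e) {
        lastFailure = e;
        if (attemptNumber < maxAttempts) {
          // Randomizing the wait keeps several callers from hitting the stream provider in lockstep.
          Thread.sleep(ThreadLocalRandom.current().nextLong(delayMs + 1));
          // Grow the delay geometrically, capped so the arithmetic cannot overflow.
          delayMs = (long) Math.min(delayMs * scaleFactor, Long.MAX_VALUE / 2);
        }
      }
    }
    throw lastFailure;
  }
}
```

Jitter matters here because several controllers (or several tables) may start fetching partition group info at the same moment; a fixed delay would keep their retries synchronized.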
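The Javadoc quoted in this hunk describes when a DONE partition group is kept: only if the stream reports a last offset beyond the group's endOffset. A minimal sketch of that rule follows. The types are hypothetical (not Pinot's `PartitionGroupMetadata` or `PartitionGroupInfo`), and it assumes one shard per partition group, keyed by partition group id, as in the Javadoc's first example; it does not model regrouping of shards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of the offset rule in the Javadoc; assumes one shard per partition group,
 * keyed by partition group id. Not Pinot's PartitionGroupMetadata/PartitionGroupInfo.
 */
final class PartitionGroupFilter {
  enum Status { IN_PROGRESS, DONE }

  static final class GroupState {
    final int partitionGroupId;
    final Status status;
    final long endOffset; // last consumed offset, meaningful only when status == DONE

    GroupState(int partitionGroupId, Status status, long endOffset) {
      this.partitionGroupId = partitionGroupId;
      this.status = status;
      this.endOffset = endOffset;
    }
  }

  private PartitionGroupFilter() {
  }

  /**
   * Returns the ids of partition groups that should still be consumed. IN_PROGRESS groups are always kept;
   * a DONE group is kept only if the stream reports a last offset beyond what was consumed.
   */
  static List<Integer> groupsToConsume(List<GroupState> current, Map<Integer, Long> streamLastOffsets) {
    List<Integer> keep = new ArrayList<>();
    for (GroupState group : current) {
      if (group.status == Status.IN_PROGRESS) {
        keep.add(group.partitionGroupId);
        continue;
      }
      Long lastOffset = streamLastOffsets.get(group.partitionGroupId);
      if (lastOffset != null && lastOffset > group.endOffset) {
        keep.add(group.partitionGroupId); // unconsumed messages remain
      }
      // Otherwise the group reached end of life and is fully consumed, so it is skipped.
    }
    return keep;
  }
}
```

With partition group 1 DONE at endOffset 150, a stream-reported last offset of 200 keeps it (case 1 in the Javadoc), while 150 drops it (case 2).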

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -950,14 +1012,23 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS
         LOGGER.info("Repairing segment: {} which is DONE in segment ZK metadata, but is CONSUMING in IdealState",
             latestSegmentName);
-        LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
-        String newSegmentName = newLLCSegmentName.getSegmentName();
-        CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(latestSegmentName,
-            (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
-        createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
-            committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitionGroups, numReplicas);
-        updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
-            segmentAssignment, instancePartitionsMap);
+        if (newPartitionGroupSet.contains(partitionGroupId)) {

Review comment:
   Move the log statement at lines 1012/1013 inside the if statement.
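A minimal sketch of the suggested restructuring, so the "Repairing segment" message is only emitted when a repair actually happens. The class and method names are hypothetical, the segment names in any usage are illustrative, and `java.util.logging` stands in for the controller's logger to keep the example self-contained.

```java
import java.util.Set;
import java.util.logging.Logger;

/** Hypothetical sketch of the repair branch; java.util.logging stands in for the controller's logger. */
final class DoneSegmentRepair {
  private static final Logger LOGGER = Logger.getLogger(DoneSegmentRepair.class.getName());

  private DoneSegmentRepair() {
  }

  /**
   * Returns true if a new consuming segment should be created for this partition group. The log line sits
   * inside the branch, so nothing is logged for partition groups that have reached end of life.
   */
  static boolean repairDoneButConsumingSegment(String latestSegmentName, int partitionGroupId,
      Set<Integer> newPartitionGroupSet) {
    if (newPartitionGroupSet.contains(partitionGroupId)) {
      LOGGER.info(String.format(
          "Repairing segment: %s which is DONE in segment ZK metadata, but is CONSUMING in IdealState",
          latestSegmentName));
      // ... create the next segment's ZK metadata and update instance states here ...
      return true;
    }
    return false;
  }
}
```

Keeping the message next to the work it describes avoids misleading "Repairing segment" lines for partition groups that are intentionally left without a new consuming segment.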

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1049,6 +1124,25 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS
     return idealState;
   }
+  private StreamPartitionMsgOffset getPartitionGroupStartOffset(StreamConfig streamConfig, int partitionGroupId) {

Review comment:
   Suggest renaming to getPartitionGroupEarliestOffset(), or getPartitionGroupOldestOffset(), or perhaps smallestOffset.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -1243,7 +1259,13 @@ public LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata,
       // long as the partition function is not changed.
       int numPartitions = columnPartitionConfig.getNumPartitions();
       try {
-        int numStreamPartitions = _streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs=*/5000L);
+        // TODO: currentPartitionGroupMetadata should be fetched from idealState + segmentZkMetadata, so that we get back accurate partitionGroups info
+        // However this is not an issue for Kafka, since partitionGroups never expire and every partitionGroup has a single partition
+        // Fix this before opening support for partitioning in Kinesis
+        int numStreamPartitions = _streamMetadataProvider

Review comment:
   numPartitionGroups, but we can fix it later along with the TODO. Or, submit a pure renaming PR and we can review it as a rubber stamp.