npawar commented on a change in pull request #7066:
URL: https://github.com/apache/pinot/pull/7066#discussion_r682955117

##########
File path: pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
##########
@@ -367,7 +371,7 @@ public void testSegmentSizeBasedUpdaterWithModifications() {
     mockAutotuneStreamConfig(flushSegmentDesiredSizeBytes, flushThresholdTimeMillis, flushAutotuneInitialRows);
     committingSegmentZKMetadata = getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1);
+        committingSegmentZKMetadata, 1, Collections.emptyList());

Review comment:
Is it possible to add at least one test that exercises this: 1) where it picks partition 0, and 2) where it picks another partition id that is the lowest available?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1184,7 +1187,8 @@ private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStr
     CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, startOffset, 0);
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
-        committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas);
+        committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas,
+        Collections.singletonList(partitionGroupMetadata));

Review comment:
This should pass the whole list, right? If we send just the one, every partition will update the threshold based on its own partition, which is not the same as using only the 0th/lowest available partition.

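To illustrate the two comments above, here is a minimal sketch of the two requested test cases and of why a singleton list changes the behavior. It is not the PR's code: `SmallestPartitionSketch`, `smallestId` and `shouldUpdateRatio` are hypothetical names, plain `Integer` ids stand in for `PartitionGroupMetadata`, and the check is assumed to compare the committing partition's id against the smallest id in the list (the `_latestSegmentRowsToSizeRatio == 0` branch is left out).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SmallestPartitionSketch {
  // Hypothetical helper: smallest partition group id in the list.
  // Throws NoSuchElementException on an empty list; that case is not handled here.
  public static int smallestId(List<Integer> partitionGroupIds) {
    return Collections.min(partitionGroupIds);
  }

  // Assumed, simplified form of the check under review: only the partition
  // with the smallest available id is allowed to update the ratio.
  public static boolean shouldUpdateRatio(int committingPartitionId, List<Integer> partitionGroupIds) {
    return committingPartitionId == smallestId(partitionGroupIds);
  }

  public static void main(String[] args) {
    // Case 1: partition 0 exists, so it is the one picked.
    System.out.println(shouldUpdateRatio(0, Arrays.asList(0, 1, 2))); // true

    // Case 2: partition 0 is absent, so the lowest remaining id (1) is picked.
    List<Integer> withoutZero = Arrays.asList(1, 2, 3);
    System.out.println(shouldUpdateRatio(1, withoutZero)); // true
    System.out.println(shouldUpdateRatio(3, withoutZero)); // false

    // With a singleton list, every partition is trivially the smallest,
    // so partition 3 would also update the threshold.
    System.out.println(shouldUpdateRatio(3, Collections.singletonList(3))); // true
  }
}
```

The last call shows the concern with `Collections.singletonList(partitionGroupMetadata)`: each partition only ever sees itself, so the "lowest available partition" restriction no longer applies.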
##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
##########
@@ -102,8 +106,14 @@ public synchronized void updateFlushThreshold(PartitionLevelStreamConfig streamC
     // less same characteristics at any one point in time).
     // However, when we start a new table or change controller mastership, we can have any partition completing first.
     // It is best to learn the ratio as quickly as we can, so we allow any partition to supply the value.
-    // FIXME: The stream may not have partition "0"
-    if (new LLCSegmentName(newSegmentName).getPartitionGroupId() == 0 || _latestSegmentRowsToSizeRatio == 0) {
+
+    // Partition group id 0 might not be available always. We take the smallest available partition id in that case to update the threshold
+    int smallestAvailablePartitionGroupId =
+        partitionGroupMetadataList.stream().min(Comparator.comparingInt(PartitionGroupMetadata::getPartitionGroupId))
+            .map(PartitionGroupMetadata::getPartitionGroupId).orElseGet(() -> 0);

Review comment:
It seems the `orElseGet(()->0)` was added only for test purposes? In an actual setup, you would never reach that path? In that case, can we just send the right input in the test instead of doing this?
We might still have to guard against the case where all shards have expired (this can happen in Kinesis). But then we can set a special number here (-1) instead. What do you think?
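
The sentinel suggestion in the last review comment could be sketched roughly as follows. This is an illustration only: `SentinelPartitionSketch`, `NO_PARTITION_GROUP` and `smallestAvailableId` are hypothetical names, and plain `Integer` ids stand in for `PartitionGroupMetadata`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SentinelPartitionSketch {
  // Assumed sentinel meaning "no partition group is available",
  // e.g. when all shards have expired.
  public static final int NO_PARTITION_GROUP = -1;

  // Returns the smallest id in the list, or the sentinel for an empty list.
  public static int smallestAvailableId(List<Integer> partitionGroupIds) {
    return partitionGroupIds.stream()
        .mapToInt(Integer::intValue)
        .min()
        .orElse(NO_PARTITION_GROUP);
  }

  public static void main(String[] args) {
    System.out.println(smallestAvailableId(Arrays.asList(7, 4, 9)));   // 4
    System.out.println(smallestAvailableId(Collections.emptyList())); // -1
  }
}
```

Assuming real partition group ids are non-negative, a fallback of -1 can never be mistaken for a real partition, whereas a fallback of 0 silently matches partition 0.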