npawar commented on a change in pull request #7066:
URL: https://github.com/apache/pinot/pull/7066#discussion_r682955117



##########
File path: pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
##########
@@ -367,7 +371,7 @@ public void testSegmentSizeBasedUpdaterWithModifications() {
         mockAutotuneStreamConfig(flushSegmentDesiredSizeBytes, flushThresholdTimeMillis, flushAutotuneInitialRows);
     committingSegmentZKMetadata = getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1);
+        committingSegmentZKMetadata, 1, Collections.emptyList());

Review comment:
       Is it possible to add at least one test that exercises this for both cases: 1) where it picks partition group 0, and 2) where partition group 0 is absent and it picks the lowest available partition group id instead?
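A minimal sketch of the two requested cases, under stated assumptions: `PartitionGroup` is a hypothetical stand-in for `PartitionGroupMetadata`, and `smallestAvailablePartitionGroupId` is a hypothetical helper that isolates the selection rule from the diff (lowest id, falling back to 0 for an empty list). A real test in `FlushThresholdUpdaterTest` would instead drive `updateFlushThreshold` and check which commit changes the learned ratio; that part is not shown here.

```java
import java.util.Comparator;
import java.util.List;

class SmallestPartitionSelectionSketch {

  // Hypothetical stand-in for Pinot's PartitionGroupMetadata; only the id matters here.
  static final class PartitionGroup {
    private final int _partitionGroupId;

    PartitionGroup(int partitionGroupId) {
      _partitionGroupId = partitionGroupId;
    }

    int getPartitionGroupId() {
      return _partitionGroupId;
    }
  }

  // Same selection rule as the diff: lowest available partition group id, 0 if the list is empty.
  static int smallestAvailablePartitionGroupId(List<PartitionGroup> groups) {
    return groups.stream()
        .min(Comparator.comparingInt(PartitionGroup::getPartitionGroupId))
        .map(PartitionGroup::getPartitionGroupId)
        .orElseGet(() -> 0);
  }

  static void check(boolean condition, String message) {
    if (!condition) {
      throw new AssertionError(message);
    }
  }

  public static void main(String[] args) {
    // Case 1: partition group 0 is present, so it is chosen.
    check(smallestAvailablePartitionGroupId(List.of(
        new PartitionGroup(2), new PartitionGroup(0), new PartitionGroup(1))) == 0, "expected 0");
    // Case 2: partition group 0 is gone (e.g. its shard expired), so the lowest remaining id is chosen.
    check(smallestAvailablePartitionGroupId(List.of(
        new PartitionGroup(5), new PartitionGroup(3), new PartitionGroup(7))) == 3, "expected 3");
    System.out.println("both cases pass");
  }
}
```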

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1184,7 +1187,8 @@ private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStr
 
     CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, startOffset, 0);
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
-        committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas);
+        committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas,
+        Collections.singletonList(partitionGroupMetadata));

Review comment:
       This should pass the whole list, right? If we send just the one, every partition will update the threshold based on its own partition, which is not the same as using only the 0th/lowest available partition.
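A minimal sketch of why the singleton list defeats the "lowest partition only" rule. It assumes the updated condition compares the committing partition group id against `smallestAvailablePartitionGroupId`; `mayUpdateRatio` and `countUpdaters` are hypothetical helpers, and partition groups are modeled as plain `Integer` ids rather than `PartitionGroupMetadata`.

```java
import java.util.Collections;
import java.util.List;

class RatioUpdateGateSketch {

  // Hypothetical gate modeled on the diff: a committing partition may update the
  // rows-to-size ratio only if it is the smallest available partition group
  // (the "no ratio learned yet" branch is omitted here).
  static boolean mayUpdateRatio(int committingPartitionGroupId, List<Integer> availableIds) {
    int smallest = availableIds.stream().mapToInt(Integer::intValue).min().orElse(0);
    return committingPartitionGroupId == smallest;
  }

  // Counts how many partitions would be allowed to update the ratio, depending on
  // whether each one is handed the whole list or only a list containing itself.
  static long countUpdaters(List<Integer> availableIds, boolean passWholeList) {
    return availableIds.stream()
        .filter(id -> mayUpdateRatio(id, passWholeList ? availableIds : Collections.singletonList(id)))
        .count();
  }

  public static void main(String[] args) {
    List<Integer> available = List.of(3, 4, 5, 6);
    System.out.println("singleton list: " + countUpdaters(available, false)); // prints "singleton list: 4"
    System.out.println("whole list: " + countUpdaters(available, true));      // prints "whole list: 1"
  }
}
```

With a singleton list, each partition sees itself as the minimum, so all four update the ratio; with the whole list, only partition group 3 does.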

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
##########
@@ -102,8 +106,14 @@ public synchronized void updateFlushThreshold(PartitionLevelStreamConfig streamC
     // less same characteristics at any one point in time).
     // However, when we start a new table or change controller mastership, we can have any partition completing first.
     // It is best to learn the ratio as quickly as we can, so we allow any partition to supply the value.
-    // FIXME: The stream may not have partition "0"
-    if (new LLCSegmentName(newSegmentName).getPartitionGroupId() == 0 || _latestSegmentRowsToSizeRatio == 0) {
+
+    // Partition group id 0 might not be available always. We take the smallest available partition id in that case to update the threshold
+    int smallestAvailablePartitionGroupId =
+        partitionGroupMetadataList.stream().min(Comparator.comparingInt(PartitionGroupMetadata::getPartitionGroupId))
+            .map(PartitionGroupMetadata::getPartitionGroupId).orElseGet(() -> 0);

Review comment:
       It seems like `orElseGet(()->0)` is added only for test purposes? In an actual setup, you would never reach that path? In that case, can we just send the right input in the test instead of doing this?
   We might still have to guard against the case where all shards have expired (this can happen in Kinesis). But then we can set a special value here (-1) instead.
   wdyt?
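A minimal sketch of the suggested sentinel, assuming -1 is never a valid partition group id. `NO_AVAILABLE_PARTITION_GROUP` and `mayUpdateRatio` are hypothetical names, ids are plain `Integer`s, and `latestRatio` stands in for `_latestSegmentRowsToSizeRatio`. For a constant fallback, `orElse` is sufficient; `orElseGet` only matters when the fallback is costly to compute, since its `Supplier` runs lazily.

```java
import java.util.List;

class SentinelPartitionSketch {

  // Hypothetical sentinel from the review suggestion: no real partition group has id -1.
  static final int NO_AVAILABLE_PARTITION_GROUP = -1;

  static int smallestAvailablePartitionGroupId(List<Integer> availableIds) {
    // min() yields an empty OptionalInt when every shard has expired.
    return availableIds.stream()
        .mapToInt(Integer::intValue)
        .min()
        .orElse(NO_AVAILABLE_PARTITION_GROUP);
  }

  // Hypothetical gate: update when the committing group is the lowest available one, or
  // when no ratio has been learned yet (mirrors the "_latestSegmentRowsToSizeRatio == 0" check).
  static boolean mayUpdateRatio(int committingId, List<Integer> availableIds, double latestRatio) {
    return committingId == smallestAvailablePartitionGroupId(availableIds) || latestRatio == 0;
  }

  public static void main(String[] args) {
    System.out.println(smallestAvailablePartitionGroupId(List.of(4, 2, 9))); // 2
    System.out.println(smallestAvailablePartitionGroupId(List.of()));        // -1
    // With no available groups, partition group 0 no longer matches by accident once a ratio exists.
    System.out.println(mayUpdateRatio(0, List.of(), 0.5));                   // false
  }
}
```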




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


