itschrispeck commented on code in PR #13790:
URL: https://github.com/apache/pinot/pull/13790#discussion_r1849154551


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1294,7 +1315,7 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig st
                 selectStartOffset(offsetCriteria, partitionId, partitionIdToStartOffset,
                     partitionIdToSmallestOffset, tableConfig.getTableName(), offsetFactory,
                     latestSegmentZKMetadata.getEndOffset());
-            createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
+            createNewConsumingSegment(tableConfig, streamConfigs.get(0), latestSegmentZKMetadata, currentTimeMs,

Review Comment:
   Can we use the partitionId to choose the correct streamConfig instead of always taking the first one?
   
   Otherwise, we'd need to document that segment flush settings are read only from the first streamConfig in the table config's list (though I expect different flush settings per stream will eventually become a requirement).
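   
   A rough sketch of what I mean by choosing the streamConfig from the partitionId. It assumes each Pinot partition ID encodes its stream index as `streamIndex * PADDING + streamPartitionId`; that encoding, the `PADDING` value, and the `StreamConfigSelector` name are all hypothetical and not taken from this PR. A generic type stands in for `StreamConfig` so the snippet compiles on its own.

```java
import java.util.List;

// Hypothetical helper, not part of the PR. Assumes a Pinot partition id is
// encoded as (streamIndex * PADDING + streamPartitionId).
final class StreamConfigSelector {
  // Assumed padding between streams; the real value would be defined by the PR.
  static final int PADDING = 10_000;

  private StreamConfigSelector() {
  }

  // Recovers the index of the stream that owns the given Pinot partition id.
  static int streamIndexFor(int pinotPartitionId) {
    if (pinotPartitionId < 0) {
      throw new IllegalArgumentException("Negative partition id: " + pinotPartitionId);
    }
    return pinotPartitionId / PADDING;
  }

  // Picks the config for the owning stream instead of always using get(0).
  static <T> T select(List<T> streamConfigs, int pinotPartitionId) {
    int index = streamIndexFor(pinotPartitionId);
    if (index >= streamConfigs.size()) {
      throw new IllegalArgumentException(
          "Partition id " + pinotPartitionId + " maps to stream index " + index
              + ", but only " + streamConfigs.size() + " stream configs are present");
    }
    return streamConfigs.get(index);
  }
}
```

   With something like this, the call site could pass `StreamConfigSelector.select(streamConfigs, partitionId)` rather than `streamConfigs.get(0)`, so flush settings would follow the stream that owns the partition.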



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -929,15 +949,16 @@ public void ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig s
         List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
             offsetsHaveToChange
                 ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions
-                : getPartitionGroupConsumptionStatusList(idealState, streamConfig);
-        OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
+                : getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
+        // FIXME: Right now, we assume topics are sharing same offset criteria

Review Comment:
   Does it make sense to add a precondition that checks all stream configs share the same offset criteria?
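   
   For illustration, a minimal sketch of such a precondition. Plain strings stand in for the offset criteria of each stream config, and the `OffsetCriteriaPrecondition` class and method name are hypothetical; in the controller this would presumably compare whatever each `StreamConfig` returns from `getOffsetCriteria()`, which assumes that type can be compared meaningfully.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical guard, not part of the PR. Strings stand in for the
// per-stream offset criteria so the sketch is self-contained.
final class OffsetCriteriaPrecondition {
  private OffsetCriteriaPrecondition() {
  }

  // Returns the shared offset criteria, or fails fast if the streams disagree.
  static String requireSharedOffsetCriteria(List<String> offsetCriteriaPerStream) {
    if (offsetCriteriaPerStream.isEmpty()) {
      throw new IllegalStateException("No stream configs found");
    }
    Set<String> distinct = new HashSet<>(offsetCriteriaPerStream);
    if (distinct.size() != 1) {
      throw new IllegalStateException(
          "All streams must share the same offset criteria, but found: " + distinct);
    }
    return offsetCriteriaPerStream.get(0);
  }
}
```

   Failing fast here would turn the FIXME's silent assumption into an explicit error when a table config violates it.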



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java:
##########
@@ -75,6 +84,103 @@ public static Map<String, String> getStreamConfigMap(TableConfig tableConfig) {
     return streamConfigMap;
   }
 
+  /**
+   * Fetches the streamConfig from the given realtime table.
+   * First, the ingestionConfigs->stream->streamConfigs will be checked.
+   * If not found, the indexingConfig->streamConfigs will be checked (which is deprecated).
+   * @param tableConfig realtime table config
+   * @return streamConfigs List of maps
+   */
+  public static List<Map<String, String>> getStreamConfigMaps(TableConfig tableConfig) {

Review Comment:
   Can we remove the old method if it is no longer used?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -255,12 +255,12 @@ public List<PartitionGroupConsumptionStatus> getPartitionGroupConsumptionStatusL
 
     // Create a {@link PartitionGroupConsumptionStatus} for each latest segment
     StreamPartitionMsgOffsetFactory offsetFactory =
-        StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
+        StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();
     for (Map.Entry<Integer, LLCSegmentName> entry : partitionGroupIdToLatestSegment.entrySet()) {
       int partitionGroupId = entry.getKey();
       LLCSegmentName llcSegmentName = entry.getValue();
       SegmentZKMetadata segmentZKMetadata =
-          getSegmentZKMetadata(streamConfig.getTableNameWithType(), llcSegmentName.getSegmentName());
+          getSegmentZKMetadata(streamConfigs.get(0).getTableNameWithType(), llcSegmentName.getSegmentName());

Review Comment:
   nit: use `idealState.getId()` for the table name instead of `streamConfigs.get(0).getTableNameWithType()`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java:
##########
@@ -87,6 +89,33 @@ public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertySt
     }
   }
 
+  public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertyStore<ZNRecord> propertyStore,
+      ControllerMetrics controllerMetrics, List<StreamConfig> streamConfigs) {
+    _realtimeTableName = realtimeTableName;
+    _controllerMetrics = controllerMetrics;
+    _segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, controllerMetrics);
+    _streamPartitionMsgOffsetFactory =
+        StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();

Review Comment:
   I think this breaks when mixing streams that do not use the same offset factory type, e.g. Kinesis and Kafka. (There are many instances of this offset factory pattern; I won't mark them all.)
   
   We could add a unit test, or should we add a TODO for them, since we can't easily test this end to end internally?
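   
   Until mixed stream types are supported, one option is to reject such table configs up front. The sketch below groups topics by their consumer factory class name and fails when more than one class is present, so the error message shows which topics conflict. The `OffsetFactoryCompatibility` class is hypothetical, and the topic-to-factory map is a stand-in for values that would be read from each stream config.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical guard, not part of the PR. The input map (topic name ->
// consumer factory class name) stands in for values read from stream configs.
final class OffsetFactoryCompatibility {
  private OffsetFactoryCompatibility() {
  }

  // Groups topic names by factory class name, sorted by class name.
  static Map<String, List<String>> topicsByFactoryClass(Map<String, String> factoryClassByTopic) {
    return factoryClassByTopic.entrySet().stream()
        .collect(Collectors.groupingBy(Map.Entry::getValue, TreeMap::new,
            Collectors.mapping(Map.Entry::getKey, Collectors.toList())));
  }

  // Fails fast when the table mixes stream types, since a single offset
  // factory built from the first stream config cannot parse all offsets.
  static void requireSingleFactoryClass(Map<String, String> factoryClassByTopic) {
    Map<String, List<String>> grouped = topicsByFactoryClass(factoryClassByTopic);
    if (grouped.size() > 1) {
      throw new IllegalStateException("Mixed stream types are not supported: " + grouped);
    }
  }
}
```

   A unit test could then cover both the single-type and mixed-type cases without needing an end-to-end setup.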



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java:
##########
@@ -87,6 +89,33 @@ public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertySt
     }
   }
 
+  public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertyStore<ZNRecord> propertyStore,

Review Comment:
   The old constructor is no longer used; can we remove it and update the tests?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -18,33 +18,44 @@
  */
 package org.apache.pinot.spi.stream;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * Fetches the list of {@link PartitionGroupMetadata} for all partition groups of the stream,
+ * Fetches the list of {@link PartitionGroupMetadata} for all partition groups of the streams,
  * using the {@link StreamMetadataProvider}
  */
 public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class);
 
   private List<PartitionGroupMetadata> _newPartitionGroupMetadataList;
-  private final StreamConfig _streamConfig;
+  private final List<StreamConfig> _streamConfigs;
   private final List<PartitionGroupConsumptionStatus> _partitionGroupConsumptionStatusList;
-  private final StreamConsumerFactory _streamConsumerFactory;
   private Exception _exception;
-  private final String _topicName;
+  private final List<String> _topicNames;
+
+  public PartitionGroupMetadataFetcher(List<StreamConfig> streamConfigs,
+      List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) {
+    _topicNames = streamConfigs.stream().map(StreamConfig::getTopicName).collect(Collectors.toList());
+    _streamConfigs = streamConfigs;
+    _partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList;
+    _newPartitionGroupMetadataList = new ArrayList<>();
+  }
 
   public PartitionGroupMetadataFetcher(StreamConfig streamConfig,

Review Comment:
   Similarly here: let's remove the unused constructor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

