ege-st commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1428220886


##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java:
##########
@@ -46,6 +50,20 @@ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, i
     super(clientId, streamConfig, partition);
   }
 
+  @Override
+  public void validateStreamState(StreamPartitionMsgOffset startMsgOffset) throws PermanentConsumerException {
+    final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
+    Map<TopicPartition, Long> beginningOffsets =
+            _consumer.beginningOffsets(Collections.singletonList(_topicPartition));
+
+    final long beginningOffset = beginningOffsets.getOrDefault(_topicPartition, 0L);
+    if (startOffset < beginningOffset) {

Review Comment:
   Is `startOffset > beginningOffset` an acceptable scenario?
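   One way to frame the question above: when `startOffset` is greater than or
equal to `beginningOffset`, the requested offset is still retained, which is
the expected healthy case. A stricter check could also compare against the
partition's end offset (available from `KafkaConsumer#endOffsets`). The minimal
sketch below assumes both bounds were already fetched; `OffsetRangeCheck` and
its `Status` values are hypothetical names, not part of the PR or of Pinot.

```java
/**
 * Hypothetical helper (not part of Pinot or the Kafka client): classifies a
 * start offset against the partition's retained range [beginning, end].
 * In Kafka, the end offset is the offset of the next record to be written,
 * so startOffset == endOffset simply means the consumer is caught up.
 */
final class OffsetRangeCheck {

  enum Status { BELOW_RETENTION, IN_RANGE, BEYOND_END }

  private OffsetRangeCheck() {
  }

  static Status classify(long startOffset, long beginningOffset, long endOffset) {
    if (beginningOffset > endOffset) {
      throw new IllegalArgumentException("beginningOffset must not exceed endOffset");
    }
    if (startOffset < beginningOffset) {
      // Records in [startOffset, beginningOffset) were already removed by
      // retention, so consuming from here would silently skip data.
      return Status.BELOW_RETENTION;
    }
    if (startOffset > endOffset) {
      // The requested offset was never written to this partition
      // (e.g. the topic was deleted and recreated).
      return Status.BEYOND_END;
    }
    // Anything in [beginningOffset, endOffset] can be consumed normally.
    return Status.IN_RANGE;
  }
}
```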



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1648,6 +1648,15 @@ private void makeStreamConsumer(String reason) {
     _segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
     _partitionGroupConsumer =
         _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+    try {
+      _partitionGroupConsumer.validateStreamState(_currentOffset);
+      _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, 0,

Review Comment:
   So it's good that we have a gauge per table (because otherwise, a table 
which is healthy could reset the gauge from `1` to `0` if it executes this 
check _after_ an unhealthy table set the gauge to `1`).
   
   However, what happens if there are multiple partitions on a table and one 
partition's segment sets this gauge to `1`, then a different partition's 
segment executes this code and resets the gauge from `1` to `0`? Put another 
way: if a table has 3 partitions and 1 of them fails `validateStreamState` but 
the other two pass `validateStreamState` then it's very likely that the healthy 
partitions will mask the unhealthy partition.
   
   We need to make sure that if a segment fails this validation, this gauge
stays at `1` until that segment is fixed or there is some other manual
intervention.
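   One possible way to keep a healthy partition from masking an unhealthy one
is to track validation results per partition and derive the table-level gauge
from them. In the hunk above, the second argument to `setValueOfPartitionGauge`
is the literal `0`, which appears to be a fixed partition id; passing the real
partition id would be a simpler alternative. The sketch below is hypothetical
(`InvalidStreamStateTracker` is not a Pinot class) and shows only the
aggregation logic, without any metrics library.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical tracker (not a Pinot API): remembers which partitions of each
 * table last failed stream-state validation, so a healthy partition cannot
 * clear the flag raised by an unhealthy one.
 */
final class InvalidStreamStateTracker {
  private final Map<String, Set<Integer>> _failingPartitions = new ConcurrentHashMap<>();

  /** Records the latest validation result for one partition of a table. */
  void record(String tableNameWithType, int partitionId, boolean valid) {
    Set<Integer> failing =
        _failingPartitions.computeIfAbsent(tableNameWithType, t -> ConcurrentHashMap.newKeySet());
    if (valid) {
      // Only this partition's own successful validation removes its entry.
      failing.remove(partitionId);
    } else {
      failing.add(partitionId);
    }
  }

  /** Table-level gauge value: 1 while any partition is still failing, else 0. */
  long gaugeValue(String tableNameWithType) {
    Set<Integer> failing = _failingPartitions.get(tableNameWithType);
    return (failing == null || failing.isEmpty()) ? 0L : 1L;
  }
}
```

Clearing an entry only on the same partition's success matches the requirement
above that the flag stay at `1` until the failing partition is fixed.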



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1648,6 +1648,15 @@ private void makeStreamConsumer(String reason) {
     _segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
     _partitionGroupConsumer =
         _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+    try {
+      _partitionGroupConsumer.validateStreamState(_currentOffset);
+      _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, 0,
+              ServerGauge.INVALID_REALTIME_STREAM_STATE_EXCEPTION, 0);
+    } catch (PermanentConsumerException pce) {

Review Comment:
   Should we catch this exception here or bubble it up to the next level?
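   If the exception should bubble up, one common pattern is to record the
metric and then rethrow, so the caller (rather than `makeStreamConsumer`)
decides how to react. A minimal sketch, assuming hypothetical stand-ins:
`InvalidStreamStateException` for the PR's `PermanentConsumerException`,
`StreamStateValidator` for the consumer's `validateStreamState` hook, and an
`IntConsumer` in place of the `ServerMetrics` gauge call.

```java
import java.util.function.IntConsumer;

/** Hypothetical stand-in for the PR's PermanentConsumerException. */
class InvalidStreamStateException extends Exception {
  InvalidStreamStateException(String message) {
    super(message);
  }
}

/** Hypothetical validation hook, mirroring validateStreamState(...) from the PR. */
interface StreamStateValidator {
  void validateStreamState(long startOffset) throws InvalidStreamStateException;
}

final class ConsumerSetup {
  private ConsumerSetup() {
  }

  /**
   * Records the outcome through the gauge callback, then rethrows on failure
   * so the caller decides whether to stop consumption instead of the
   * failure being swallowed here.
   */
  static void validateAndPropagate(StreamStateValidator validator, long startOffset,
      IntConsumer gauge) throws InvalidStreamStateException {
    try {
      validator.validateStreamState(startOffset);
      gauge.accept(0); // healthy
    } catch (InvalidStreamStateException e) {
      gauge.accept(1); // unhealthy: make the failure visible before propagating
      throw e;
    }
  }
}
```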



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

