ege-st commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1428220886

##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java:
##########
@@ -46,6 +50,20 @@ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, i
     super(clientId, streamConfig, partition);
   }
 
+  @Override
+  public void validateStreamState(StreamPartitionMsgOffset startMsgOffset) throws PermanentConsumerException {
+    final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
+    Map<TopicPartition, Long> beginningOffsets =
+        _consumer.beginningOffsets(Collections.singletonList(_topicPartition));
+
+    final long beginningOffset = beginningOffsets.getOrDefault(_topicPartition, 0L);
+    if (startOffset < beginningOffset) {

Review Comment:
Is `startOffset > beginningOffset` an acceptable scenario?

##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1648,6 +1648,15 @@ private void makeStreamConsumer(String reason) {
     _segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
     _partitionGroupConsumer =
         _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+    try {
+      _partitionGroupConsumer.validateStreamState(_currentOffset);
+      _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, 0,

Review Comment:
So it's good that we have a gauge per table (because otherwise, a table which is healthy could reset the gauge from `1` to `0` if it executes this check _after_ an unhealthy table set the gauge to `1`). However, what happens if there are multiple partitions on a table and one partition's segment sets this gauge to `1`, then a different partition's segment executes this code and resets the gauge from `1` to `0`?
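
To make the scenario in that question concrete, here is a minimal, self-contained sketch. It does not use Pinot's metrics classes: `StreamStateGaugeSketch`, `report`, `sharedGauge`, and `aggregatedGauge` are hypothetical names invented for illustration. It contrasts a single shared gauge slot, where the last writer wins, with one slot per partition that is aggregated at the table level.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch; none of these names are Pinot APIs. */
public class StreamStateGaugeSketch {
  // Single shared slot: every validation overwrites the previous value.
  private volatile long sharedGauge = 0L;

  // One slot per partition: 1 = last validation failed, 0 = last validation passed.
  private final Map<Integer, Long> perPartition = new ConcurrentHashMap<>();

  /** Records the outcome of one partition's validation under both schemes. */
  public void report(int partitionId, boolean valid) {
    long value = valid ? 0L : 1L;
    sharedGauge = value;                  // last writer wins, regardless of partition
    perPartition.put(partitionId, value); // only this partition's slot changes
  }

  /** Value of the single shared slot. */
  public long sharedGauge() {
    return sharedGauge;
  }

  /** Table-level view: 1 if any partition is currently failing validation, else 0. */
  public long aggregatedGauge() {
    return perPartition.containsValue(1L) ? 1L : 0L;
  }

  public static void main(String[] args) {
    StreamStateGaugeSketch gauges = new StreamStateGaugeSketch();
    gauges.report(0, false); // partition 0 fails validation
    gauges.report(1, true);  // partition 1 passes afterwards
    gauges.report(2, true);  // partition 2 passes afterwards
    // The shared slot now reads 0 (masked); the aggregated view still reads 1.
    System.out.println("shared=" + gauges.sharedGauge()
        + " aggregated=" + gauges.aggregatedGauge());
  }
}
```

In this sketch the aggregated value returns to `0` only when the failing partition itself reports a passing validation, which is one possible way to keep the signal from being masked. Whether the real gauge should be keyed by partition or aggregated elsewhere is a design decision for the PR.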

Put another way: if a table has 3 partitions and 1 of them fails `validateStreamState` while the other two pass, then it's very likely that the healthy partitions will mask the unhealthy one. We need to make sure that once a segment fails this validation, the gauge stays at `1` until that segment is fixed or there is some other manual intervention.

##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1648,6 +1648,15 @@ private void makeStreamConsumer(String reason) {
     _segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
     _partitionGroupConsumer =
         _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+    try {
+      _partitionGroupConsumer.validateStreamState(_currentOffset);
+      _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, 0,
+          ServerGauge.INVALID_REALTIME_STREAM_STATE_EXCEPTION, 0);
+    } catch (PermanentConsumerException pce) {

Review Comment:
Should we catch this exception here or bubble it up to the next level?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org