dang-stripe opened a new issue, #12318:
URL: https://github.com/apache/pinot/issues/12318

   We have a Pinot cluster that consumes, with isolation level READ_COMMITTED,
   from a topic written by an exactly-once Flink app upstream. The Flink app
   publishes using Kafka transactions. The app got into a state where it
   crashed repeatedly, leaving behind many aborted transactions.
   
   The Pinot cluster had many stuck partitions around the same time the app
   was crashlooping. From Pinot's perspective, the
   [`poll`](https://github.com/apache/pinot/blob/master/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java#L69)
   call on the consumer was returning no records, because every message it
   fetched belonged to an aborted transaction.
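To make the failure mode concrete, here is a simplified, hypothetical model (not Kafka's actual code) of READ_COMMITTED filtering: the consumer walks record batches from its fetch position, drops batches written by aborted transactions, and hands back only committed records. `Batch`, `PollResult`, and the `maxBatches` scan budget (a stand-in for how much work fits into one poll timeout) are assumptions made purely for illustration. A poll that only covers aborted batches returns an empty list even though its fetch position moved forward.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of READ_COMMITTED filtering; not Kafka's real code.
final class ReadCommittedModel {

    /** A record batch covering offsets [baseOffset, lastOffset]. */
    record Batch(long baseOffset, long lastOffset, boolean aborted) {}

    /** What one poll returns: visible record offsets and the next fetch offset. */
    record PollResult(List<Long> records, long nextFetchOffset) {}

    /**
     * Scans at most maxBatches batches from fromOffset, skipping aborted ones,
     * and stops early once a committed batch has produced visible records.
     * maxBatches is a hypothetical stand-in for what fits in one poll timeout.
     */
    static PollResult poll(List<Batch> log, long fromOffset, int maxBatches) {
        List<Long> visible = new ArrayList<>();
        long next = fromOffset;
        int scanned = 0;
        for (Batch b : log) {
            if (b.lastOffset() < fromOffset) {
                continue; // batch lies entirely before the fetch position
            }
            if (scanned == maxBatches) {
                break; // budget for this poll is used up
            }
            scanned++;
            if (!b.aborted()) {
                for (long o = Math.max(b.baseOffset(), fromOffset); o <= b.lastOffset(); o++) {
                    visible.add(o);
                }
            }
            // Aborted or not, the fetch position moves past this batch.
            next = b.lastOffset() + 1;
            if (!visible.isEmpty()) {
                break; // return as soon as there is something to hand back
            }
        }
        return new PollResult(visible, next);
    }

    public static void main(String[] args) {
        List<Batch> log = List.of(
                new Batch(0, 0, true),
                new Batch(1, 1, true),
                new Batch(2, 2, true),
                new Batch(3, 4, false));
        // Budget of 3 batches: only aborted batches are scanned.
        PollResult stuck = poll(log, 0, 3);
        System.out.println(stuck.records() + " next=" + stuck.nextFetchOffset()); // prints "[] next=3"
        // Budget of 4 batches: the scan reaches the committed batch.
        PollResult ok = poll(log, 0, 4);
        System.out.println(ok.records() + " next=" + ok.nextFetchOffset()); // prints "[3, 4] next=5"
    }
}
```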
   
   For a stuck segment, Pinot logged lines like the following, repeating
   indefinitely:
   
   ```
   [2024-01-22 23:59:57.269842] INFO [KafkaConsumer] [table__1234__10346__20240122T1755Z:25] [Consumer clientId=consumer-1234, groupId=consumer-1234] Seeking to offset 16548330245 for partition table-1234
   [2024-01-22 23:59:57.169762] INFO [LLRealtimeSegmentDataManager_table__1234__10346__20240122T1755Z] [table__1234__10346__20240122T1755Z:25] Consumed 0 events from (rate:0.0/s), currentOffset=16548330245, numRowsConsumedSoFar=0, numRowsIndexedSoFar=0
   ```
   
   We confirmed that the consumer was stuck on aborted messages by enabling
   DEBUG logging, which showed messages like this from the Kafka consumer:
   
   ```
   DEBUG [Fetcher] [table__1234__10346__20240122T1755Z:25] [Consumer clientId=consumer-1234, groupId=consumer-1234] Skipping aborted record batch from partition topic-1234 with producerId 12191437 and offsets 9591674883 to 9591674883
   ```
   
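   To remediate, we increased `stream.kafka.fetch.timeout.millis` to a larger
   value, presumably giving each `poll` enough time to get past the aborted
   batches and reach committed records.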
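The stuck loop in the logs above, and why a larger timeout helps, can be sketched with a toy model. It assumes, based only on the repeated `Seeking to offset` / `Consumed 0 events` log pair, that each fetch seeks back to `currentOffset` and that `currentOffset` only advances when records are returned. Each offset is treated as its own batch, and `batchesPerPoll` is a hypothetical proxy for `stream.kafka.fetch.timeout.millis`; none of the names here are Pinot APIs.

```java
// Toy model of the stuck fetch loop; names and structure are hypothetical,
// inferred from the "Seeking to offset" / "Consumed 0 events" log pair.
final class StuckLoopModel {

    /**
     * Offsets [0, abortedRun) belong to aborted transactions; every later offset
     * is a committed one-record batch. batchesPerPoll is a hypothetical proxy for
     * stream.kafka.fetch.timeout.millis. Returns currentOffset after `attempts`
     * fetches.
     */
    static long consume(long abortedRun, int batchesPerPoll, int attempts) {
        long currentOffset = 0;
        for (int attempt = 0; attempt < attempts; attempt++) {
            // Every attempt seeks back to currentOffset before polling.
            long position = currentOffset;
            int consumed = 0;
            for (int scanned = 0; scanned < batchesPerPoll; scanned++, position++) {
                if (position >= abortedRun) {
                    consumed++; // a committed record is visible
                }
            }
            if (consumed > 0) {
                currentOffset = position; // advance past everything scanned
            }
            // consumed == 0 is the "Consumed 0 events" case: no progress at all.
        }
        return currentOffset;
    }

    public static void main(String[] args) {
        // 100 aborted offsets in a row; a poll that scans 50 never gets past them.
        System.out.println("small budget: " + consume(100, 50, 1_000)); // prints "small budget: 0"
        // A budget that spans the aborted run reaches committed data on the first try.
        System.out.println("large budget: " + consume(100, 200, 1)); // prints "large budget: 200"
    }
}
```

With a budget smaller than the run of aborted offsets, no number of retries makes progress, which matches the indefinitely repeating logs; a budget that spans the run lets the very first poll reach committed data.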
   
   Is there a way for Pinot to detect this case? Supporting it might require
   changes in Kafka, since the Kafka consumer appears to do all of the filtering
   of aborted transactions inside the client library here:
   https://github.com/apache/kafka/blob/65596d0a2368399c77423d6ca8fca0c15dd64840/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java#L205-L221
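One possible detection approach, sketched below under stated assumptions: after an empty poll, compare the consumer's position with the offset the fetch started from. If the position moved forward but no records came back, the offsets in between were filtered out (for example, aborted batches) rather than simply not yet available, and the caller could resume from the position instead of seeking back. This assumes the real client's `KafkaConsumer.position(TopicPartition)` advances past batches it skips, which is our reading of the client rather than something confirmed in this issue. `OffsetAwareConsumer`, `Fetch`, and `FakeConsumer` are hypothetical stand-ins, not Pinot or Kafka APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: flag an empty poll that nevertheless moved the consumer
// position, i.e. offsets that were filtered out rather than not yet written.
final class SkippedOffsetDetector {

    /** Minimal hypothetical consumer abstraction; not a Pinot or Kafka API. */
    interface OffsetAwareConsumer {
        void seek(long offset);
        List<Long> poll();   // offsets of the records returned by this poll
        long position();     // next offset the consumer would fetch
    }

    /** Result of one fetch: returned records and the offset to resume from. */
    record Fetch(List<Long> records, long nextOffset, boolean skippedOnly) {}

    static Fetch fetch(OffsetAwareConsumer consumer, long startOffset) {
        consumer.seek(startOffset);
        List<Long> records = consumer.poll();
        long position = consumer.position();
        // Nothing visible, yet the consumer moved forward: everything in
        // [startOffset, position) was filtered out by the client.
        boolean skippedOnly = records.isEmpty() && position > startOffset;
        // Resume from the consumer's position whenever it advanced, instead of
        // seeking back to startOffset and repeating the same empty poll.
        long next = Math.max(position, startOffset);
        return new Fetch(records, next, skippedOnly);
    }

    /** In-memory fake: offsets below abortedRun are aborted; budget caps each poll. */
    static final class FakeConsumer implements OffsetAwareConsumer {
        private final long abortedRun;
        private final int budget;
        private long position;

        FakeConsumer(long abortedRun, int budget) {
            this.abortedRun = abortedRun;
            this.budget = budget;
        }

        @Override public void seek(long offset) { position = offset; }

        @Override public List<Long> poll() {
            List<Long> out = new ArrayList<>();
            // Scan up to `budget` offsets, stopping after the first visible record.
            for (int i = 0; i < budget && out.isEmpty(); i++, position++) {
                if (position >= abortedRun) {
                    out.add(position);
                }
            }
            return out;
        }

        @Override public long position() { return position; }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer(100, 50);
        long offset = 0;
        for (int attempt = 0; attempt < 4; attempt++) {
            Fetch f = fetch(consumer, offset);
            System.out.println("start=" + offset + " records=" + f.records()
                    + " skippedOnly=" + f.skippedOnly());
            offset = f.nextOffset();
        }
    }
}
```

Whether Pinot should silently move ahead or also surface a metric for such gaps is a design choice; the sketch only flags them through `skippedOnly`.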
