shashankhs11 opened a new pull request, #20292:
URL: https://github.com/apache/kafka/pull/20292

   This PR continues the work from #13283, which was mostly complete but never
   reviewed. It is part of
   [KIP-770](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390),
   which is only partially implemented.
   
   Most of the changes are carried over from the previous PR. On top of that, I
   fixed the existing tests that were failing and made the following additional
   changes:
   
   ### 1. Refactored `TopologyConfig`
   Checkstyle was failing because of high cyclomatic complexity in this file. I
   refactored some of the code so that the check passes.
   
   ### 2. Added type check for `StreamTask` in `TaskManager`
   A new task type, `ReadOnlyTask`
   (https://github.com/apache/kafka/pull/13283#issuecomment-1813728802), was
   added while the previous PR was open. The previous PR did not account for it
   and assumed that every `Task` was a `StreamTask`. I added a type check before
   casting.
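
A minimal sketch of the kind of guard described above. The types here are hypothetical stand-ins, not Kafka's classes, and `bufferedBytes()` is an assumed name for some `StreamTask`-only property:

```java
import java.util.List;

final class TaskCastSketch {
    // Hypothetical stand-in for the task interface; not Kafka's real type.
    interface Task {
        String id();
    }

    // Stand-in for a task type that carries a property other tasks lack.
    static final class StreamTask implements Task {
        private final String id;
        private final long bufferedBytes;

        StreamTask(final String id, final long bufferedBytes) {
            this.id = id;
            this.bufferedBytes = bufferedBytes;
        }

        @Override
        public String id() {
            return id;
        }

        // Assumed name, used only for illustration.
        long bufferedBytes() {
            return bufferedBytes;
        }
    }

    // Stand-in for a task type that must not be cast to StreamTask.
    static final class ReadOnlyTask implements Task {
        private final String id;

        ReadOnlyTask(final String id) {
            this.id = id;
        }

        @Override
        public String id() {
            return id;
        }
    }

    // Sums the StreamTask-only property, skipping every other Task type
    // instead of casting blindly and risking a ClassCastException.
    static long totalBufferedBytes(final List<Task> tasks) {
        long total = 0L;
        for (final Task task : tasks) {
            if (task instanceof StreamTask) {
                total += ((StreamTask) task).bufferedBytes();
            }
        }
        return total;
    }
}
```

With a list that mixes both stand-in types, only the `StreamTask` entries contribute to the sum.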
   
   ### 3. Added check for cache resizing
   
   > I would like to get more confirmation/guidance on what the expected 
behaviour should be for this
   
   Before:
   ```java
   resizeThreadCacheAndBufferMemory(threads.size());
   ```
   
   After:
   ```java
   // For cache resizing, check if thread removal succeeded or timed out
   final boolean threadWasRemoved = !threads.contains(streamThread);
   if (threadWasRemoved) {
       // Thread was successfully removed so we can use current thread count
       resizeThreadCacheAndBufferMemory(threads.size());
   } else {
       // Thread removal timed out so we need to resize as if the thread was removed
       resizeThreadCacheAndBufferMemory(threads.size() - 1);
   }
   ```
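
To make the intent of the two branches easier to check, here is a small standalone sketch of the count calculation only. Representing threads as a list of names and the helper name `threadCountAfterRemoval` are assumptions for illustration; the real code works on `StreamThread` objects:

```java
import java.util.List;

final class ResizeCountSketch {
    // Returns the thread count to size caches for after trying to remove
    // `removed`. If the thread is still listed (removal timed out), it is
    // subtracted so that both outcomes yield the same target count.
    static int threadCountAfterRemoval(final List<String> threads, final String removed) {
        final boolean threadWasRemoved = !threads.contains(removed);
        return threadWasRemoved ? threads.size() : threads.size() - 1;
    }
}
```

Starting from three threads, the result is 2 whether the removed thread has already left the list or is still present after a timeout.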
   
   
   ### 4. Check for `CorruptedRecord`
   
   ```java
   // Also we need to resume if the queue only contains corrupted records
   // because they represent a processable queue which is actually empty
   if (recordInfo.queue().isEmpty() ||
       (maxBufferedSize != -1 && recordInfo.queue().size() == maxBufferedSize) ||
       recordInfo.queue().headRecordIsCorrupted()) { // added this check
   ```
   
   The check was added primarily because of a failing test:
   ```text
   StreamTaskTest.shouldResumePartitionWhenSkippingOverRecordsWithInvalidTs()
   ```
   The test has records with invalid timestamps in a partition, which makes
   each such record a `CorruptedRecord`.
   As I understand it, we should resume the partition even if the head record
   is a `CorruptedRecord`, because the queue is then logically empty: it
   appears to be full but contains no meaningful records to process.
   
   > Please confirm if my understanding is correct.
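
The condition above can be restated as a standalone predicate for discussion. This is only a sketch: the method name `shouldResume` and the flattened parameters are assumptions, standing in for the calls on `recordInfo.queue()` in the snippet:

```java
final class ResumeCheckSketch {
    // Sentinel meaning "no per-partition record limit", as in the snippet above.
    static final int NO_LIMIT = -1;

    // Mirrors the three-way condition: resume when the queue is empty, when it
    // sits exactly at the configured limit, or when its head record is corrupted.
    static boolean shouldResume(final int queueSize,
                                final boolean headRecordIsCorrupted,
                                final int maxBufferedSize) {
        return queueSize == 0
            || (maxBufferedSize != NO_LIMIT && queueSize == maxBufferedSize)
            || headRecordIsCorrupted;
    }
}
```

Under this reading, a queue holding three records with a limit of ten resumes only if its head record is corrupted.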
   
   ### 5. Refactored as per the comment below
   As per https://github.com/apache/kafka/pull/13283#issuecomment-1815619664:
   > so how about we move that method to the Task interface and just return 0 
for the implementation of it in ReadOnlyTask and StandbyTask?
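
A rough sketch of the shape this suggestion leads to. The quoted comment does not name the method, so `bufferedBytes()` is a hypothetical name, and the classes are simplified stand-ins rather than Kafka's real types:

```java
import java.util.List;

final class TaskInterfaceSketch {
    // Hypothetical stand-in for the task interface.
    interface Task {
        // Assumed method name; declared on the interface so callers need no cast.
        long bufferedBytes();
    }

    // Only this type reports a real value.
    static final class StreamTask implements Task {
        private final long bufferedBytes;

        StreamTask(final long bufferedBytes) {
            this.bufferedBytes = bufferedBytes;
        }

        @Override
        public long bufferedBytes() {
            return bufferedBytes;
        }
    }

    // Returns 0, as proposed in the quoted comment.
    static final class StandbyTask implements Task {
        @Override
        public long bufferedBytes() {
            return 0L;
        }
    }

    // Returns 0, as proposed in the quoted comment.
    static final class ReadOnlyTask implements Task {
        @Override
        public long bufferedBytes() {
            return 0L;
        }
    }

    // Callers can now aggregate over any Task without instanceof checks.
    static long total(final List<Task> tasks) {
        long sum = 0L;
        for (final Task task : tasks) {
            sum += task.bufferedBytes();
        }
        return sum;
    }
}
```

The trade-off in this design is that every `Task` implementation has to provide the method, in exchange for call sites that never need to know the concrete type.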

