shashankhs11 opened a new pull request, #20292
URL: https://github.com/apache/kafka/pull/20292
This PR continues the work from #13283, which was mostly complete but was never reviewed. It is part of [KIP-770](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390), which is only partially implemented so far. I carried over most of the work from the previous PR, made some additional changes, and fixed the existing tests that were failing.

The following changes were added on top of the previous work:

### 1. Refactored `TopologyConfig`

The `checkStyle` check was failing because of high cyclomatic complexity in this file, so I refactored some of the code to make it pass.

### 2. Added a type check for `StreamTask` in `TaskManager`

A new task type, `ReadOnlyTask` (https://github.com/apache/kafka/pull/13283#issuecomment-1813728802), was added during the lifetime of the previous PR. The previous code did not account for it and assumed that every `Task` was a `StreamTask`, so I added a type check before casting.

### 3. Added a check for cache resizing

> I would like more confirmation or guidance on what the expected behaviour should be here.

Before:

```java
resizeThreadCacheAndBufferMemory(threads.size());
```

After:

```java
// For cache resizing, check whether thread removal succeeded or timed out
final boolean threadWasRemoved = !threads.contains(streamThread);
if (threadWasRemoved) {
    // The thread was successfully removed, so we can use the current thread count
    resizeThreadCacheAndBufferMemory(threads.size());
} else {
    // Thread removal timed out, so we resize as if the thread had been removed
    resizeThreadCacheAndBufferMemory(threads.size() - 1);
}
```

### 4. Check for `CorruptedRecord`

```java
// Also we need to resume if the queue only contains corrupted records
// because they represent a processable queue which is actually empty
if (recordInfo.queue().isEmpty()
        || (maxBufferedSize != -1 && recordInfo.queue().size() == maxBufferedSize)
        || recordInfo.queue().headRecordIsCorrupted()) { // added this check
    // ... resume the partition ...
}
```

I added this check mainly because the following test was failing:

```text
StreamTaskTest.shouldResumePartitionWhenSkippingOverRecordsWithInvalidTs()
```

The partition in that test contains records with invalid timestamps, which turns them into `CorruptedRecord`s. My understanding is that we should resume the partition even when its head record is a `CorruptedRecord`, because the queue is then logically empty: the partition appears full but actually contains no meaningful records to process.

> Please confirm whether my understanding is correct.

### 5. Refactored per the comment below

As per https://github.com/apache/kafka/pull/13283#issuecomment-1815619664:

> so how about we move that method to the Task interface and just return 0 for the implementation of it in ReadOnlyTask and StandbyTask?
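The type check from section 2 (checking that a `Task` is a `StreamTask` before casting) can be illustrated with a minimal sketch. It assumes simplified stand-in types: `Task`, `StreamTask` and `ReadOnlyTask` below are not the real Kafka Streams classes, and `bufferedBytes()` / `totalBufferedBytes()` are hypothetical names used only for illustration.

```java
import java.util.List;

// Hypothetical, simplified stand-ins for the Kafka Streams task types.
interface Task { }

class StreamTask implements Task {
    private final long bufferedBytes;

    StreamTask(final long bufferedBytes) {
        this.bufferedBytes = bufferedBytes;
    }

    // Hypothetical accessor that exists only on StreamTask in this sketch.
    long bufferedBytes() {
        return bufferedBytes;
    }
}

class ReadOnlyTask implements Task { }

final class TaskTypeCheckSketch {
    private TaskTypeCheckSketch() { }

    // Sums buffered bytes over all tasks, checking the runtime type before casting.
    // An unconditional cast would throw ClassCastException for a ReadOnlyTask.
    static long totalBufferedBytes(final List<Task> tasks) {
        long total = 0L;
        for (final Task task : tasks) {
            if (task instanceof StreamTask) {
                total += ((StreamTask) task).bufferedBytes();
            }
        }
        return total;
    }
}
```

Silently skipping non-`StreamTask` entries is only one option; depending on the call site, logging or throwing a descriptive exception may be more appropriate.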
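The thread-count decision from section 3 can be isolated into a small, testable sketch. It assumes a plain `List` of threads; `threadCountForResize` and `cacheSizePerThread` are hypothetical helpers, and the even split of the cache budget is a simplification (the real resize logic may account for other factors, such as a global thread).

```java
import java.util.List;

final class CacheResizeSketch {
    private CacheResizeSketch() { }

    // Thread count to use when resizing after a removal attempt.
    // If the removed thread is gone from the list, removal completed and the size is
    // already correct; otherwise removal timed out and the thread is still counted,
    // so subtract one to size the caches as if it had been removed.
    static <T> int threadCountForResize(final List<T> threads, final T removedThread) {
        final boolean threadWasRemoved = !threads.contains(removedThread);
        return threadWasRemoved ? threads.size() : threads.size() - 1;
    }

    // Hypothetical even split of the total cache budget across the remaining threads.
    static long cacheSizePerThread(final long totalCacheBytes, final int threadCount) {
        return threadCount == 0 ? totalCacheBytes : totalCacheBytes / threadCount;
    }
}
```

For example, with threads `t1`, `t2`, `t3` where removal of `t3` timed out, the list still has three entries, so the sketch resizes for two threads; under the even split assumed here, a 30 MB budget becomes 15 MB per thread. Both branches agree once removal eventually completes. The sketch reads the list twice (`contains`, then `size`), so if it can change concurrently, passing in a snapshot copy keeps the two reads consistent.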
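The resume condition from section 4 can be expressed as a standalone predicate. This is a sketch over a plain `Deque`, not the real `RecordQueue`: `QueuedRecord` and `shouldResume` are hypothetical, and reading `-1` as "no buffering threshold" is an assumption based on the `maxBufferedSize != -1` guard in the snippet.

```java
import java.util.Deque;

// Hypothetical queue entry; "corrupted" marks a record that cannot be processed
// (in the failing test described above, records with invalid timestamps).
final class QueuedRecord {
    final boolean corrupted;

    QueuedRecord(final boolean corrupted) {
        this.corrupted = corrupted;
    }
}

final class ResumeCheckSketch {
    private ResumeCheckSketch() { }

    // Same three-way condition as the snippet: resume when the queue is empty,
    // when its size equals the buffering threshold (-1 meaning no threshold),
    // or when its head record is corrupted.
    static boolean shouldResume(final Deque<QueuedRecord> queue, final int maxBufferedSize) {
        final boolean empty = queue.isEmpty();
        final boolean atThreshold = maxBufferedSize != -1 && queue.size() == maxBufferedSize;
        final boolean headCorrupted = !empty && queue.peekFirst().corrupted;
        return empty || atThreshold || headCorrupted;
    }
}
```

Without the third clause, a queue whose only content is a corrupted record, with no threshold set, would be neither empty nor at the threshold, so the partition would stay paused.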
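The refactoring suggested in the review comment quoted in section 5 (moving the method onto the `Task` interface and returning 0 from `ReadOnlyTask` and `StandbyTask`) can be sketched as follows. The comment does not name the method, so `bufferedBytes()` is a hypothetical name, and all types here are simplified stand-ins rather than the real Kafka Streams classes.

```java
import java.util.List;

// Simplified stand-in for the Task interface; bufferedBytes() is a hypothetical
// name for the method the review comment suggests moving onto the interface.
interface Task {
    long bufferedBytes();
}

class StreamTask implements Task {
    private final long bufferedBytes;

    StreamTask(final long bufferedBytes) {
        this.bufferedBytes = bufferedBytes;
    }

    @Override
    public long bufferedBytes() {
        return bufferedBytes;
    }
}

// Per the review suggestion, these implementations simply return 0.
class ReadOnlyTask implements Task {
    @Override
    public long bufferedBytes() {
        return 0L;
    }
}

class StandbyTask implements Task {
    @Override
    public long bufferedBytes() {
        return 0L;
    }
}

final class InterfaceMethodSketch {
    private InterfaceMethodSketch() { }

    // No instanceof check or cast: each Task implementation answers for itself.
    static long totalBufferedBytes(final List<Task> tasks) {
        long total = 0L;
        for (final Task task : tasks) {
            total += task.bufferedBytes();
        }
        return total;
    }
}
```

Compared with checking the type before casting, the call site no longer needs to know which concrete task types exist, at the cost of every implementation having to provide the method.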
