nicktelford commented on code in PR #20954:
URL: https://github.com/apache/kafka/pull/20954#discussion_r2781575737


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -309,6 +311,46 @@ private void closeStartupTasks(final Predicate<Task> predicate) {
         }
     }
 
+    public Map<TaskId, Long> taskOffsetSums(final Set<TaskId> tasks) {

Review Comment:
   > However, this will break the TaskManager when it tries to **put a 
Task.LATEST_OFFSET for the tasks that didn't have a cached offset**
   
   Not sure what you're getting at here. If a task doesn't have an offset cached 
in `StateDirectory`, it's because no local state exists on disk (or, if it does, 
that local state is corrupt and will be wiped).
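   To make that concrete, here is a minimal sketch of the `StateDirectory` side, under a simplified model that is not the code in this PR: `StateDirectorySketch`, `cacheOffsetSum` and the nested `TaskId` record are hypothetical stand-ins, and returning a read-only map is an assumption used only to show where the mutability question comes from. Tasks without cached on-disk offsets are simply absent from the result.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical model of the StateDirectory side; not the code in this PR.
final class StateDirectorySketch {
    // Stand-in for org.apache.kafka.streams.processor.TaskId.
    record TaskId(int subtopology, int partition) {}

    // Offset sums read from valid on-disk state (hypothetical field).
    private final Map<TaskId, Long> cachedOffsetSums = new HashMap<>();

    // Hypothetical hook: record the offset sum found for a task's local state.
    void cacheOffsetSum(final TaskId task, final long offsetSum) {
        cachedOffsetSums.put(task, offsetSum);
    }

    // Returns offset sums only for requested tasks that have cached on-disk state;
    // tasks with no local state (or corrupt state that will be wiped) are absent.
    // Assumption: the result is read-only, which is what raises the mutability question.
    Map<TaskId, Long> taskOffsetSums(final Set<TaskId> tasks) {
        final Map<TaskId, Long> result = new HashMap<>();
        for (final TaskId task : tasks) {
            final Long offsetSum = cachedOffsetSums.get(task);
            if (offsetSum != null) {
                result.put(task, offsetSum);
            }
        }
        return Collections.unmodifiableMap(result);
    }
}
```

Under this model, a caller that calls `put` on the returned map gets an `UnsupportedOperationException`, which is the failure mode the quoted concern is about.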
   
   `TaskManager` simply overwrites the cached offset for running active tasks 
with `LATEST_OFFSET`, because we know we have the very latest offset for these 
tasks. `StateDirectory` can't do this, because it doesn't know anything about 
the assignment, only the state on the local disk.
   
   A simple solution to the mutability problem would be to have `TaskManager` 
construct a new map, copy the offsets from the 
`StateDirectory#taskOffsetSums` result into it, and add the `LATEST_OFFSET`s as 
necessary. But is it really worth it?
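   For reference, the copy-then-overwrite approach described above could look roughly like this. It is a hypothetical sketch, not `TaskManager`'s actual code: `TaskManagerOffsetSketch`, `buildOffsetSums` and the nested `TaskId` record are illustrative names, and `LATEST_OFFSET = -2L` is assumed here as a stand-in for `Task.LATEST_OFFSET`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the TaskManager side; names are illustrative only.
final class TaskManagerOffsetSketch {
    // Stand-in for org.apache.kafka.streams.processor.TaskId.
    record TaskId(int subtopology, int partition) {}

    // Assumed stand-in for Task.LATEST_OFFSET.
    static final long LATEST_OFFSET = -2L;

    // Copies the (possibly read-only) on-disk offset sums into a fresh mutable map,
    // then marks every running active task with LATEST_OFFSET, overwriting any
    // on-disk value, since a running active task is known to be fully caught up.
    static Map<TaskId, Long> buildOffsetSums(final Map<TaskId, Long> onDiskOffsetSums,
                                             final Set<TaskId> runningActiveTasks) {
        final Map<TaskId, Long> result = new HashMap<>(onDiskOffsetSums);
        for (final TaskId task : runningActiveTasks) {
            result.put(task, LATEST_OFFSET);
        }
        return result;
    }
}
```

The cost is one extra map allocation per call, and `StateDirectory`'s result is never mutated, so it can stay read-only.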


