jolshan commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1633660092
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -711,8 +752,16 @@ private void flushCurrentBatch() {
coordinator.updateLastWrittenOffset(offset);
if (offset != currentBatch.nextOffset) {
- throw new IllegalStateException("The state machine of coordinator " + tp + " is out of sync with the " +
- "underlying log. The last write returned " + offset + " while " + currentBatch.nextOffset + " was expected");
+ log.error("The state machine of the coordinator {} is out of sync with the underlying log. " +
+ "The last written offset returned is {} while the coordinator expected {}. The coordinator " +
+ "will be reloaded in order to re-synchronize the state machine.",
+ tp, offset, currentBatch.nextOffset);
+ // Transition to FAILED state to unload the state machine and complete
+ // exceptionally all the pending operations.
+ transitionTo(CoordinatorState.FAILED);
+ // Transition to LOADING to trigger the restoration of the state.
+ transitionTo(CoordinatorState.LOADING);
Review Comment:
Is it worth adding a test for this?
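For what it's worth, here is a minimal sketch of the behavior such a test could pin down: when the offset returned by the write differs from the expected one, the coordinator should go to FAILED and then to LOADING, in that order. This uses a hypothetical stand-in class, not the real CoordinatorRuntime or its test harness; the names OutOfSyncSketch, Coordinator, expectedNextOffset, and onWriteCompleted are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for illustration only; not the real CoordinatorRuntime.
class OutOfSyncSketch {
    enum State { ACTIVE, FAILED, LOADING }

    static class Coordinator {
        State state = State.ACTIVE;
        // Records every transition so a test can assert on the order.
        final List<State> transitions = new ArrayList<>();
        // Offset the state machine expects the next write to return.
        final long expectedNextOffset;

        Coordinator(long expectedNextOffset) {
            this.expectedNextOffset = expectedNextOffset;
        }

        void transitionTo(State next) {
            state = next;
            transitions.add(next);
        }

        // Mirrors the check in the diff: on a mismatch, fail and then reload.
        void onWriteCompleted(long returnedOffset) {
            if (returnedOffset != expectedNextOffset) {
                transitionTo(State.FAILED);
                transitionTo(State.LOADING);
            }
        }
    }

    public static void main(String[] args) {
        Coordinator c = new Coordinator(10L);
        c.onWriteCompleted(12L); // the log returned an unexpected offset
        System.out.println(c.transitions + " -> " + c.state);
    }
}
```

A real test would presumably also check that pending operations are completed exceptionally, which this sketch does not model.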