jolshan commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1633532863
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -583,11 +674,339 @@ private void unload() {
}
timer.cancelAll();
deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
+ failCurrentBatch(Errors.NOT_COORDINATOR.exception());
if (coordinator != null) {
coordinator.onUnloaded();
}
coordinator = null;
}
+
+ /**
+ * Frees the current batch.
+ */
+ private void freeCurrentBatch() {
+ // Cancel the linger timeout.
+ currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
+
+ // Release the buffer.
+ bufferSupplier.release(currentBatch.buffer);
+
+ currentBatch = null;
+ }
+
+ /**
+ * Flushes the current (or pending) batch to the log. When the batch
is written
+ * locally, a new snapshot is created in the snapshot registry and the
events
+ * associated with the batch are added to the deferred event queue.
+ */
+ private void flushCurrentBatch() {
+ if (currentBatch != null) {
+ try {
+ // Write the records to the log and update the last
written offset.
+ long offset = partitionWriter.append(
+ tp,
+ currentBatch.verificationGuard,
+ currentBatch.builder.build()
+ );
+ coordinator.updateLastWrittenOffset(offset);
+
+ if (offset != currentBatch.nextOffset) {
+ throw new IllegalStateException("The state machine of
coordinator " + tp + " is out of sync with the " +
+ "underlying log. The last write returned " +
offset + " while " + currentBatch.nextOffset + " was expected");
Review Comment:
This could be a better approach. I was not aware there were mechanisms to
validate before writing. 👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]