jolshan commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1628240171
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -583,11 +674,330 @@ private void unload() {
}
timer.cancelAll();
deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
+ failCurrentBatch(Errors.NOT_COORDINATOR.exception());
if (coordinator != null) {
coordinator.onUnloaded();
}
coordinator = null;
}
+
+ /**
+  * Releases the resources held by the current batch and clears it.
+  * A current batch must exist when this method is called.
+  */
+ private void freeCurrentBatch() {
+     // Stop the linger timeout task, if one was created for this batch.
+     if (currentBatch.lingerTimeoutTask.isPresent()) {
+         currentBatch.lingerTimeoutTask.get().cancel();
+     }
+
+     // Hand the batch's buffer back to the supplier.
+     bufferSupplier.release(currentBatch.buffer);
+
+     // Drop the reference to the batch.
+     currentBatch = null;
+ }
+
+ /**
+  * Flushes the current (or pending) batch, if any, to the log. On a
+  * successful append, the coordinator's last written offset is updated,
+  * every event pending in the batch is added to the deferred event queue
+  * under the returned offset, and the batch is freed. Any failure fails
+  * the current batch instead.
+  */
+ private void writeCurrentBatch() {
+     // Nothing to flush.
+     if (currentBatch == null) return;
+
+     try {
+         // Append the built records and remember the offset returned by the writer.
+         long lastOffset = partitionWriter.append(
+             tp,
+             currentBatch.verificationGuard,
+             currentBatch.builder.build()
+         );
+         coordinator.updateLastWrittenOffset(lastOffset);
+
+         // Park the batch's pending events in the deferred event queue at that offset.
+         for (DeferredEvent pending : currentBatch.events) {
+             deferredEventQueue.add(lastOffset, pending);
+         }
+
+         // The batch is no longer needed.
+         freeCurrentBatch();
+     } catch (Throwable t) {
+         failCurrentBatch(t);
+     }
+ }
+
+ /**
+  * Writes the current batch if it is transactional or if it has passed
+  * the append linger time.
+  *
+  * @param currentTimeMs The current time in milliseconds.
+  */
+ private void maybeWriteCurrentBatch(long currentTimeMs) {
+     if (currentBatch != null) {
+         // Time the batch has been lingering: "now" minus the batch's append
+         // time (presumably the time at which the batch was allocated).
+         // Subtracting the other way round yields a non-positive value as time
+         // moves forward, which can never reach a positive appendLingerMs.
+         long elapsedMs = currentTimeMs - currentBatch.appendTimeMs;
+         if (currentBatch.builder.isTransactional() || elapsedMs >= appendLingerMs) {
+             writeCurrentBatch();
+         }
+     }
+ }
+
+ /**
+  * Fails the current batch, if any: reverts the coordinator's last written
+  * offset to the base offset of the batch, completes every event attached
+  * to the batch with the given error, and frees the batch.
+  *
+  * @param t The error used to complete the events of the batch.
+  */
+ private void failCurrentBatch(Throwable t) {
+     // No batch, nothing to fail.
+     if (currentBatch == null) return;
+
+     coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
+     for (DeferredEvent pending : currentBatch.events) {
+         pending.complete(t);
+     }
+     freeCurrentBatch();
+ }
+
+ /**
+ * Allocates a new batch if none already exists; does nothing otherwise.
+ *
+ * When a batch is created, the log config of the partition provides the
+ * record version and the max message size, a buffer of that size is taken
+ * from the buffer supplier, and a records builder is created on top of it.
+ * The coordinator's last written offset at allocation time and the given
+ * current time are stored in the batch. If the append linger time is
+ * positive, a timer task is created which schedules a "FlushBatch" internal
+ * operation; that operation writes the current batch unless the task has
+ * been cancelled in the meantime.
+ *
+ * NOTE(review): the timer task is only constructed here; where it gets
+ * registered with a timer is not visible in this method - confirm.
+ *
+ * @param producerId The producer id.
+ * @param producerEpoch The producer epoch.
+ * @param verificationGuard The verification guard.
+ * @param currentTimeMs The current time in milliseconds.
+ */
+ private void maybeAllocateNewBatch(
+ long producerId,
+ short producerEpoch,
+ VerificationGuard verificationGuard,
+ long currentTimeMs
+ ) {
+ if (currentBatch == null) {
+ LogConfig logConfig = partitionWriter.config(tp);
+ byte magic = logConfig.recordVersion().value;
+ int maxBatchSize = logConfig.maxMessageSize();
+ long prevLastWrittenOffset = coordinator.lastWrittenOffset();
+ ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
+
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+ buffer,
+ magic,
+ compression,
+ TimestampType.CREATE_TIME,
+ 0L,
+ currentTimeMs,
+ producerId,
+ producerEpoch,
+ 0,
+ producerId != RecordBatch.NO_PRODUCER_ID, // presumably the transactional flag - confirm against MemoryRecordsBuilder
+ false,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH,
+ maxBatchSize
+ );
+
+ Optional<TimerTask> lingerTimeoutTask = Optional.empty();
+ if (appendLingerMs > 0) {
+ lingerTimeoutTask = Optional.of(new
+ TimerTask(appendLingerMs) {
+ @Override
+ public void run() {
+ scheduleInternalOperation("FlushBatch", tp, () -> {
+ if (this.isCancelled()) return; // "this" is the enclosing TimerTask, not the lambda
+ withActiveContextOrThrow(tp,
+ CoordinatorContext::writeCurrentBatch);
+ });
+ }
+ });
+ }
+
+ currentBatch = new CoordinatorBatch(
+ prevLastWrittenOffset,
+ currentTimeMs,
+ maxBatchSize,
+ verificationGuard,
+ buffer,
+ builder,
+ lingerTimeoutTask
+ );
+ }
+ }
+
+ /**
+ * Appends records to the log and replays them to the state machine.
+ *
+ * @param producerId The producer id.
+ * @param producerEpoch The producer epoch.
+ * @param verificationGuard The verification guard.
+ * @param records The records to append.
+ * @param replay A boolean indicating whether the records
+ * must be replayed or not.
+ * @param event The event that must be completed when the
+ * records are written.
+ */
+ private void append(
+ long producerId,
+ short producerEpoch,
+ VerificationGuard verificationGuard,
+ List<U> records,
+ boolean replay,
+ DeferredEvent event
+ ) {
+ if (state != CoordinatorState.ACTIVE) {
+ throw new IllegalStateException("Coordinator must be active to
append records");
+ }
+
+ if (records.isEmpty()) {
+ // If the records are empty, it was a read operation after
all. In this case,
+ // the response can be returned directly iff there are no
pending write operations;
+ // otherwise, the read needs to wait on the last write
operation to be completed.
+ if (currentBatch != null) {
Review Comment:
For my understanding: if we have a batch, the event is added to the batch
and will be added to the queue (or not, if there is nothing in the queue) when
the batch is completed. But if there is no batch, we just put it in the deferred
event queue?
So we will still maintain the ordering of events, but we won't allocate a new
batch (we essentially complete it immediately).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]