jeffkbkim commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1630243662
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -583,11 +674,339 @@ private void unload() {
}
timer.cancelAll();
deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
+ failCurrentBatch(Errors.NOT_COORDINATOR.exception());
if (coordinator != null) {
coordinator.onUnloaded();
}
coordinator = null;
}
+
+ /**
+ * Frees the current batch.
+ */
+ private void freeCurrentBatch() {
+ // Cancel the linger timeout.
+ currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
+
+ // Release the buffer.
+ bufferSupplier.release(currentBatch.buffer);
+
+ currentBatch = null;
+ }
+
+ /**
+ * Flushes the current (or pending) batch to the log. When the batch
is written
+ * locally, a new snapshot is created in the snapshot registry and the
events
+ * associated with the batch are added to the deferred event queue.
+ */
+ private void flushCurrentBatch() {
+ if (currentBatch != null) {
+ try {
+ // Write the records to the log and update the last
written offset.
+ long offset = partitionWriter.append(
+ tp,
+ currentBatch.verificationGuard,
+ currentBatch.builder.build()
+ );
+ coordinator.updateLastWrittenOffset(offset);
+
+ if (offset != currentBatch.nextOffset) {
+ throw new IllegalStateException("The state machine of
coordinator " + tp + " is out of sync with the " +
+ "underlying log. The last write returned " +
offset + " while " + currentBatch.nextOffset + " was expected");
+ }
+
+ // Add all the pending deferred events to the deferred
event queue.
+ for (DeferredEvent event : currentBatch.deferredEvents) {
+ deferredEventQueue.add(offset, event);
+ }
+
+ // Free up the current batch.
+ freeCurrentBatch();
+ } catch (Throwable t) {
+ log.error("Writing records to {} failed due to: {}.", tp,
t.getMessage());
+ failCurrentBatch(t);
+ // We rethrow the exception for the caller to handle it
too.
+ throw t;
+ }
+ }
+ }
+
+ /**
+ * Flushes the current batch if it is transactional or if it has
passed the append linger time.
+ */
+ private void maybeFlushCurrentBatch(long currentTimeMs) {
+ if (currentBatch != null) {
+ if (currentBatch.builder.isTransactional() ||
(currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) {
+ flushCurrentBatch();
+ }
+ }
+ }
+
+ /**
+ * Fails the current batch, reverts to the snapshot to the base/start
offset of the
+ * batch, fails all the associated events.
+ */
+ private void failCurrentBatch(Throwable t) {
+ if (currentBatch != null) {
+ coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
+ for (DeferredEvent event : currentBatch.deferredEvents) {
+ event.complete(t);
+ }
+ freeCurrentBatch();
+ }
+ }
+
+ /**
+ * Allocates a new batch if none already exists.
+ */
+ private void maybeAllocateNewBatch(
+ long producerId,
+ short producerEpoch,
+ VerificationGuard verificationGuard,
+ long currentTimeMs
+ ) {
+ if (currentBatch == null) {
+ LogConfig logConfig = partitionWriter.config(tp);
+ byte magic = logConfig.recordVersion().value;
+ int maxBatchSize = logConfig.maxMessageSize();
+ long prevLastWrittenOffset = coordinator.lastWrittenOffset();
+ ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
+
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+ buffer,
+ magic,
+ compression,
+ TimestampType.CREATE_TIME,
+ 0L,
+ currentTimeMs,
+ producerId,
+ producerEpoch,
+ 0,
+ producerId != RecordBatch.NO_PRODUCER_ID,
+ false,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH,
+ maxBatchSize
+ );
+
+ Optional<TimerTask> lingerTimeoutTask = Optional.empty();
+ if (appendLingerMs > 0) {
+ lingerTimeoutTask = Optional.of(new
TimerTask(appendLingerMs) {
+ @Override
+ public void run() {
+ scheduleInternalOperation("FlushBatch", tp, () -> {
+ if (this.isCancelled()) return;
+ withActiveContextOrThrow(tp,
CoordinatorContext::flushCurrentBatch);
+ });
+ }
+ });
+ CoordinatorRuntime.this.timer.add(lingerTimeoutTask.get());
+ }
+
+ currentBatch = new CoordinatorBatch(
+ prevLastWrittenOffset,
+ currentTimeMs,
+ maxBatchSize,
+ verificationGuard,
+ buffer,
+ builder,
+ lingerTimeoutTask
+ );
+ }
+ }
+
+ /**
+ * Appends records to the log and replay them to the state machine.
+ *
+ * @param producerId The producer id.
+ * @param producerEpoch The producer epoch.
+ * @param verificationGuard The verification guard.
+ * @param records The records to append.
+ * @param replay A boolean indicating whether the records
+ * must be replayed or not.
+ * @param event The event that must be completed when the
+ * records are written.
+ */
+ private void append(
+ long producerId,
+ short producerEpoch,
+ VerificationGuard verificationGuard,
+ List<U> records,
+ boolean replay,
+ DeferredEvent event
+ ) {
+ if (state != CoordinatorState.ACTIVE) {
+ throw new IllegalStateException("Coordinator must be active to
append records");
+ }
+
+ if (records.isEmpty()) {
+ // If the records are empty, it was a read operation after
all. In this case,
+ // the response can be returned directly iff there are no
pending write operations;
+ // otherwise, the read needs to wait on the last write
operation to be completed.
+ if (currentBatch != null) {
+ currentBatch.deferredEvents.add(event);
+ } else {
+ OptionalLong pendingOffset =
deferredEventQueue.highestPendingOffset();
+ if (pendingOffset.isPresent()) {
+ deferredEventQueue.add(pendingOffset.getAsLong(),
event);
+ } else {
+ event.complete(null);
+ }
+ }
+ } else {
+ // If the records are not empty, first, they are applied to
the state machine,
+ // second, they are appended to the opened batch.
+ long currentTimeMs = time.milliseconds();
+
+ // If the current write operation is transactional, the
current batch
+ // is written before proceeding with it.
+ if (producerId != RecordBatch.NO_PRODUCER_ID) {
+ // If flushing fails, we don't catch the exception in
order to let
+ // the caller fail the current operation.
+ flushCurrentBatch();
+ }
+
+ // Allocate a new batch if none exists.
+ maybeAllocateNewBatch(
+ producerId,
+ producerEpoch,
+ verificationGuard,
+ currentTimeMs
+ );
+
+ // Prepare the records.
+ List<SimpleRecord> recordsToAppend = new
ArrayList<>(records.size());
+ for (U record : records) {
+ recordsToAppend.add(new SimpleRecord(
+ currentTimeMs,
+ serializer.serializeKey(record),
+ serializer.serializeValue(record)
+ ));
+ }
+
+ // Compute the estimated size of the records.
+ int estimatedSize = AbstractRecords.estimateSizeInBytes(
+ currentBatch.builder.magic(),
+ compression.type(),
+ recordsToAppend
+ );
+
+ // Check if the current batch has enough space. We check is
before
+ // replaying the records in order to avoid having to revert
back
+ // changes if the records do not fit within a batch.
+ if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
Review Comment:
looks much cleaner, thanks!
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -592,14 +674,139 @@ private void unload() {
}
timer.cancelAll();
deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
+ failCurrentBatch(Errors.NOT_COORDINATOR.exception());
if (coordinator != null) {
coordinator.onUnloaded();
}
coordinator = null;
}
/**
- * Appends records the the log and replay them to the state machine.
+ * Frees the current batch.
+ */
+ private void freeCurrentBatch() {
+ // Cancel the linger timeout.
+ currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
+
+ // Release the buffer.
+ bufferSupplier.release(currentBatch.buffer);
+
+ currentBatch = null;
+ }
+
+ /**
+ * Writes the current (or pending) batch to the log. When the batch is
written
+ * locally, a new snapshot is created in the snapshot registry and the
events
+ * associated with the batch are added to the deferred event queue.
+ */
+ private void writeCurrentBatch() {
+ if (currentBatch != null) {
+ try {
+ // Write the records to the log and update the last
written offset.
+ long offset = partitionWriter.append(
+ tp,
+ currentBatch.verificationGuard,
+ currentBatch.builder.build()
+ );
+ coordinator.updateLastWrittenOffset(offset);
+
+ // Add all the pending deferred events to the deferred
event queue.
+ for (DeferredEvent event : currentBatch.events) {
+ deferredEventQueue.add(offset, event);
+ }
+
+ // Free up the current batch.
+ freeCurrentBatch();
+ } catch (Throwable t) {
+ failCurrentBatch(t);
+ }
+ }
+ }
+
+ /**
+ * Writes the current batch if it is transactional or if it has past
the append linger time.
+ */
+ private void maybeWriteCurrentBatch(long currentTimeMs) {
+ if (currentBatch != null) {
+ if (currentBatch.builder.isTransactional() ||
(currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) {
+ writeCurrentBatch();
+ }
+ }
+ }
+
+ /**
+ * Fails the current batch, reverts to the snapshot to the base/start
offset of the
+ * batch, fails all the associated events.
+ */
+ private void failCurrentBatch(Throwable t) {
Review Comment:
> this is also try prior to this patch
what does this mean?
> We could also log something here directly
I wonder whether, for write operations, it makes more sense to log at the
batch level.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -583,11 +674,339 @@ private void unload() {
}
timer.cancelAll();
deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
+ failCurrentBatch(Errors.NOT_COORDINATOR.exception());
if (coordinator != null) {
coordinator.onUnloaded();
}
coordinator = null;
}
+
+ /**
+ * Frees the current batch.
+ */
+ private void freeCurrentBatch() {
+ // Cancel the linger timeout.
+ currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
+
+ // Release the buffer.
+ bufferSupplier.release(currentBatch.buffer);
+
+ currentBatch = null;
+ }
+
+ /**
+ * Flushes the current (or pending) batch to the log. When the batch
is written
+ * locally, a new snapshot is created in the snapshot registry and the
events
+ * associated with the batch are added to the deferred event queue.
+ */
+ private void flushCurrentBatch() {
+ if (currentBatch != null) {
+ try {
+ // Write the records to the log and update the last
written offset.
+ long offset = partitionWriter.append(
+ tp,
+ currentBatch.verificationGuard,
+ currentBatch.builder.build()
+ );
+ coordinator.updateLastWrittenOffset(offset);
+
+ if (offset != currentBatch.nextOffset) {
+ throw new IllegalStateException("The state machine of
coordinator " + tp + " is out of sync with the " +
+ "underlying log. The last write returned " +
offset + " while " + currentBatch.nextOffset + " was expected");
Review Comment:
to confirm, we don't expect this in any case, right?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -1475,7 +1797,9 @@ public void onHighWatermarkUpdated(
* @param coordinatorMetrics The coordinator metrics.
* @param serializer The serializer.
* @param compression The compression codec.
+ * @param appendLingerMs The append linger time in ms.
*/
+ @SuppressWarnings("checkstyle:ParameterNumber")
Review Comment:
is this the new convention?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -583,11 +674,330 @@ private void unload() {
}
timer.cancelAll();
deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
+ failCurrentBatch(Errors.NOT_COORDINATOR.exception());
if (coordinator != null) {
coordinator.onUnloaded();
}
coordinator = null;
}
+
+ /**
+ * Frees the current batch.
+ */
+ private void freeCurrentBatch() {
+ // Cancel the linger timeout.
+ currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
+
+ // Release the buffer.
+ bufferSupplier.release(currentBatch.buffer);
+
+ currentBatch = null;
+ }
+
+ /**
+ * Writes the current (or pending) batch to the log. When the batch is
written
+ * locally, a new snapshot is created in the snapshot registry and the
events
+ * associated with the batch are added to the deferred event queue.
+ */
+ private void writeCurrentBatch() {
+ if (currentBatch != null) {
+ try {
+ // Write the records to the log and update the last
written offset.
+ long offset = partitionWriter.append(
+ tp,
+ currentBatch.verificationGuard,
+ currentBatch.builder.build()
+ );
+ coordinator.updateLastWrittenOffset(offset);
+
+ // Add all the pending deferred events to the deferred
event queue.
+ for (DeferredEvent event : currentBatch.events) {
+ deferredEventQueue.add(offset, event);
+ }
+
+ // Free up the current batch.
+ freeCurrentBatch();
+ } catch (Throwable t) {
+ failCurrentBatch(t);
+ }
+ }
+ }
+
+ /**
+ * Writes the current batch if it is transactional or if it has past
the append linger time.
+ */
+ private void maybeWriteCurrentBatch(long currentTimeMs) {
+ if (currentBatch != null) {
+ if (currentBatch.builder.isTransactional() ||
(currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) {
+ writeCurrentBatch();
+ }
+ }
+ }
+
+ /**
+ * Fails the current batch, reverts to the snapshot to the base/start
offset of the
+ * batch, fails all the associated events.
+ */
+ private void failCurrentBatch(Throwable t) {
+ if (currentBatch != null) {
+ coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
+ for (DeferredEvent event : currentBatch.events) {
+ event.complete(t);
+ }
+ freeCurrentBatch();
+ }
+ }
+
+ /**
+ * Allocates a new batch if none already exists.
+ */
+ private void maybeAllocateNewBatch(
+ long producerId,
+ short producerEpoch,
+ VerificationGuard verificationGuard,
+ long currentTimeMs
+ ) {
+ if (currentBatch == null) {
+ LogConfig logConfig = partitionWriter.config(tp);
+ byte magic = logConfig.recordVersion().value;
+ int maxBatchSize = logConfig.maxMessageSize();
+ long prevLastWrittenOffset = coordinator.lastWrittenOffset();
+ ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
+
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+ buffer,
+ magic,
+ compression,
+ TimestampType.CREATE_TIME,
+ 0L,
+ currentTimeMs,
+ producerId,
+ producerEpoch,
+ 0,
+ producerId != RecordBatch.NO_PRODUCER_ID,
+ false,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH,
+ maxBatchSize
+ );
+
+ Optional<TimerTask> lingerTimeoutTask = Optional.empty();
+ if (appendLingerMs > 0) {
+ lingerTimeoutTask = Optional.of(new
TimerTask(appendLingerMs) {
+ @Override
+ public void run() {
+ scheduleInternalOperation("FlushBatch", tp, () -> {
+ if (this.isCancelled()) return;
+ withActiveContextOrThrow(tp,
CoordinatorContext::writeCurrentBatch);
+ });
+ }
+ });
+ }
+
+ currentBatch = new CoordinatorBatch(
+ prevLastWrittenOffset,
+ currentTimeMs,
+ maxBatchSize,
+ verificationGuard,
+ buffer,
+ builder,
+ lingerTimeoutTask
+ );
+ }
+ }
+
+ /**
+ * Appends records to the log and replay them to the state machine.
+ *
+ * @param producerId The producer id.
+ * @param producerEpoch The producer epoch.
+ * @param verificationGuard The verification guard.
+ * @param records The records to append.
+ * @param replay A boolean indicating whether the records
+ * must be replayed or not.
+ * @param event The event that must be completed when the
+ * records are written.
+ */
+ private void append(
+ long producerId,
+ short producerEpoch,
+ VerificationGuard verificationGuard,
+ List<U> records,
+ boolean replay,
+ DeferredEvent event
+ ) {
+ if (state != CoordinatorState.ACTIVE) {
+ throw new IllegalStateException("Coordinator must be active to
append records");
+ }
+
+ if (records.isEmpty()) {
+ // If the records are empty, it was a read operation after
all. In this case,
+ // the response can be returned directly iff there are no
pending write operations;
+ // otherwise, the read needs to wait on the last write
operation to be completed.
+ if (currentBatch != null) {
Review Comment:
> if we don't have a batch, we check whether the state machine has
> uncommitted state and we put the write operation to the deferred queue if it
> does
this would happen when we have flushed the previous batch but it has not
been replicated, right?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -583,11 +674,330 @@ private void unload() {
}
timer.cancelAll();
deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
+ failCurrentBatch(Errors.NOT_COORDINATOR.exception());
if (coordinator != null) {
coordinator.onUnloaded();
}
coordinator = null;
}
+
+ /**
+ * Frees the current batch.
+ */
+ private void freeCurrentBatch() {
+ // Cancel the linger timeout.
+ currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
+
+ // Release the buffer.
+ bufferSupplier.release(currentBatch.buffer);
+
+ currentBatch = null;
+ }
+
+ /**
+ * Writes the current (or pending) batch to the log. When the batch is
written
+ * locally, a new snapshot is created in the snapshot registry and the
events
+ * associated with the batch are added to the deferred event queue.
+ */
+ private void writeCurrentBatch() {
+ if (currentBatch != null) {
+ try {
+ // Write the records to the log and update the last
written offset.
+ long offset = partitionWriter.append(
+ tp,
+ currentBatch.verificationGuard,
+ currentBatch.builder.build()
+ );
+ coordinator.updateLastWrittenOffset(offset);
+
+ // Add all the pending deferred events to the deferred
event queue.
+ for (DeferredEvent event : currentBatch.events) {
+ deferredEventQueue.add(offset, event);
+ }
+
+ // Free up the current batch.
+ freeCurrentBatch();
+ } catch (Throwable t) {
+ failCurrentBatch(t);
+ }
+ }
+ }
+
+ /**
+ * Writes the current batch if it is transactional or if it has past
the append linger time.
+ */
+ private void maybeWriteCurrentBatch(long currentTimeMs) {
+ if (currentBatch != null) {
+ if (currentBatch.builder.isTransactional() ||
(currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) {
+ writeCurrentBatch();
+ }
+ }
+ }
+
+ /**
+ * Fails the current batch, reverts to the snapshot to the base/start
offset of the
+ * batch, fails all the associated events.
+ */
+ private void failCurrentBatch(Throwable t) {
+ if (currentBatch != null) {
+ coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
+ for (DeferredEvent event : currentBatch.events) {
+ event.complete(t);
+ }
+ freeCurrentBatch();
+ }
+ }
+
+ /**
+ * Allocates a new batch if none already exists.
+ */
+ private void maybeAllocateNewBatch(
+ long producerId,
+ short producerEpoch,
+ VerificationGuard verificationGuard,
+ long currentTimeMs
+ ) {
+ if (currentBatch == null) {
+ LogConfig logConfig = partitionWriter.config(tp);
+ byte magic = logConfig.recordVersion().value;
+ int maxBatchSize = logConfig.maxMessageSize();
+ long prevLastWrittenOffset = coordinator.lastWrittenOffset();
+ ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
+
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+ buffer,
+ magic,
+ compression,
+ TimestampType.CREATE_TIME,
+ 0L,
+ currentTimeMs,
+ producerId,
+ producerEpoch,
+ 0,
+ producerId != RecordBatch.NO_PRODUCER_ID,
+ false,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH,
+ maxBatchSize
+ );
+
+ Optional<TimerTask> lingerTimeoutTask = Optional.empty();
+ if (appendLingerMs > 0) {
+ lingerTimeoutTask = Optional.of(new
TimerTask(appendLingerMs) {
+ @Override
+ public void run() {
+ scheduleInternalOperation("FlushBatch", tp, () -> {
+ if (this.isCancelled()) return;
+ withActiveContextOrThrow(tp,
CoordinatorContext::writeCurrentBatch);
+ });
+ }
+ });
+ }
+
+ currentBatch = new CoordinatorBatch(
+ prevLastWrittenOffset,
+ currentTimeMs,
+ maxBatchSize,
+ verificationGuard,
+ buffer,
+ builder,
+ lingerTimeoutTask
+ );
+ }
+ }
+
+ /**
+ * Appends records to the log and replay them to the state machine.
+ *
+ * @param producerId The producer id.
+ * @param producerEpoch The producer epoch.
+ * @param verificationGuard The verification guard.
+ * @param records The records to append.
+ * @param replay A boolean indicating whether the records
+ * must be replayed or not.
+ * @param event The event that must be completed when the
+ * records are written.
+ */
+ private void append(
+ long producerId,
+ short producerEpoch,
+ VerificationGuard verificationGuard,
+ List<U> records,
+ boolean replay,
+ DeferredEvent event
+ ) {
+ if (state != CoordinatorState.ACTIVE) {
+ throw new IllegalStateException("Coordinator must be active to
append records");
+ }
+
+ if (records.isEmpty()) {
+ // If the records are empty, it was a read operation after
all. In this case,
+ // the response can be returned directly iff there are no
pending write operations;
+ // otherwise, the read needs to wait on the last write
operation to be completed.
+ if (currentBatch != null) {
+ currentBatch.events.add(event);
+ } else {
+ OptionalLong pendingOffset =
deferredEventQueue.highestPendingOffset();
+ if (pendingOffset.isPresent()) {
+ deferredEventQueue.add(pendingOffset.getAsLong(),
event);
+ } else {
+ event.complete(null);
+ }
+ }
+ } else {
+ // If the records are not empty, first, they are applied to
the state machine,
+ // second, then are appended to the opened batch.
+ long currentTimeMs = time.milliseconds();
+
+ // If the current write operation is transactional, the
current batch
+ // is written before proceeding with it.
+ if (producerId != RecordBatch.NO_PRODUCER_ID) {
+ writeCurrentBatch();
+ }
+
+ // Allocate a new batch if none exists.
+ maybeAllocateNewBatch(
+ producerId,
+ producerEpoch,
+ verificationGuard,
+ currentTimeMs
+ );
+
+ // Prepare the records.
+ List<SimpleRecord> recordsToAppend = new
ArrayList<>(records.size());
+ for (U record : records) {
+ recordsToAppend.add(new SimpleRecord(
+ currentTimeMs,
+ serializer.serializeKey(record),
+ serializer.serializeValue(record)
+ ));
+ }
+
+ // Compute the estimated size of the records.
+ int estimatedSize = AbstractRecords.estimateSizeInBytes(
+ currentBatch.builder.magic(),
+ compression.type(),
+ recordsToAppend
+ );
+
+ // Check if the current batch has enough space. We check is
before
+ // replaying the records in order to avoid having to revert
back
+ // changes if the records do not fit within a batch.
+ if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
+ if (currentBatch.builder.numRecords() == 0) {
+ // If the number of records in the current batch is
zero, it means that
+ // the records are larger than the max batch size.
+ throw new RecordTooLargeException("Message batch size
is " + estimatedSize +
+ " bytes in append to partition " + tp + " which
exceeds the maximum " +
+ "configured size of " + currentBatch.maxBatchSize
+ ".");
+ } else {
+ // Otherwise, we write the current batch, allocate a
new one and re-verify
+ // whether the records fit in it.
+ writeCurrentBatch();
+ maybeAllocateNewBatch(
+ producerId,
+ producerEpoch,
+ verificationGuard,
+ currentTimeMs
+ );
+ if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
+ throw new RecordTooLargeException("Message batch
size is " + estimatedSize +
+ " bytes in append to partition " + tp + "
which exceeds the maximum " +
+ "configured size of " +
currentBatch.maxBatchSize + ".");
+ }
+ }
+ }
+
+ // Add the event to the list of pending events associated with
the batch.
+ currentBatch.events.add(event);
+
+ try {
+ // Apply record to the state machine.
+ if (replay) {
+ for (int i = 0; i < records.size(); i++) {
+ // We compute the offset of the record based on
the last written offset. The
+ // coordinator is the single writer to the
underlying partition so we can
+ // deduce it like this.
+ coordinator.replay(
+ currentBatch.nextOffset + i,
Review Comment:
i see, so baseOffset is the LEO before the batch was written and nextOffset
is LEO. log end offset makes the most sense to me
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -626,89 +833,113 @@ private void append(
// If the records are empty, it was a read operation after
all. In this case,
// the response can be returned directly iff there are no
pending write operations;
// otherwise, the read needs to wait on the last write
operation to be completed.
- OptionalLong pendingOffset =
deferredEventQueue.highestPendingOffset();
- if (pendingOffset.isPresent()) {
- deferredEventQueue.add(pendingOffset.getAsLong(), event);
+ if (currentBatch != null) {
+ currentBatch.events.add(event);
} else {
- event.complete(null);
+ OptionalLong pendingOffset =
deferredEventQueue.highestPendingOffset();
+ if (pendingOffset.isPresent()) {
+ deferredEventQueue.add(pendingOffset.getAsLong(),
event);
+ } else {
+ event.complete(null);
+ }
}
} else {
// If the records are not empty, first, they are applied to
the state machine,
- // second, then are written to the partition/log, and finally,
the response
- // is put into the deferred event queue.
- long prevLastWrittenOffset = coordinator.lastWrittenOffset();
- LogConfig logConfig = partitionWriter.config(tp);
- byte magic = logConfig.recordVersion().value;
- int maxBatchSize = logConfig.maxMessageSize();
+ // second, then are appended to the opened batch.
long currentTimeMs = time.milliseconds();
- ByteBuffer buffer =
bufferSupplier.get(Math.min(MIN_BUFFER_SIZE, maxBatchSize));
- try {
- MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
- buffer,
- magic,
- compression,
- TimestampType.CREATE_TIME,
- 0L,
+ // If the current write operation is transactional, the
current batch
+ // is written before proceeding with it.
Review Comment:
aha! got it. thanks for the explanation
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -583,11 +674,330 @@ private void unload() {
}
timer.cancelAll();
deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
+ failCurrentBatch(Errors.NOT_COORDINATOR.exception());
if (coordinator != null) {
coordinator.onUnloaded();
}
coordinator = null;
}
+
+ /**
+ * Frees the current batch.
+ */
+ private void freeCurrentBatch() {
+ // Cancel the linger timeout.
+ currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
+
+ // Release the buffer.
+ bufferSupplier.release(currentBatch.buffer);
+
+ currentBatch = null;
+ }
+
+ /**
+ * Writes the current (or pending) batch to the log. When the batch is
written
+ * locally, a new snapshot is created in the snapshot registry and the
events
+ * associated with the batch are added to the deferred event queue.
+ */
+ private void writeCurrentBatch() {
+ if (currentBatch != null) {
+ try {
+ // Write the records to the log and update the last
written offset.
+ long offset = partitionWriter.append(
+ tp,
+ currentBatch.verificationGuard,
+ currentBatch.builder.build()
+ );
+ coordinator.updateLastWrittenOffset(offset);
+
+ // Add all the pending deferred events to the deferred
event queue.
+ for (DeferredEvent event : currentBatch.events) {
+ deferredEventQueue.add(offset, event);
+ }
+
+ // Free up the current batch.
+ freeCurrentBatch();
+ } catch (Throwable t) {
+ failCurrentBatch(t);
+ }
+ }
+ }
+
+ /**
+ * Writes the current batch if it is transactional or if it has past
the append linger time.
+ */
+ private void maybeWriteCurrentBatch(long currentTimeMs) {
+ if (currentBatch != null) {
+ if (currentBatch.builder.isTransactional() ||
(currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) {
+ writeCurrentBatch();
+ }
+ }
+ }
+
+ /**
+ * Fails the current batch, reverts to the snapshot to the base/start
offset of the
+ * batch, fails all the associated events.
+ */
+ private void failCurrentBatch(Throwable t) {
+ if (currentBatch != null) {
+ coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
+ for (DeferredEvent event : currentBatch.events) {
+ event.complete(t);
+ }
+ freeCurrentBatch();
+ }
+ }
+
+ /**
+ * Allocates a new batch if none already exists.
+ */
+ private void maybeAllocateNewBatch(
+ long producerId,
+ short producerEpoch,
+ VerificationGuard verificationGuard,
+ long currentTimeMs
+ ) {
+ if (currentBatch == null) {
+ LogConfig logConfig = partitionWriter.config(tp);
+ byte magic = logConfig.recordVersion().value;
+ int maxBatchSize = logConfig.maxMessageSize();
+ long prevLastWrittenOffset = coordinator.lastWrittenOffset();
+ ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
Review Comment:
that makes sense to me
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]