dajac commented on code in PR #16498:
URL: https://github.com/apache/kafka/pull/16498#discussion_r1662545695
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -936,62 +941,90 @@ private void append(
));
}
- // Compute the estimated size of the records.
- int estimatedSize = AbstractRecords.estimateSizeInBytes(
- currentBatch.builder.magic(),
- compression.type(),
- recordsToAppend
- );
+ if (isAtomic) {
+ // Compute the estimated size of the records.
+ int estimatedSize = AbstractRecords.estimateSizeInBytes(
+ currentBatch.builder.magic(),
+ compression.type(),
+ recordsToAppend
+ );
- // Check if the current batch has enough space. We check is
before
- // replaying the records in order to avoid having to revert
back
- // changes if the records do not fit within a batch.
- if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
- throw new RecordTooLargeException("Message batch size is "
+ estimatedSize +
- " bytes in append to partition " + tp + " which
exceeds the maximum " +
- "configured size of " + currentBatch.maxBatchSize +
".");
- }
+ // Check if the current batch has enough space. We check
is before
+ // replaying the records in order to avoid having to
revert back
+ // changes if the records do not fit within a batch.
+ if (estimatedSize >
currentBatch.builder.maxAllowedBytes()) {
+ throw new RecordTooLargeException("Message batch size
is " + estimatedSize +
+ " bytes in append to partition " + tp + " which
exceeds the maximum " +
+ "configured size of " + currentBatch.maxBatchSize
+ ".");
+ }
- if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
- // Otherwise, we write the current batch, allocate a new
one and re-verify
- // whether the records fit in it.
- // If flushing fails, we don't catch the exception in
order to let
- // the caller fail the current operation.
- flushCurrentBatch();
- maybeAllocateNewBatch(
- producerId,
- producerEpoch,
- verificationGuard,
- currentTimeMs
- );
+ if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
+ // Otherwise, we write the current batch, allocate a
new one and re-verify
+ // whether the records fit in it.
+ // If flushing fails, we don't catch the exception in
order to let
+ // the caller fail the current operation.
+ flushCurrentBatch();
+ maybeAllocateNewBatch(
+ producerId,
+ producerEpoch,
+ verificationGuard,
+ currentTimeMs
+ );
+ }
}
- // Add the event to the list of pending events associated with
the batch.
- currentBatch.deferredEvents.add(event);
-
try {
- // Apply record to the state machine.
- if (replay) {
- for (int i = 0; i < records.size(); i++) {
- // We compute the offset of the record based on
the last written offset. The
- // coordinator is the single writer to the
underlying partition so we can
- // deduce it like this.
+ for (int i = 0; i < records.size(); i++) {
+ U recordToReplay = records.get(i);
+ SimpleRecord recordToAppend = recordsToAppend.get(i);
+
+ if (replay) {
coordinator.replay(
- currentBatch.nextOffset + i,
+ currentBatch.nextOffset,
producerId,
producerEpoch,
- records.get(i)
+ recordToReplay
);
}
- }
- // Append to the batch.
- for (SimpleRecord record : recordsToAppend) {
- currentBatch.builder.append(record);
+ if (!isAtomic) {
+ boolean hasRoomFor =
currentBatch.builder.hasRoomFor(
+ recordToAppend.timestamp(),
+ recordToAppend.key(),
+ recordToAppend.value(),
+ recordToAppend.headers()
+ );
+
+ if (!hasRoomFor) {
+ if (currentBatch.builder.numRecords() == 0) {
+ throw new RecordTooLargeException("Record
" + recordToAppend + " in append to partition " + tp +
+ " exceeds exceeds the maximum
configured size of " + currentBatch.maxBatchSize + ".");
+ }
+
+ // If the current batch is not empty, we flush
it and allocate a new batch.
+ flushCurrentBatch();
+ maybeAllocateNewBatch(
+ producerId,
+ producerEpoch,
+ verificationGuard,
+ currentTimeMs
+ );
+ }
+ }
+
+ currentBatch.builder.append(recordToAppend);
currentBatch.nextOffset++;
}
+
+ // Add the event to the list of pending events associated
with the batch.
+ currentBatch.deferredEvents.add(event);
} catch (Throwable t) {
log.error("Replaying records to {} failed due to: {}.",
tp, t.getMessage());
+
+ // Add the event to the list of pending events associated
with the last
+ // batch in order to fail it too.
+ currentBatch.deferredEvents.add(event);
Review Comment:
Partially. The main reason was that the event must be attached to the last
batch (vs the first one) in order to only complete the write when all records
are committed.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -936,62 +941,90 @@ private void append(
));
}
- // Compute the estimated size of the records.
- int estimatedSize = AbstractRecords.estimateSizeInBytes(
- currentBatch.builder.magic(),
- compression.type(),
- recordsToAppend
- );
+ if (isAtomic) {
+ // Compute the estimated size of the records.
+ int estimatedSize = AbstractRecords.estimateSizeInBytes(
+ currentBatch.builder.magic(),
+ compression.type(),
+ recordsToAppend
+ );
- // Check if the current batch has enough space. We check is
before
- // replaying the records in order to avoid having to revert
back
- // changes if the records do not fit within a batch.
- if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
- throw new RecordTooLargeException("Message batch size is "
+ estimatedSize +
- " bytes in append to partition " + tp + " which
exceeds the maximum " +
- "configured size of " + currentBatch.maxBatchSize +
".");
- }
+ // Check if the current batch has enough space. We check
is before
+ // replaying the records in order to avoid having to
revert back
+ // changes if the records do not fit within a batch.
+ if (estimatedSize >
currentBatch.builder.maxAllowedBytes()) {
+ throw new RecordTooLargeException("Message batch size
is " + estimatedSize +
+ " bytes in append to partition " + tp + " which
exceeds the maximum " +
+ "configured size of " + currentBatch.maxBatchSize
+ ".");
+ }
- if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
- // Otherwise, we write the current batch, allocate a new
one and re-verify
- // whether the records fit in it.
- // If flushing fails, we don't catch the exception in
order to let
- // the caller fail the current operation.
- flushCurrentBatch();
- maybeAllocateNewBatch(
- producerId,
- producerEpoch,
- verificationGuard,
- currentTimeMs
- );
+ if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
+ // Otherwise, we write the current batch, allocate a
new one and re-verify
+ // whether the records fit in it.
+ // If flushing fails, we don't catch the exception in
order to let
+ // the caller fail the current operation.
+ flushCurrentBatch();
+ maybeAllocateNewBatch(
+ producerId,
+ producerEpoch,
+ verificationGuard,
+ currentTimeMs
+ );
+ }
}
- // Add the event to the list of pending events associated with
the batch.
- currentBatch.deferredEvents.add(event);
-
try {
- // Apply record to the state machine.
- if (replay) {
- for (int i = 0; i < records.size(); i++) {
- // We compute the offset of the record based on
the last written offset. The
- // coordinator is the single writer to the
underlying partition so we can
- // deduce it like this.
+ for (int i = 0; i < records.size(); i++) {
+ U recordToReplay = records.get(i);
+ SimpleRecord recordToAppend = recordsToAppend.get(i);
+
+ if (replay) {
coordinator.replay(
- currentBatch.nextOffset + i,
+ currentBatch.nextOffset,
producerId,
producerEpoch,
- records.get(i)
+ recordToReplay
);
}
- }
- // Append to the batch.
- for (SimpleRecord record : recordsToAppend) {
- currentBatch.builder.append(record);
+ if (!isAtomic) {
+ boolean hasRoomFor =
currentBatch.builder.hasRoomFor(
+ recordToAppend.timestamp(),
+ recordToAppend.key(),
+ recordToAppend.value(),
+ recordToAppend.headers()
+ );
+
+ if (!hasRoomFor) {
+ if (currentBatch.builder.numRecords() == 0) {
+ throw new RecordTooLargeException("Record
" + recordToAppend + " in append to partition " + tp +
+ " exceeds exceeds the maximum
configured size of " + currentBatch.maxBatchSize + ".");
+ }
Review Comment:
Oh, I forgot that `hasRoomFor` always accepts one record. In this case, it
will fail when the batch is written. I updated the code to reflect this. This is
also covered by the test that I added.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -936,62 +941,90 @@ private void append(
));
}
- // Compute the estimated size of the records.
- int estimatedSize = AbstractRecords.estimateSizeInBytes(
- currentBatch.builder.magic(),
- compression.type(),
- recordsToAppend
- );
+ if (isAtomic) {
+ // Compute the estimated size of the records.
+ int estimatedSize = AbstractRecords.estimateSizeInBytes(
+ currentBatch.builder.magic(),
+ compression.type(),
+ recordsToAppend
+ );
- // Check if the current batch has enough space. We check is
before
- // replaying the records in order to avoid having to revert
back
- // changes if the records do not fit within a batch.
- if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
- throw new RecordTooLargeException("Message batch size is "
+ estimatedSize +
- " bytes in append to partition " + tp + " which
exceeds the maximum " +
- "configured size of " + currentBatch.maxBatchSize +
".");
- }
+ // Check if the current batch has enough space. We check
is before
+ // replaying the records in order to avoid having to
revert back
+ // changes if the records do not fit within a batch.
+ if (estimatedSize >
currentBatch.builder.maxAllowedBytes()) {
+ throw new RecordTooLargeException("Message batch size
is " + estimatedSize +
+ " bytes in append to partition " + tp + " which
exceeds the maximum " +
+ "configured size of " + currentBatch.maxBatchSize
+ ".");
+ }
- if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
- // Otherwise, we write the current batch, allocate a new
one and re-verify
- // whether the records fit in it.
- // If flushing fails, we don't catch the exception in
order to let
- // the caller fail the current operation.
- flushCurrentBatch();
- maybeAllocateNewBatch(
- producerId,
- producerEpoch,
- verificationGuard,
- currentTimeMs
- );
+ if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
+ // Otherwise, we write the current batch, allocate a
new one and re-verify
+ // whether the records fit in it.
+ // If flushing fails, we don't catch the exception in
order to let
+ // the caller fail the current operation.
+ flushCurrentBatch();
+ maybeAllocateNewBatch(
+ producerId,
+ producerEpoch,
+ verificationGuard,
+ currentTimeMs
+ );
+ }
}
- // Add the event to the list of pending events associated with
the batch.
- currentBatch.deferredEvents.add(event);
-
try {
- // Apply record to the state machine.
- if (replay) {
- for (int i = 0; i < records.size(); i++) {
- // We compute the offset of the record based on
the last written offset. The
- // coordinator is the single writer to the
underlying partition so we can
- // deduce it like this.
+ for (int i = 0; i < records.size(); i++) {
+ U recordToReplay = records.get(i);
+ SimpleRecord recordToAppend = recordsToAppend.get(i);
+
+ if (replay) {
coordinator.replay(
- currentBatch.nextOffset + i,
+ currentBatch.nextOffset,
producerId,
producerEpoch,
- records.get(i)
+ recordToReplay
);
}
- }
- // Append to the batch.
- for (SimpleRecord record : recordsToAppend) {
- currentBatch.builder.append(record);
+ if (!isAtomic) {
+ boolean hasRoomFor =
currentBatch.builder.hasRoomFor(
+ recordToAppend.timestamp(),
+ recordToAppend.key(),
+ recordToAppend.value(),
+ recordToAppend.headers()
+ );
+
+ if (!hasRoomFor) {
+ if (currentBatch.builder.numRecords() == 0) {
+ throw new RecordTooLargeException("Record
" + recordToAppend + " in append to partition " + tp +
+ " exceeds exceeds the maximum
configured size of " + currentBatch.maxBatchSize + ".");
+ }
Review Comment:
> I think I'm missing though what happens in the case we did have some
records already in the builder but the record is too big for the batch.
The new test does exactly this.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -912,7 +915,9 @@ private void append(
// If the current write operation is transactional, the
current batch
// is written before proceeding with it.
- if (producerId != RecordBatch.NO_PRODUCER_ID) {
+ boolean isTransactional = producerId !=
RecordBatch.NO_PRODUCER_ID;
Review Comment:
Nope. Reverted.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -3839,6 +3839,114 @@ public void close() {}
assertEquals("response1", write.get(5, TimeUnit.SECONDS));
}
+ @Test
+ public void testScheduleWriteOperationWithNonAtomicRecords() throws
ExecutionException, InterruptedException, TimeoutException {
Review Comment:
I think that the two new tests cover the non-atomic write. All the existing
ones must be kept as they are.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]