Denovo1998 commented on code in PR #25386:
URL: https://github.com/apache/pulsar/pull/25386#discussion_r3110806226
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2153,6 +2156,38 @@ private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentC
discardMessage(messageId, currentCnx, validationError, 1);
}
+ /**
+ * When batch index ack is enabled, ack the messages that failed to deserialize by their index,
+ * while keeping successfully enqueued messages unacknowledged to avoid message loss.
+ */
+ private void discardCorruptedBatchMessage(MessageIdData messageId, ClientCnx currentCnx,
+ int batchSize, int skipped, int processed, ValidationError validationError) {
+ log.error("[{}] [{}] Discarding corrupted batch messages with batch index ack at {}:{}, "
+ + "batchSize={}, skipped={}, processed={}, exception={}",
+ subscription, consumerName, messageId.getLedgerId(), messageId.getEntryId(),
+ batchSize, skipped, processed, validationError);
+ BitSetRecyclable ackBitSet = null;
+ int corruptedStartIndex = skipped + processed;
+ if (conf.isBatchIndexAckEnabled()) {
+ // When batch index ack is enabled, only ack the messages that failed to deserialize.
+ // Messages that have been successfully enqueued remain unacknowledged,
+ // waiting for the user to consume and acknowledge them normally.
+ ackBitSet = BitSetRecyclable.create();
+ ackBitSet.set(0, batchSize);
+ for (int i = corruptedStartIndex; i < batchSize; i++) {
+ ackBitSet.clear(i);
+ }
+ }
+ ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(),
+ ackBitSet, AckType.Individual, validationError, Collections.emptyMap(), -1);
+ currentCnx.ctx().writeAndFlush(cmd, currentCnx.ctx().voidPromise());
+ if (ackBitSet != null) {
+ ackBitSet.recycle();
+ }
+ increaseAvailablePermits(currentCnx, batchSize - corruptedStartIndex);
Review Comment:
Replenishing permits with `batchSize - (skipped + processed)` looks unsafe.
`receiveIndividualMessagesFromBatch()` starts by filtering out the batch indices
that the inbound ackSet marks as already acknowledged, and those indices never
consumed broker permits. On a partial batch redelivery, this formula also counts
the indices that were never delivered, so the consumer can be over-credited
with permits.
It might be better to pass in the current inbound ackBitSet, count only the
bits at or after the failing index that are still deliverable, and construct
the new ack set from that.
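
A minimal sketch of that suggestion, using `java.util.BitSet` in place of `BitSetRecyclable` so it runs standalone. The class and method names (`PermitSketch`, `permitsToReplenish`, `buildAckSet`) are hypothetical, and the sketch assumes a set bit means "still deliverable" and that a `null` inbound ack set means the whole batch was delivered:

```java
import java.util.BitSet;

// Hypothetical helper, not part of ConsumerImpl.
final class PermitSketch {

    /**
     * Counts the indices in [corruptedStartIndex, batchSize) that the broker
     * actually delivered, i.e. whose bit is still set in the inbound ack set.
     */
    static int permitsToReplenish(BitSet inboundAckSet, int batchSize, int corruptedStartIndex) {
        if (corruptedStartIndex >= batchSize) {
            return 0;
        }
        if (inboundAckSet == null) {
            // Assumption: no ack set means every index in the batch was delivered.
            return batchSize - corruptedStartIndex;
        }
        return inboundAckSet.get(corruptedStartIndex, batchSize).cardinality();
    }

    /**
     * Builds the outgoing ack set: indices already acked on arrival stay acked
     * (bit cleared), and everything from corruptedStartIndex onward is acked too.
     */
    static BitSet buildAckSet(BitSet inboundAckSet, int batchSize, int corruptedStartIndex) {
        BitSet ackSet = new BitSet(batchSize);
        if (inboundAckSet == null) {
            ackSet.set(0, batchSize);
        } else {
            ackSet.or(inboundAckSet.get(0, batchSize));
        }
        if (corruptedStartIndex < batchSize) {
            ackSet.clear(corruptedStartIndex, batchSize);
        }
        return ackSet;
    }
}
```

For a batch of 6 redelivered with only indices 0, 1, 3 and 5 still deliverable and a failure at index 2, this credits 2 permits, where `batchSize - corruptedStartIndex` would credit 4.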
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1815,13 +1816,15 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata,
continue;
}
executeNotifyCallback(message);
+ processedMessages++;
}
if (ackBitSet != null) {
ackBitSet.recycle();
}
- } catch (IllegalStateException e) {
- log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName, e);
- discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError);
+ } catch (IllegalStateException | IllegalArgumentException e) {
Review Comment:
`Commands.deSerializeSingleMessageInBatch()` may also throw
`IndexOutOfBoundsException` on a truncated payload (raised by `readUnsignedInt()`
or `retainedSlice()`), and `newSingleMessage()` currently does not wrap it in an
`IllegalStateException`. A corrupted batch with truncation or a length mismatch
can therefore still bypass `discardCorruptedBatchMessage`, so the permit leak
or stuck consumer would persist.
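
A standalone sketch of the failure mode, assuming a simplified length-prefixed entry format and `java.nio.ByteBuffer` instead of Netty's `ByteBuf`. `DecodeSketch`, `readEntryLength` and `decodeAll` are hypothetical names, not Pulsar APIs. It shows that adding `IndexOutOfBoundsException` to the multi-catch keeps a truncated entry on the discard path:

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for batch decoding; not the Pulsar wire format.
final class DecodeSketch {

    /** Reads a 4-byte length prefix at offset and checks that the payload is fully present. */
    static int readEntryLength(ByteBuffer buf, int offset) {
        // Absolute getInt throws IndexOutOfBoundsException if the prefix is truncated.
        int len = buf.getInt(offset);
        if (len < 0) {
            throw new IllegalArgumentException("negative entry length: " + len);
        }
        if (len > 0) {
            // Touch the last payload byte; throws IndexOutOfBoundsException if truncated.
            buf.get(offset + 4 + len - 1);
        }
        return len;
    }

    /** Returns how many entries decoded cleanly before the first corrupted one. */
    static int decodeAll(ByteBuffer buf, int expectedEntries) {
        int offset = 0;
        int processed = 0;
        try {
            for (int i = 0; i < expectedEntries; i++) {
                int len = readEntryLength(buf, offset);
                offset += 4 + len;
                processed++;
            }
        } catch (IllegalStateException | IllegalArgumentException | IndexOutOfBoundsException e) {
            // In the real consumer this is where the discard-and-replenish path would run.
        }
        return processed;
    }
}
```

Without `IndexOutOfBoundsException` in the catch clause, the truncated case would propagate out of `decodeAll` instead of reaching the discard path.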