Denovo1998 commented on code in PR #25386:
URL: https://github.com/apache/pulsar/pull/25386#discussion_r3110806226


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2153,6 +2156,38 @@ private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentC
         discardMessage(messageId, currentCnx, validationError, 1);
     }
 
+    /**
+     * When batch index ack is enabled, ack the messages that failed to deserialize by their index,
+     * while keeping successfully enqueued messages unacknowledged to avoid message loss.
+     */
+    private void discardCorruptedBatchMessage(MessageIdData messageId, ClientCnx currentCnx,
+            int batchSize, int skipped, int processed, ValidationError validationError) {
+        log.error("[{}] [{}] Discarding corrupted batch messages with batch index ack at {}:{}, "
+                        + "batchSize={}, skipped={}, processed={}, exception={}",
+                subscription, consumerName, messageId.getLedgerId(), messageId.getEntryId(),
+                batchSize, skipped, processed, validationError);
+        BitSetRecyclable ackBitSet = null;
+        int corruptedStartIndex = skipped + processed;
+        if (conf.isBatchIndexAckEnabled()) {
+            // When batch index ack is enabled, only ack the messages that failed to deserialize.
+            // Messages that have been successfully enqueued remain unacknowledged,
+            // waiting for the user to consume and acknowledge them normally.
+            ackBitSet = BitSetRecyclable.create();
+            ackBitSet.set(0, batchSize);
+            for (int i = corruptedStartIndex; i < batchSize; i++) {
+                ackBitSet.clear(i);
+            }
+        }
+        ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(),
+                ackBitSet, AckType.Individual, validationError, Collections.emptyMap(), -1);
+        currentCnx.ctx().writeAndFlush(cmd, currentCnx.ctx().voidPromise());
+        if (ackBitSet != null) {
+            ackBitSet.recycle();
+        }
+        increaseAvailablePermits(currentCnx, batchSize - corruptedStartIndex);

Review Comment:
   Replenishing permits with `batchSize - (skipped + processed)` looks unsafe. `receiveIndividualMessagesFromBatch()` starts by skipping the batch indices that are already acknowledged according to the inbound ackSet, and those indices never consumed broker permits. On a partial batch redelivery, this calculation also counts those undelivered indices, which can over-credit permits.
   
   It might be better to pass in the current inbound ackBitSet, count only the bits after the failing index that are still deliverable, and construct the new ack set from that.
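   
   A rough, untested sketch of that idea, using `java.util.BitSet` as a stand-in for `BitSetRecyclable`. The names `CorruptedBatchAckSketch`, `plan` and `Result` are hypothetical and not part of the client. It assumes a set bit in the inbound ack set means the index is still deliverable, and it treats the failing index itself as consumed, as the loop in the diff does:

```java
import java.util.BitSet;

// Illustrative only: not ConsumerImpl code, and BitSet stands in for BitSetRecyclable.
final class CorruptedBatchAckSketch {

    /** Hypothetical holder for the ack set to send and the permits to give back. */
    static final class Result {
        final BitSet ackSet;  // set bit = left unacknowledged, cleared bit = acknowledged
        final int permits;    // indices that were actually delivered and are now discarded

        Result(BitSet ackSet, int permits) {
            this.ackSet = ackSet;
            this.permits = permits;
        }
    }

    /**
     * @param inboundAckSet ack set received with the batch, or null if the whole batch was delivered
     * @param batchSize     number of messages in the batch
     * @param failingIndex  batch index at which deserialization failed
     */
    static Result plan(BitSet inboundAckSet, int batchSize, int failingIndex) {
        // Start from what the broker considers deliverable; copy so the caller's set is untouched.
        BitSet ackSet;
        if (inboundAckSet == null) {
            ackSet = new BitSet(batchSize);
            ackSet.set(0, batchSize);
        } else {
            ackSet = inboundAckSet.get(0, batchSize);
        }

        int permits = 0;
        for (int i = failingIndex; i < batchSize; i++) {
            if (ackSet.get(i)) {
                ackSet.clear(i);  // ack the discarded index
                permits++;        // only delivered indices consumed a permit
            }
        }
        return new Result(ackSet, permits);
    }
}
```

   For example, with a batch of 6, inbound bits {0, 2, 3, 5} and a failure at index 3, this gives back 2 permits, whereas `batchSize - corruptedStartIndex` would give back 3.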
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1815,13 +1816,15 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata,
                     continue;
                 }
                 executeNotifyCallback(message);
+                processedMessages++;
             }
             if (ackBitSet != null) {
                 ackBitSet.recycle();
             }
-        } catch (IllegalStateException e) {
-            log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName, e);
-            discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError);
+        } catch (IllegalStateException | IllegalArgumentException e) {

Review Comment:
   `Commands.deSerializeSingleMessageInBatch()` can also throw `IndexOutOfBoundsException` on a truncated payload (from `readUnsignedInt()` or `retainedSlice()`), and `newSingleMessage()` currently does not wrap it in an `IllegalStateException`. A corrupted batch with truncation or a length mismatch can therefore still bypass `discardCorruptedBatchMessage`, so the permit-leak or stuck-consumer issues would persist.
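   
   To illustrate the gap, here is a small standalone sketch. It uses `java.nio.ByteBuffer` as a stand-in for Netty's `ByteBuf`, and `TruncatedBatchSketch`, `readLengthPrefixed`, `parseNarrow` and `parseWide` are hypothetical names, not Pulsar APIs. It only shows that an out-of-bounds read escapes a catch limited to `IllegalStateException | IllegalArgumentException`:

```java
import java.nio.ByteBuffer;

// Illustrative only: ByteBuffer stands in for ByteBuf, and none of these methods exist in Pulsar.
final class TruncatedBatchSketch {

    /** Reads a 4-byte big-endian length at offset 0, then that many payload bytes. */
    static byte[] readLengthPrefixed(ByteBuffer buf) {
        int size = buf.getInt(0);  // IndexOutOfBoundsException if fewer than 4 bytes are present
        if (size < 0) {
            throw new IllegalArgumentException("negative size: " + size);
        }
        byte[] out = new byte[size];
        for (int i = 0; i < size; i++) {
            out[i] = buf.get(4 + i);  // IndexOutOfBoundsException if the payload is truncated
        }
        return out;
    }

    /** Mirrors the catch clause in the diff: truncation is not handled here. */
    static String parseNarrow(ByteBuffer buf) {
        try {
            readLengthPrefixed(buf);
            return "ok";
        } catch (IllegalStateException | IllegalArgumentException e) {
            return "discarded";
        }
    }

    /** Same parse, with IndexOutOfBoundsException also routed to the discard path. */
    static String parseWide(ByteBuffer buf) {
        try {
            readLengthPrefixed(buf);
            return "ok";
        } catch (IllegalStateException | IllegalArgumentException | IndexOutOfBoundsException e) {
            return "discarded";
        }
    }
}
```

   Whether to widen the catch or to wrap the exception inside `newSingleMessage()` is a separate choice; the sketch only shows why one of the two seems to be needed.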
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
