This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 15bd915f fix(consumer): fix panic when messages size is 0 (#1460)
15bd915f is described below

commit 15bd915fcf0af108c533f17dd4fe50f81e1e5a15
Author: unJASON <[email protected]>
AuthorDate: Fri Mar 20 16:52:29 2026 +0800

    fix(consumer): fix panic when messages size is 0 (#1460)
    
    ### Modifications
    
    If all messages in `MessageReceived` are skipped, do not send the
    resulting empty slice to `queueCh`.
    
    This avoids a panic when `dispatcher()` receives a clear command from `clearQueueCh`.
    
    ---------
    
    Co-authored-by: crossoverJie <[email protected]>
---
 pulsar/consumer_partition.go      |   5 ++
 pulsar/consumer_partition_test.go | 118 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 123 insertions(+)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index ab35b7a1..918de2cf 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1437,6 +1437,11 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
                pc.availablePermits.add(skippedMessages)
        }
 
+       if len(messages) == 0 {
+               pc.log.Warnf("receive %d messages , all filtered", numMsgs)
+               return nil
+       }
+
        // send messages to the dispatcher
        pc.queueCh <- messages
        return nil
diff --git a/pulsar/consumer_partition_test.go 
b/pulsar/consumer_partition_test.go
index 31877693..cfaf7bf6 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -20,9 +20,11 @@ package pulsar
 import (
        "sync"
        "testing"
+       "time"
 
        "github.com/apache/pulsar-client-go/pulsar/internal"
        "github.com/apache/pulsar-client-go/pulsar/internal/crypto"
+       "github.com/apache/pulsar-client-go/pulsar/log"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/stretchr/testify/assert"
 )
@@ -229,3 +231,119 @@ var rawBatchMessage10 = []byte{
        0x28, 0x05, 0x40, 0x09, 0x68, 0x65, 0x6c, 0x6c,
        0x6f,
 }
+
+// TestMessageReceivedAllMessagesDiscarded verifies that when all messages in 
a batch
+// are discarded by messageShouldBeDiscarded (because startMessageID is 
greater than
+// all message IDs), no empty slice is sent to queueCh and no panic occurs.
+// This is the regression test for 
https://github.com/apache/pulsar-client-go/issues/1454
+func TestMessageReceivedAllMessagesDiscarded(t *testing.T) {
+       pc := partitionConsumer{
+               queueCh:              make(chan []*message, 1),
+               eventsCh:             make(chan interface{}, 1),
+               compressionProviders: sync.Map{},
+               maxQueueSize:         1000,
+               options:              &partitionConsumerOpts{},
+               metrics:              newTestMetrics(),
+               decryptor:            crypto.NewNoopDecryptor(),
+               log:                  log.DefaultNopLogger(),
+       }
+       pc._setConn(dummyConnection{})
+       pc.availablePermits = &availablePermits{pc: &pc}
+       pc.ackGroupingTracker = 
newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
+               func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
+
+       // Set startMessageID to a value greater than the message IDs in the 
batch.
+       // The raw messages use ledgerID=0, entryID=0 (from nil response), so 
setting
+       // startMessageID with ledgerID=100 ensures all messages are discarded.
+       pc.startMessageID.set(newTrackingMessageID(100, 0, 0, 0, 0, nil))
+
+       // Use a batch of 10 messages; all should be discarded
+       headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
+       err := pc.MessageReceived(nil, headersAndPayload)
+       assert.Nil(t, err)
+
+       // queueCh should be empty since all messages were discarded
+       select {
+       case msgs := <-pc.queueCh:
+               t.Fatalf("expected no messages on queueCh, but got %d", 
len(msgs))
+       default:
+       }
+}
+
+// TestMessageReceivedSingleMessageDiscarded verifies the same behavior for a
+// single (non-batch) message that gets discarded.
+func TestMessageReceivedSingleMessageDiscarded(t *testing.T) {
+       pc := partitionConsumer{
+               queueCh:              make(chan []*message, 1),
+               eventsCh:             make(chan interface{}, 1),
+               compressionProviders: sync.Map{},
+               maxQueueSize:         1000,
+               options:              &partitionConsumerOpts{},
+               metrics:              newTestMetrics(),
+               decryptor:            crypto.NewNoopDecryptor(),
+               log:                  log.DefaultNopLogger(),
+       }
+       pc._setConn(dummyConnection{})
+       pc.availablePermits = &availablePermits{pc: &pc}
+       pc.ackGroupingTracker = 
newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
+               func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
+
+       pc.startMessageID.set(newTrackingMessageID(100, 0, 0, 0, 0, nil))
+
+       headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
+       err := pc.MessageReceived(nil, headersAndPayload)
+       assert.Nil(t, err)
+
+       select {
+       case msgs := <-pc.queueCh:
+               t.Fatalf("expected no messages on queueCh, but got %d", 
len(msgs))
+       default:
+       }
+}
+
+// TestMessageReceivedAllMessagesDuplicate verifies that when all messages in 
a batch
+// are detected as duplicates by ackGroupingTracker, no empty slice is sent to 
queueCh.
+func TestMessageReceivedAllMessagesDuplicate(t *testing.T) {
+       pc := partitionConsumer{
+               queueCh:              make(chan []*message, 1),
+               eventsCh:             make(chan interface{}, 1),
+               compressionProviders: sync.Map{},
+               maxQueueSize:         1000,
+               options:              &partitionConsumerOpts{},
+               metrics:              newTestMetrics(),
+               decryptor:            crypto.NewNoopDecryptor(),
+               log:                  log.DefaultNopLogger(),
+       }
+       pc._setConn(dummyConnection{})
+       pc.availablePermits = &availablePermits{pc: &pc}
+       // Use a timedAckGroupingTracker (MaxSize > 1) so that isDuplicate 
tracks pending acks
+       pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{
+               MaxSize: 1000,
+               MaxTime: 1 * time.Hour,
+       }, func(_ MessageID) {}, nil, nil)
+
+       // First, receive the batch normally to populate the queue
+       headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
+       err := pc.MessageReceived(nil, headersAndPayload)
+       assert.Nil(t, err)
+
+       messages := <-pc.queueCh
+       assert.Equal(t, 10, len(messages))
+
+       // Ack all messages so they are recorded in the ack tracker as 
pending/duplicate
+       for _, m := range messages {
+               pc.ackGroupingTracker.add(m.msgID)
+       }
+
+       // Send the same batch again; all messages should be detected as 
duplicates
+       headersAndPayload = internal.NewBufferWrapper(rawBatchMessage10)
+       err = pc.MessageReceived(nil, headersAndPayload)
+       assert.Nil(t, err)
+
+       // queueCh should be empty since all messages were duplicates
+       select {
+       case msgs := <-pc.queueCh:
+               t.Fatalf("expected no messages on queueCh, but got %d", 
len(msgs))
+       default:
+       }
+}

Reply via email to