This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 15bd915f fix(consumer): fix panic when messages size is 0 (#1460)
15bd915f is described below
commit 15bd915fcf0af108c533f17dd4fe50f81e1e5a15
Author: unJASON <[email protected]>
AuthorDate: Fri Mar 20 16:52:29 2026 +0800
fix(consumer): fix panic when messages size is 0 (#1460)
### Modifications
If all messages in `MessageReceived` are skipped, do not send the empty
slice to `queueCh`.
This avoids a panic when `dispatcher()` receives a clear command from `clearQueueCh`.
---------
Co-authored-by: crossoverJie <[email protected]>
---
pulsar/consumer_partition.go | 5 ++
pulsar/consumer_partition_test.go | 118 ++++++++++++++++++++++++++++++++++++++
2 files changed, 123 insertions(+)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index ab35b7a1..918de2cf 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1437,6 +1437,11 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
pc.availablePermits.add(skippedMessages)
}
+ if len(messages) == 0 {
+ pc.log.Warnf("receive %d messages , all filtered", numMsgs)
+ return nil
+ }
+
// send messages to the dispatcher
pc.queueCh <- messages
return nil
diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go
index 31877693..cfaf7bf6 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -20,9 +20,11 @@ package pulsar
import (
"sync"
"testing"
+ "time"
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/internal/crypto"
+ "github.com/apache/pulsar-client-go/pulsar/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
)
@@ -229,3 +231,119 @@ var rawBatchMessage10 = []byte{
0x28, 0x05, 0x40, 0x09, 0x68, 0x65, 0x6c, 0x6c,
0x6f,
}
+
+// TestMessageReceivedAllMessagesDiscarded verifies that when all messages in a batch
+// are discarded by messageShouldBeDiscarded (because startMessageID is greater than
+// all message IDs), no empty slice is sent to queueCh and no panic occurs.
+// This is the regression test for https://github.com/apache/pulsar-client-go/issues/1454
+func TestMessageReceivedAllMessagesDiscarded(t *testing.T) {
+ pc := partitionConsumer{
+ queueCh: make(chan []*message, 1),
+ eventsCh: make(chan interface{}, 1),
+ compressionProviders: sync.Map{},
+ maxQueueSize: 1000,
+ options: &partitionConsumerOpts{},
+ metrics: newTestMetrics(),
+ decryptor: crypto.NewNoopDecryptor(),
+ log: log.DefaultNopLogger(),
+ }
+ pc._setConn(dummyConnection{})
+ pc.availablePermits = &availablePermits{pc: &pc}
+ pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
+ func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
+
+ // Set startMessageID to a value greater than the message IDs in the batch.
+ // The raw messages use ledgerID=0, entryID=0 (from nil response), so setting
+ // startMessageID with ledgerID=100 ensures all messages are discarded.
+ pc.startMessageID.set(newTrackingMessageID(100, 0, 0, 0, 0, nil))
+
+ // Use a batch of 10 messages; all should be discarded
+ headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
+ err := pc.MessageReceived(nil, headersAndPayload)
+ assert.Nil(t, err)
+
+ // queueCh should be empty since all messages were discarded
+ select {
+ case msgs := <-pc.queueCh:
+ t.Fatalf("expected no messages on queueCh, but got %d", len(msgs))
+ default:
+ }
+}
+
+// TestMessageReceivedSingleMessageDiscarded verifies the same behavior for a
+// single (non-batch) message that gets discarded.
+func TestMessageReceivedSingleMessageDiscarded(t *testing.T) {
+ pc := partitionConsumer{
+ queueCh: make(chan []*message, 1),
+ eventsCh: make(chan interface{}, 1),
+ compressionProviders: sync.Map{},
+ maxQueueSize: 1000,
+ options: &partitionConsumerOpts{},
+ metrics: newTestMetrics(),
+ decryptor: crypto.NewNoopDecryptor(),
+ log: log.DefaultNopLogger(),
+ }
+ pc._setConn(dummyConnection{})
+ pc.availablePermits = &availablePermits{pc: &pc}
+ pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
+ func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
+
+ pc.startMessageID.set(newTrackingMessageID(100, 0, 0, 0, 0, nil))
+
+ headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
+ err := pc.MessageReceived(nil, headersAndPayload)
+ assert.Nil(t, err)
+
+ select {
+ case msgs := <-pc.queueCh:
+ t.Fatalf("expected no messages on queueCh, but got %d", len(msgs))
+ default:
+ }
+}
+
+// TestMessageReceivedAllMessagesDuplicate verifies that when all messages in a batch
+// are detected as duplicates by ackGroupingTracker, no empty slice is sent to queueCh.
+func TestMessageReceivedAllMessagesDuplicate(t *testing.T) {
+ pc := partitionConsumer{
+ queueCh: make(chan []*message, 1),
+ eventsCh: make(chan interface{}, 1),
+ compressionProviders: sync.Map{},
+ maxQueueSize: 1000,
+ options: &partitionConsumerOpts{},
+ metrics: newTestMetrics(),
+ decryptor: crypto.NewNoopDecryptor(),
+ log: log.DefaultNopLogger(),
+ }
+ pc._setConn(dummyConnection{})
+ pc.availablePermits = &availablePermits{pc: &pc}
+ // Use a timedAckGroupingTracker (MaxSize > 1) so that isDuplicate tracks pending acks
+ pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{
+ MaxSize: 1000,
+ MaxTime: 1 * time.Hour,
+ }, func(_ MessageID) {}, nil, nil)
+
+ // First, receive the batch normally to populate the queue
+ headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
+ err := pc.MessageReceived(nil, headersAndPayload)
+ assert.Nil(t, err)
+
+ messages := <-pc.queueCh
+ assert.Equal(t, 10, len(messages))
+
+ // Ack all messages so they are recorded in the ack tracker as pending/duplicate
+ for _, m := range messages {
+ pc.ackGroupingTracker.add(m.msgID)
+ }
+
+ // Send the same batch again; all messages should be detected as duplicates
+ headersAndPayload = internal.NewBufferWrapper(rawBatchMessage10)
+ err = pc.MessageReceived(nil, headersAndPayload)
+ assert.Nil(t, err)
+
+ // queueCh should be empty since all messages were duplicates
+ select {
+ case msgs := <-pc.queueCh:
+ t.Fatalf("expected no messages on queueCh, but got %d", len(msgs))
+ default:
+ }
+}