This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 593fccfb [Issue 1446][Consumer] Fix consumer can't consume resent
chunked messages (#1464)
593fccfb is described below
commit 593fccfbd82b80afed335727963b6f5730581491
Author: zhou zhuohan <[email protected]>
AuthorDate: Wed Mar 25 16:16:30 2026 +0800
[Issue 1446][Consumer] Fix consumer can't consume resent chunked messages
(#1464)
Master Issue: https://github.com/apache/pulsar-client-go/issues/1446
related issue https://github.com/apache/pulsar/pull/21070 and
https://github.com/apache/pulsar/pull/21101
### Motivation
Currently, when the producer resends a chunked message like this:
```
M1: UUID: 0, ChunkID: 0
M2: UUID: 0, ChunkID: 0 // Resend the first chunk
M3: UUID: 0, ChunkID: 1
```
When the consumer receives M2, it finds that it is already tracking
the chunked message with UUID 0, and it then discards both M1 and M2. This
makes it impossible to consume the whole chunked message, even though it is
already persisted in the Pulsar topic.
Here is the relevant code in `processMessageChunk`:
```Go
if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID != ctx.lastChunkedMsgID+1 {
	lastChunkedMsgID := -1
	totalChunks := -1
	if ctx != nil {
		lastChunkedMsgID = int(ctx.lastChunkedMsgID)
		totalChunks = int(ctx.totalChunks)
		ctx.chunkedMsgBuffer.Clear()
	}
	pc.log.Warnf(fmt.Sprintf(
		"Received unexpected chunk messageId %s, last-chunk-id %d, chunkId = %d, total-chunks %d",
		msgID.String(), lastChunkedMsgID, chunkID, totalChunks))
	pc.chunkedMsgCtxMap.remove(uuid)
	pc.availablePermits.inc()
	return nil
}
```
The bug can be easily reproduced with the test cases
`TestChunkWithReconnection` and `TestResendChunkMessages` introduced by this PR.
### Modifications
The new chunk-processing strategy is consistent with the behavior of
the Java client:
https://github.com/apache/pulsar/blob/52a4d5ee84fad6af2736376a6fcdd1bc41e7c52f/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1579
When the consumer receives a duplicated first chunk of a chunked message, it
discards the current chunked-message context and creates a new context
to track the following chunks. For the case described in Motivation, M1
is released and the consumer assembles M2 and M3 into the chunked
message.
---
pulsar/consumer_partition.go | 144 ++++++++++++++++++++++++++++++++--
pulsar/message_chunking_test.go | 167 ++++++++++++++++++++++++++++++++++++++++
2 files changed, 303 insertions(+), 8 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 918de2cf..ee768c4f 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1461,35 +1461,161 @@ func (pc *partitionConsumer)
processMessageChunk(compressedPayload internal.Buff
partitionIdx: pc.partitionIdx,
}
- if msgMeta.GetChunkId() == 0 {
+ if msgMeta.GetChunkId() != msgMeta.GetNumChunksFromMsg()-1 {
+ pc.availablePermits.inc()
+ }
+ if chunkID == 0 {
+ // Handle ack hole case when receive duplicated chunks.
+ // There are two situation that receives chunks with the same
sequence ID and chunk ID.
+ // Situation 1 - Message redeliver:
+ // For example:
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:3
+ // In this case, chunk-3 and chunk-4 have the same msgID with
chunk-1 and chunk-2.
+ // This may be caused by message redeliver, we can't ack any
chunk in this case here.
+ // Situation 2 - Corrupted chunk message
+ // For example:
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+ // In this case, all the chunks with different msgIDs and are
persistent in the topic.
+ // But Chunk-1 and Chunk-2 belong to a corrupted chunk message
that must be skipped since
+ // they will not be delivered to end users. So we should ack
them here to avoid ack hole.
+ ctx := pc.chunkedMsgCtxMap.get(uuid)
+ if ctx != nil {
+ isCorruptedChunkMessageDetected := true
+ for _, previousChunkMsgID := range ctx.chunkedMsgIDs {
+ if previousChunkMsgID == nil {
+ continue
+ }
+ if previousChunkMsgID.equal(msgID) {
+ isCorruptedChunkMessageDetected = false
+ break
+ }
+ }
+ if isCorruptedChunkMessageDetected {
+ ctx.discard(pc)
+ }
+ // The first chunk of a new chunked-message received
+ // before receiving other chunks of previous
chunked-message
+ // so, remove previous chunked-message from map and
release buffer
+ pc.log.Warnf(fmt.Sprintf(
+ "[%s] [%s] Receive a duplicated chunk id=0
message with messageId [%s], sequenceId [%d], "+
+ "uuid [%s]. Remove previous chunk
context with lastChunkedMsgID [%d]",
+ pc.name,
+ pc.options.subscription,
+ msgID.String(),
+ msgMeta.GetSequenceId(),
+ msgMeta.GetUuid(),
+ ctx.lastChunkedMsgID,
+ ))
+ ctx.chunkedMsgBuffer.Clear()
+ pc.chunkedMsgCtxMap.remove(uuid)
+ }
pc.chunkedMsgCtxMap.addIfAbsent(uuid,
numChunks,
totalChunksSize,
)
}
+ // discard message if chunk is out-of-order
ctx := pc.chunkedMsgCtxMap.get(uuid)
-
if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID !=
ctx.lastChunkedMsgID+1 {
+ // Filter and ack duplicated chunks instead of discard ctx.
+ // For example:
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3
+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+ // Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6
+ // We should filter and ack chunk-4 and chunk-5.
+ if ctx != nil && chunkID <= ctx.lastChunkedMsgID {
+ pc.log.Warnf(fmt.Sprintf(
+ "[%s] [%s] Receive a duplicated chunk message
with messageId [%s], "+
+ "last-chunk-Id [%d], chunkId [%d],
sequenceId [%d], uuid [%s]",
+ pc.name,
+ pc.options.subscription,
+ msgID.String(),
+ ctx.lastChunkedMsgID,
+ chunkID,
+ msgMeta.GetSequenceId(),
+ msgMeta.GetUuid(),
+ ))
+ // Just like the above logic of receiving the first
chunk again.
+ // We only ack this chunk in the message duplication
case.
+ isCorruptedChunkMessageDetected := true
+ for _, previousChunkMsgID := range ctx.chunkedMsgIDs {
+ if previousChunkMsgID == nil {
+ continue
+ }
+ if previousChunkMsgID.equal(msgID) {
+ isCorruptedChunkMessageDetected = false
+ break
+ }
+ }
+ if isCorruptedChunkMessageDetected {
+ pc.AckID(toTrackingMessageID(msgID))
+ }
+ return nil
+ }
+ // Chunked messages rely on TCP to ensure that chunk IDs are
strictly increasing within a partition.
+ // If the current chunk ID is greater than ctx.lastChunkedMsgID
+ 1,
+ // it indicates that the current chunk is corrupted and may
require resource cleanup.
lastChunkedMsgID := -1
totalChunks := -1
if ctx != nil {
lastChunkedMsgID = int(ctx.lastChunkedMsgID)
totalChunks = int(ctx.totalChunks)
- ctx.chunkedMsgBuffer.Clear()
}
pc.log.Warnf(fmt.Sprintf(
- "Received unexpected chunk messageId %s, last-chunk-id
%d, chunkId = %d, total-chunks %d",
- msgID.String(), lastChunkedMsgID, chunkID, totalChunks))
- pc.chunkedMsgCtxMap.remove(uuid)
- pc.availablePermits.inc()
+ "[%s] [%s] Received unexpected chunk messageId [%s],
last-chunk-id [%d], "+
+ "chunkId = [%d], total-chunks [%d], sequenceId
[%d], uuid [%s]",
+ pc.Topic(),
+ pc.options.subscription,
+ msgID.String(),
+ lastChunkedMsgID,
+ chunkID,
+ totalChunks,
+ msgMeta.GetSequenceId(),
+ msgMeta.GetUuid()),
+ )
+ if ctx != nil {
+ ctx.chunkedMsgBuffer.Clear()
+ pc.chunkedMsgCtxMap.remove(uuid)
+ }
+ // Consider a scenario where MaxPendingChunkedMessage is set to
1,
+ // and we have two messages (A and B), each consisting of three
chunks:
+ // A chunks are Chunk-1, Chunk-2, Chunk-6 and B chunks are
Chunk-3, Chunk-4, Chunk-5
+ // The consumer receives them in the following order:
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // since MaxPendingChunkedMessage is 1, the context for A
is removed
+ // Chunk-3 sequence ID: 1, chunk ID: 0, msgID: 1:3
+ // Chunk-4 sequence ID: 1, chunk ID: 1, msgID: 1:4
+ // Chunk-5 sequence ID: 1, chunk ID: 2, msgID: 1:5
+ // Chunk-6 sequence ID: 0, chunk ID: 2, msgID: 1:6
+ // If we acknowledge Chunk-6 here, message A would be lost.
+ // This is unexpected, as the user would expect A to be
successfully consumed after redelivery.
+ // So the correct logic should be:
+ // If AutoAckIncompleteChunk is true, then acknowledge the
message.
+ // Otherwise, do nothing so that the message can be redelivered
in the future.
+ if pc.options.autoAckIncompleteChunk {
+ pc.AckID(toTrackingMessageID(msgID))
+ }
return nil
}
+ // The chunk ID meets the expected value,
+ // so we add the current chunk to the corresponding chunkedMsgCtx.
ctx.append(chunkID, msgID, compressedPayload)
if msgMeta.GetChunkId() != msgMeta.GetNumChunksFromMsg()-1 {
- pc.availablePermits.inc()
return nil
}
@@ -2518,6 +2644,7 @@ func (c *chunkedMsgCtxMap)
discardOldestChunkMessage(autoAck bool) {
if autoAck {
ctx.discard(c.pc)
}
+ ctx.chunkedMsgBuffer.Clear()
delete(c.chunkedMsgCtxs, oldest)
c.pc.log.Infof("Chunked message [%s] has been removed from
chunkedMsgCtxMap", oldest)
}
@@ -2535,6 +2662,7 @@ func (c *chunkedMsgCtxMap) discardChunkMessage(uuid
string, autoAck bool) {
if autoAck {
ctx.discard(c.pc)
}
+ ctx.chunkedMsgBuffer.Clear()
delete(c.chunkedMsgCtxs, uuid)
e := c.pendingQueue.Front()
for ; e != nil; e = e.Next() {
diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go
index b85e914f..12f0517c 100644
--- a/pulsar/message_chunking_test.go
+++ b/pulsar/message_chunking_test.go
@@ -21,14 +21,17 @@ import (
"context"
"errors"
"fmt"
+ "log/slog"
"math/rand"
"net/http"
+ "os"
"strings"
"sync"
"testing"
"time"
"github.com/apache/pulsar-client-go/pulsar/internal"
+ "github.com/apache/pulsar-client-go/pulsar/log"
"google.golang.org/protobuf/proto"
@@ -578,3 +581,167 @@ func sendSingleChunk(p Producer, uuid string, chunkID
int, totalChunks int) {
uint32(internal.MaxMessageSize),
)
}
+
+func TestChunkWithReconnection(t *testing.T) {
+ sLogger := slog.New(slog.NewTextHandler(os.Stdout,
&slog.HandlerOptions{Level: slog.LevelDebug}))
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ Logger: log.NewLoggerWithSlog(sLogger),
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ EnableChunking: true,
+ ChunkMaxMessageSize: 100,
+ MaxPendingMessages: 200000,
+ SendTimeout: 60 * time.Second,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+
+ c, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Type: Exclusive,
+ SubscriptionName: "chunk-subscriber",
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, c)
+ defer c.Close()
+
+ // Reduce publish rate to prevent the producer sending messages too fast
+ url := adminURL + "/" + "admin/v2/persistent/public/default/" + topic +
"/publishRate"
+ makeHTTPCall(t, http.MethodPost, url, "{\"publishThrottlingRateInMsg\":
1,\"publishThrottlingRateInByte\": 100}")
+ // Need to wait some time to let the rate limiter take effect
+ time.Sleep(2 * time.Second)
+
+ // payload/ChunkMaxMessageSize = 1000/100 = 10 msg, and
publishThrottlingRateInMsg = 1
+ // so that this chunk msg will send finish after 10 seconds
+ producer.SendAsync(context.Background(), &ProducerMessage{
+ Payload: createTestMessagePayload(1000),
+ }, func(_ MessageID, _ *ProducerMessage, err error) {
+ assert.Nil(t, err)
+ })
+ assert.NoError(t, err)
+
+ time.Sleep(5 * time.Second)
+ // trigger topic unload to test sending chunk msg with reconnection
+ url = adminURL + "/" + "admin/v2/persistent/public/default/" + topic +
"/unload"
+ makeHTTPCall(t, http.MethodPut, url, "")
+ // Need to wait some time to receive all chunk messages
+ time.Sleep(10 * time.Second)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ msg, err := c.Receive(ctx)
+ cancel()
+ assert.NoError(t, err)
+ assert.NotNil(t, msg.ID())
+}
+
+func TestResendChunkMessages(t *testing.T) {
+ sLogger := slog.New(slog.NewTextHandler(os.Stdout,
&slog.HandlerOptions{Level: slog.LevelDebug}))
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ Logger: log.NewLoggerWithSlog(sLogger),
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ EnableChunking: true,
+ ChunkMaxMessageSize: 100,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+
+ c, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Type: Exclusive,
+ SubscriptionName: "chunk-subscriber",
+ MaxPendingChunkedMessage: 10,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, c)
+ defer c.Close()
+
+ sendSingleChunk(producer, "0", 0, 2)
+ sendSingleChunk(producer, "0", 0, 2) // Resending the first chunk
+ sendSingleChunk(producer, "1", 0, 3) // This is for testing the
interwoven chunked message
+ sendSingleChunk(producer, "1", 1, 3)
+ sendSingleChunk(producer, "1", 0, 3) // Resending the UUID-1 chunked
message
+ sendSingleChunk(producer, "0", 1, 2)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ msg, err := c.Receive(ctx)
+ cancel()
+ assert.NoError(t, err)
+ assert.Equal(t, "chunk-0-0|chunk-0-1|", string(msg.Payload()))
+ c.Ack(msg)
+
+ sendSingleChunk(producer, "1", 1, 3)
+ sendSingleChunk(producer, "1", 2, 3)
+ ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
+ msg, err = c.Receive(ctx)
+ cancel()
+ assert.NoError(t, err)
+ assert.Equal(t, "chunk-1-0|chunk-1-1|chunk-1-2|", string(msg.Payload()))
+ c.Ack(msg)
+}
+
+func TestResendChunkWithAckHoleMessages(t *testing.T) {
+ sLogger := slog.New(slog.NewTextHandler(os.Stdout,
&slog.HandlerOptions{Level: slog.LevelDebug}))
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ Logger: log.NewLoggerWithSlog(sLogger),
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ EnableChunking: true,
+ ChunkMaxMessageSize: 100,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+
+ c, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Type: Exclusive,
+ SubscriptionName: "chunk-subscriber",
+ MaxPendingChunkedMessage: 10,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, c)
+ defer c.Close()
+
+ sendSingleChunk(producer, "0", 0, 4)
+ sendSingleChunk(producer, "0", 1, 4)
+ sendSingleChunk(producer, "0", 2, 4)
+ sendSingleChunk(producer, "0", 1, 4) // Resending previous chunk
+ sendSingleChunk(producer, "0", 2, 4)
+ sendSingleChunk(producer, "0", 3, 4)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ msg, err := c.Receive(ctx)
+ cancel()
+ assert.NoError(t, err)
+ assert.Equal(t, "chunk-0-0|chunk-0-1|chunk-0-2|chunk-0-3|",
string(msg.Payload()))
+ c.Ack(msg)
+
+ sendSingleChunk(producer, "1", 0, 4)
+ sendSingleChunk(producer, "1", 1, 4)
+ sendSingleChunk(producer, "1", 4, 4) // send broken chunk
+ ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
+ msg, err = c.Receive(ctx)
+ cancel()
+ assert.ErrorIs(t, err, context.DeadlineExceeded)
+}