This is an automated email from the ASF dual-hosted git repository.
nodece pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new fe8aaaf1 [Reader] fix: deliver null-value tombstones instead of
discarding them (#1482)
fe8aaaf1 is described below
commit fe8aaaf19bb7a9096143d638623e26943e5b7913
Author: João Moreira Fernandes <[email protected]>
AuthorDate: Tue Apr 28 02:51:18 2026 +0100
[Reader] fix: deliver null-value tombstones instead of discarding them
(#1482)
* fix(reader): deliver null-value tombstones instead of discarding them
A message published with MessageMetadata.null_value = true (the Pulsar
compaction tombstone convention) was conflated with a deserialization
failure in partitionConsumer.MessageReceived and silently discarded.
Because lastDequeuedMsg was never advanced past the tombstone,
hasMoreMessages kept returning true and Reader.Next blocked forever
when a tombstone was the last message on a topic.
Consumer: when the reader yields an empty payload and msgMeta or the
single-message metadata has null_value set, build a normal message with
payLoad == nil and take the usual dispatch path so lastDequeuedMsg
advances. Real corruption still routes through discardCorruptedMessage.
Producer: set MessageMetadata.null_value / SingleMessageMetadata.null_value
when both Value and Payload are nil, matching the Java client so
Go-produced tombstones carry the flag consumers need.
Message gains an IsNullValue() bool accessor so applications can tell
tombstones apart from empty payloads.
* fix: explicitly resetting err
Quoting Copilot's feedback:
When accepting a tombstone with err == internal.ErrEOM, the code keeps err
non-nil and continues. It works today because err is not used later, but
it’s easy to misread and could become a latent bug if future logic
inspects/returns err after this block. Consider explicitly setting err = nil
in the tombstone/ErrEOM acceptance path to make the intent unambiguous.
* fix: suppress lint error
---
pulsar/consumer_partition.go | 22 +++++++-
pulsar/impl_message.go | 5 ++
.../pulsartracing/message_carrier_util_test.go | 4 ++
pulsar/message.go | 6 ++
pulsar/negative_acks_tracker_test.go | 8 +++
pulsar/producer_partition.go | 8 +++
pulsar/reader_test.go | 64 ++++++++++++++++++++++
7 files changed, 116 insertions(+), 1 deletion(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index ee768c4f..a0190f70 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1308,10 +1308,28 @@ func (pc *partitionConsumer) MessageReceived(response
*pb.CommandMessage, header
)
for i := 0; i < numMsgs; i++ {
smm, payload, err := reader.ReadMessage()
- if err != nil || payload == nil {
+ isNullValue := msgMeta.GetNullValue() || (smm != nil &&
smm.GetNullValue())
+ if err != nil {
+ // A null-value (tombstone) message has no payload
bytes on the wire, so
+ // the non-batched reader returns ErrEOM. Accept it
instead of discarding
+ // it as corrupted, matching the Java client's behavior
for compaction
+ // tombstones.
+ if isNullValue && err == internal.ErrEOM {
+ payload = nil
+ // Explicit reset to make tombstone-acceptance
+ // intent unambiguous.
+ err = nil //nolint:ineffassign
+ } else {
+ pc.discardCorruptedMessage(pbMsgID,
pb.CommandAck_BatchDeSerializeError)
+ return err
+ }
+ } else if payload == nil && !isNullValue {
pc.discardCorruptedMessage(pbMsgID,
pb.CommandAck_BatchDeSerializeError)
return err
}
+ if isNullValue {
+ payload = nil
+ }
if ackSet != nil && !ackSet.Test(uint(i)) {
pc.log.Debugf("Ignoring message from %vth message,
which has been acknowledged", i)
skippedMessages++
@@ -1396,6 +1414,7 @@ func (pc *partitionConsumer) MessageReceived(response
*pb.CommandMessage, header
index: messageIndex,
brokerPublishTime: brokerPublishTime,
conn: pc._getConn(),
+ isNullValue: isNullValue,
}
} else {
msg = &message{
@@ -1417,6 +1436,7 @@ func (pc *partitionConsumer) MessageReceived(response
*pb.CommandMessage, header
index: messageIndex,
brokerPublishTime: brokerPublishTime,
conn: pc._getConn(),
+ isNullValue: isNullValue,
}
}
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 4f314c3d..de338516 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -316,6 +316,7 @@ type message struct {
index *uint64
brokerPublishTime *time.Time
conn internal.Connection
+ isNullValue bool
}
func (msg *message) Topic() string {
@@ -330,6 +331,10 @@ func (msg *message) Payload() []byte {
return msg.payLoad
}
+func (msg *message) IsNullValue() bool {
+ return msg.isNullValue
+}
+
func (msg *message) ID() MessageID {
return msg.msgID
}
diff --git a/pulsar/internal/pulsartracing/message_carrier_util_test.go
b/pulsar/internal/pulsartracing/message_carrier_util_test.go
index 90658c1a..07f4310c 100644
--- a/pulsar/internal/pulsartracing/message_carrier_util_test.go
+++ b/pulsar/internal/pulsartracing/message_carrier_util_test.go
@@ -80,6 +80,10 @@ func (msg *mockConsumerMessage) Payload() []byte {
return nil
}
+func (msg *mockConsumerMessage) IsNullValue() bool {
+ return false
+}
+
func (msg *mockConsumerMessage) ID() pulsar.MessageID {
return nil
}
diff --git a/pulsar/message.go b/pulsar/message.go
index 2cc04722..8d82147a 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -90,6 +90,12 @@ type Message interface {
// Payload returns the payload of the message
Payload() []byte
+ // IsNullValue reports whether the message was published as a null-value
+ // (tombstone) message, i.e. with MessageMetadata.null_value set. For
such
+ // messages Payload returns nil. Applications use this flag together
with
+ // Pulsar topic compaction to mark a key as deleted.
+ IsNullValue() bool
+
// ID returns the unique message ID associated with this message.
// The message id can be used to univocally refer to a message without
having the keep the entire payload in memory.
ID() MessageID
diff --git a/pulsar/negative_acks_tracker_test.go
b/pulsar/negative_acks_tracker_test.go
index bf395346..a6c990ff 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -197,6 +197,10 @@ func (msg *mockMessage1) Payload() []byte {
return nil
}
+func (msg *mockMessage1) IsNullValue() bool {
+ return false
+}
+
func (msg *mockMessage1) ID() MessageID {
return &messageID{
ledgerID: 1,
@@ -273,6 +277,10 @@ func (msg *mockMessage2) Payload() []byte {
return nil
}
+func (msg *mockMessage2) IsNullValue() bool {
+ return false
+}
+
func (msg *mockMessage2) ID() MessageID {
return &messageID{
ledgerID: 2,
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 1e0361bc..833e9f4b 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -724,6 +724,10 @@ func (p *partitionProducer) genMetadata(msg
*ProducerMessage,
UncompressedSize: proto.Uint32(uint32(uncompressedSize)),
}
+ if msg.Value == nil && msg.Payload == nil {
+ mm.NullValue = proto.Bool(true)
+ }
+
if !msg.EventTime.IsZero() {
mm.EventTime =
proto.Uint64(internal.TimestampMillis(msg.EventTime))
}
@@ -771,6 +775,10 @@ func (p *partitionProducer)
genSingleMessageMetadataInBatch(
PayloadSize: proto.Int32(int32(uncompressedSize)),
}
+ if msg.Value == nil && msg.Payload == nil {
+ smm.NullValue = proto.Bool(true)
+ }
+
if !msg.EventTime.IsZero() {
smm.EventTime =
proto.Uint64(internal.TimestampMillis(msg.EventTime))
}
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index e9d01ee1..29d61b2b 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -496,6 +496,70 @@ func TestReaderHasNext(t *testing.T) {
assert.Equal(t, 10, i)
}
+// TestReaderNullValueTombstone verifies that a message published with a nil
+// Value / nil Payload (the Pulsar compaction tombstone convention) is
delivered
+// to Reader.Next.
+func TestReaderNullValueTombstone(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ ctx := context.Background()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ })
+ assert.Nil(t, err)
+
+ _, err = producer.Send(ctx, &ProducerMessage{
+ Key: "k",
+ Payload: []byte("v1"),
+ })
+ assert.NoError(t, err)
+
+ tombstoneID, err := producer.Send(ctx, &ProducerMessage{
+ Key: "k",
+ Payload: nil,
+ })
+ assert.NoError(t, err)
+ producer.Close()
+
+ reader, err := client.CreateReader(ReaderOptions{
+ Topic: topic,
+ StartMessageID: EarliestMessageID(),
+ StartMessageIDInclusive: true,
+ })
+ assert.Nil(t, err)
+ defer reader.Close()
+
+ lastID, err := reader.GetLastMessageID()
+ assert.NoError(t, err)
+ assert.Equal(t, tombstoneID.LedgerID(), lastID.LedgerID())
+ assert.Equal(t, tombstoneID.EntryID(), lastID.EntryID())
+
+ firstCtx, firstCancel := context.WithTimeout(ctx, 5*time.Second)
+ defer firstCancel()
+ msg, err := reader.Next(firstCtx)
+ assert.NoError(t, err)
+ assert.Equal(t, []byte("v1"), msg.Payload())
+ assert.False(t, msg.IsNullValue())
+
+ secondCtx, secondCancel := context.WithTimeout(ctx, 5*time.Second)
+ defer secondCancel()
+ tombstone, err := reader.Next(secondCtx)
+ assert.NoError(t, err)
+ assert.True(t, tombstone.IsNullValue())
+ assert.Nil(t, tombstone.Payload())
+ assert.Equal(t, lastID.LedgerID(), tombstone.ID().LedgerID())
+ assert.Equal(t, lastID.EntryID(), tombstone.ID().EntryID())
+
+ assert.False(t, reader.HasNext())
+}
+
type myMessageID struct {
data []byte
}