This is an automated email from the ASF dual-hosted git repository.

nodece pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new fe8aaaf1 [Reader] fix: deliver null-value tombstones instead of discarding them (#1482)
fe8aaaf1 is described below

commit fe8aaaf19bb7a9096143d638623e26943e5b7913
Author: João Moreira Fernandes <[email protected]>
AuthorDate: Tue Apr 28 02:51:18 2026 +0100

    [Reader] fix: deliver null-value tombstones instead of discarding them (#1482)
    
    * fix(reader): deliver null-value tombstones instead of discarding them
    
    A message published with MessageMetadata.null_value = true (the Pulsar
    compaction tombstone convention) was conflated with a deserialization
    failure in partitionConsumer.MessageReceived and silently discarded.
    Because lastDequeuedMsg was never advanced past the tombstone,
    hasMoreMessages kept returning true and Reader.Next blocked forever
    when a tombstone was the last message on a topic.
    
    Consumer: when the reader yields an empty payload and msgMeta or the
    single-message metadata has null_value set, build a normal message with
    payLoad == nil and take the usual dispatch path so lastDequeuedMsg
    advances. Real corruption still routes through discardCorruptedMessage.
    
    Producer: set MessageMetadata.null_value / SingleMessageMetadata.null_value
    when both Value and Payload are nil, matching the Java client so
    Go-produced tombstones carry the flag consumers need.
    
    Message gains an IsNullValue() bool accessor so applications can tell
    tombstones apart from empty payloads.
    
    * fix: explicitly resetting err
    
    Quoting Copilot's feedback:
    
    When accepting a tombstone with err == internal.ErrEOM, the code keeps err
    non-nil and continues. It works today because err is not used later, but it’s
    easy to misread and could become a latent bug if future logic inspects/returns
    err after this block. Consider explicitly setting err = nil in the
    tombstone/ErrEOM acceptance path to make the intent unambiguous.
    
    * fix: suppress lint error
---
 pulsar/consumer_partition.go                       | 22 +++++++-
 pulsar/impl_message.go                             |  5 ++
 .../pulsartracing/message_carrier_util_test.go     |  4 ++
 pulsar/message.go                                  |  6 ++
 pulsar/negative_acks_tracker_test.go               |  8 +++
 pulsar/producer_partition.go                       |  8 +++
 pulsar/reader_test.go                              | 64 ++++++++++++++++++++++
 7 files changed, 116 insertions(+), 1 deletion(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index ee768c4f..a0190f70 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1308,10 +1308,28 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
        )
        for i := 0; i < numMsgs; i++ {
                smm, payload, err := reader.ReadMessage()
-               if err != nil || payload == nil {
+               isNullValue := msgMeta.GetNullValue() || (smm != nil && 
smm.GetNullValue())
+               if err != nil {
+                       // A null-value (tombstone) message has no payload 
bytes on the wire, so
+                       // the non-batched reader returns ErrEOM. Accept it 
instead of discarding
+                       // it as corrupted, matching the Java client's behavior 
for compaction
+                       // tombstones.
+                       if isNullValue && err == internal.ErrEOM {
+                               payload = nil
+                               // Explicit reset to make tombstone-acceptance
+                               // intent unambiguous.
+                               err = nil //nolint:ineffassign
+                       } else {
+                               pc.discardCorruptedMessage(pbMsgID, 
pb.CommandAck_BatchDeSerializeError)
+                               return err
+                       }
+               } else if payload == nil && !isNullValue {
                        pc.discardCorruptedMessage(pbMsgID, 
pb.CommandAck_BatchDeSerializeError)
                        return err
                }
+               if isNullValue {
+                       payload = nil
+               }
                if ackSet != nil && !ackSet.Test(uint(i)) {
                        pc.log.Debugf("Ignoring message from %vth message, 
which has been acknowledged", i)
                        skippedMessages++
@@ -1396,6 +1414,7 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
                                index:               messageIndex,
                                brokerPublishTime:   brokerPublishTime,
                                conn:                pc._getConn(),
+                               isNullValue:         isNullValue,
                        }
                } else {
                        msg = &message{
@@ -1417,6 +1436,7 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
                                index:               messageIndex,
                                brokerPublishTime:   brokerPublishTime,
                                conn:                pc._getConn(),
+                               isNullValue:         isNullValue,
                        }
                }
 
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 4f314c3d..de338516 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -316,6 +316,7 @@ type message struct {
        index               *uint64
        brokerPublishTime   *time.Time
        conn                internal.Connection
+       isNullValue         bool
 }
 
 func (msg *message) Topic() string {
@@ -330,6 +331,10 @@ func (msg *message) Payload() []byte {
        return msg.payLoad
 }
 
+func (msg *message) IsNullValue() bool {
+       return msg.isNullValue
+}
+
 func (msg *message) ID() MessageID {
        return msg.msgID
 }
diff --git a/pulsar/internal/pulsartracing/message_carrier_util_test.go 
b/pulsar/internal/pulsartracing/message_carrier_util_test.go
index 90658c1a..07f4310c 100644
--- a/pulsar/internal/pulsartracing/message_carrier_util_test.go
+++ b/pulsar/internal/pulsartracing/message_carrier_util_test.go
@@ -80,6 +80,10 @@ func (msg *mockConsumerMessage) Payload() []byte {
        return nil
 }
 
+func (msg *mockConsumerMessage) IsNullValue() bool {
+       return false
+}
+
 func (msg *mockConsumerMessage) ID() pulsar.MessageID {
        return nil
 }
diff --git a/pulsar/message.go b/pulsar/message.go
index 2cc04722..8d82147a 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -90,6 +90,12 @@ type Message interface {
        // Payload returns the payload of the message
        Payload() []byte
 
+       // IsNullValue reports whether the message was published as a null-value
+       // (tombstone) message, i.e. with MessageMetadata.null_value set. For 
such
+       // messages Payload returns nil. Applications use this flag together 
with
+       // Pulsar topic compaction to mark a key as deleted.
+       IsNullValue() bool
+
        // ID returns the unique message ID associated with this message.
        // The message id can be used to univocally refer to a message without 
having the keep the entire payload in memory.
        ID() MessageID
diff --git a/pulsar/negative_acks_tracker_test.go 
b/pulsar/negative_acks_tracker_test.go
index bf395346..a6c990ff 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -197,6 +197,10 @@ func (msg *mockMessage1) Payload() []byte {
        return nil
 }
 
+func (msg *mockMessage1) IsNullValue() bool {
+       return false
+}
+
 func (msg *mockMessage1) ID() MessageID {
        return &messageID{
                ledgerID: 1,
@@ -273,6 +277,10 @@ func (msg *mockMessage2) Payload() []byte {
        return nil
 }
 
+func (msg *mockMessage2) IsNullValue() bool {
+       return false
+}
+
 func (msg *mockMessage2) ID() MessageID {
        return &messageID{
                ledgerID: 2,
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 1e0361bc..833e9f4b 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -724,6 +724,10 @@ func (p *partitionProducer) genMetadata(msg 
*ProducerMessage,
                UncompressedSize: proto.Uint32(uint32(uncompressedSize)),
        }
 
+       if msg.Value == nil && msg.Payload == nil {
+               mm.NullValue = proto.Bool(true)
+       }
+
        if !msg.EventTime.IsZero() {
                mm.EventTime = 
proto.Uint64(internal.TimestampMillis(msg.EventTime))
        }
@@ -771,6 +775,10 @@ func (p *partitionProducer) 
genSingleMessageMetadataInBatch(
                PayloadSize: proto.Int32(int32(uncompressedSize)),
        }
 
+       if msg.Value == nil && msg.Payload == nil {
+               smm.NullValue = proto.Bool(true)
+       }
+
        if !msg.EventTime.IsZero() {
                smm.EventTime = 
proto.Uint64(internal.TimestampMillis(msg.EventTime))
        }
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index e9d01ee1..29d61b2b 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -496,6 +496,70 @@ func TestReaderHasNext(t *testing.T) {
        assert.Equal(t, 10, i)
 }
 
+// TestReaderNullValueTombstone verifies that a message published with a nil
+// Value / nil Payload (the Pulsar compaction tombstone convention) is 
delivered
+// to Reader.Next.
+func TestReaderNullValueTombstone(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       ctx := context.Background()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: true,
+       })
+       assert.Nil(t, err)
+
+       _, err = producer.Send(ctx, &ProducerMessage{
+               Key:     "k",
+               Payload: []byte("v1"),
+       })
+       assert.NoError(t, err)
+
+       tombstoneID, err := producer.Send(ctx, &ProducerMessage{
+               Key:     "k",
+               Payload: nil,
+       })
+       assert.NoError(t, err)
+       producer.Close()
+
+       reader, err := client.CreateReader(ReaderOptions{
+               Topic:                   topic,
+               StartMessageID:          EarliestMessageID(),
+               StartMessageIDInclusive: true,
+       })
+       assert.Nil(t, err)
+       defer reader.Close()
+
+       lastID, err := reader.GetLastMessageID()
+       assert.NoError(t, err)
+       assert.Equal(t, tombstoneID.LedgerID(), lastID.LedgerID())
+       assert.Equal(t, tombstoneID.EntryID(), lastID.EntryID())
+
+       firstCtx, firstCancel := context.WithTimeout(ctx, 5*time.Second)
+       defer firstCancel()
+       msg, err := reader.Next(firstCtx)
+       assert.NoError(t, err)
+       assert.Equal(t, []byte("v1"), msg.Payload())
+       assert.False(t, msg.IsNullValue())
+
+       secondCtx, secondCancel := context.WithTimeout(ctx, 5*time.Second)
+       defer secondCancel()
+       tombstone, err := reader.Next(secondCtx)
+       assert.NoError(t, err)
+       assert.True(t, tombstone.IsNullValue())
+       assert.Nil(t, tombstone.Payload())
+       assert.Equal(t, lastID.LedgerID(), tombstone.ID().LedgerID())
+       assert.Equal(t, lastID.EntryID(), tombstone.ID().EntryID())
+
+       assert.False(t, reader.HasNext())
+}
+
 type myMessageID struct {
        data []byte
 }

Reply via email to