This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 69afa1a  [feat][client] Support null value messages (#563)
69afa1a is described below

commit 69afa1a800ec53659d68c1c7729d22f424d6813f
Author: Bhargav Kumar Konidena <[email protected]>
AuthorDate: Fri Apr 3 18:09:58 2026 +0530

    [feat][client] Support null value messages (#563)
---
 include/pulsar/Message.h        |  11 ++++
 include/pulsar/MessageBuilder.h |  11 ++++
 include/pulsar/c/message.h      |  19 +++++++
 lib/Commands.cc                 |   4 ++
 lib/Message.cc                  |  13 +++++
 lib/MessageBuilder.cc           |  13 +++++
 lib/c/c_Message.cc              |   4 ++
 tests/BatchMessageTest.cc       |  30 +++++++++++
 tests/MessageTest.cc            |  43 +++++++++++++++
 tests/ReaderTest.cc             | 115 ++++++++++++++++++++++++++++++++++++++++
 tests/TableViewTest.cc          |  87 ++++++++++++++++++++++++++++++
 11 files changed, 350 insertions(+)

diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h
index 0c67411..b92ec6a 100644
--- a/include/pulsar/Message.h
+++ b/include/pulsar/Message.h
@@ -159,6 +159,17 @@ class PULSAR_PUBLIC Message {
      */
     bool hasOrderingKey() const;
 
+    /**
+     * Check if the message has a null value.
+     *
+     * Messages with null values are used as tombstones on compacted topics
+     * to delete the message for a specific key.
+     *
+     * @return true if the message has a null value (tombstone)
+     *         false if the message has actual payload data
+     */
+    bool hasNullValue() const;
+
     /**
      * Get the UTC based timestamp in milliseconds referring to when the message was published by the client
      * producer
diff --git a/include/pulsar/MessageBuilder.h b/include/pulsar/MessageBuilder.h
index c2f089f..8916718 100644
--- a/include/pulsar/MessageBuilder.h
+++ b/include/pulsar/MessageBuilder.h
@@ -156,6 +156,17 @@ class PULSAR_PUBLIC MessageBuilder {
      */
     MessageBuilder& disableReplication(bool flag);
 
+    /**
+     * Mark the message as having a null value.
+     *
+     * This is used for messages on compacted topics where a null value
+     * acts as a tombstone for a specific key, removing the message from
+     * the compacted view.
+     *
+     * @return the message builder instance
+     */
+    MessageBuilder& setNullValue();
+
     /**
      * create a empty message, with no properties or data
      *
diff --git a/include/pulsar/c/message.h b/include/pulsar/c/message.h
index 1f1f91f..8aceca5 100644
--- a/include/pulsar/c/message.h
+++ b/include/pulsar/c/message.h
@@ -127,6 +127,15 @@ PULSAR_PUBLIC void pulsar_message_set_replication_clusters(pulsar_message_t *mes
  */
 PULSAR_PUBLIC void pulsar_message_disable_replication(pulsar_message_t *message, int flag);
 
+/**
+ * Mark the message as having a null value.
+ *
+ * This is used for messages on compacted topics where a null value
+ * acts as a tombstone for a specific key, removing the message from
+ * the compacted view.
+ */
+PULSAR_PUBLIC void pulsar_message_set_null_value(pulsar_message_t *message);
+
 /// Accessor for built messages
 
 /**
@@ -221,6 +230,16 @@ PULSAR_PUBLIC void pulsar_message_set_schema_version(pulsar_message_t *message,
  */
 PULSAR_PUBLIC const char *pulsar_message_get_producer_name(pulsar_message_t *message);
 
+/**
+ * Check if the message has a null value.
+ *
+ * Messages with null values are used as tombstones on compacted topics
+ * to delete the message for a specific key.
+ *
+ * @return 1 if the message has a null value, 0 otherwise
+ */
+PULSAR_PUBLIC int pulsar_message_has_null_value(pulsar_message_t *message);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/lib/Commands.cc b/lib/Commands.cc
index 08dc718..3dd2259 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -871,6 +871,10 @@ static std::pair<std::unique_ptr<char[]>, size_t> serializeSingleMessageMetadata
         metadata.set_sequence_id(msgMetadata.sequence_id());
     }
 
+    if (msgMetadata.null_value()) {
+        metadata.set_null_value(true);
+    }
+
     size_t size = metadata.ByteSizeLong();
     std::unique_ptr<char[]> data{new char[size]};
     metadata.SerializeToArray(data.get(), size);
diff --git a/lib/Message.cc b/lib/Message.cc
index df6cff9..f4e6d69 100644
--- a/lib/Message.cc
+++ b/lib/Message.cc
@@ -123,6 +123,12 @@ Message::Message(const MessageId& messageID, proto::BrokerEntryMetadata& brokerE
     } else {
         impl_->metadata.clear_sequence_id();
     }
+
+    if (singleMetadata.null_value()) {
+        impl_->metadata.set_null_value(true);
+    } else {
+        impl_->metadata.clear_null_value();
+    }
 }
 
 const MessageId& Message::getMessageId() const {
@@ -177,6 +183,13 @@ const std::string& Message::getOrderingKey() const {
     return impl_->getOrderingKey();
 }
 
+bool Message::hasNullValue() const {
+    if (impl_) {
+        return impl_->metadata.null_value();
+    }
+    return false;
+}
+
 const std::string& Message::getTopicName() const {
     if (!impl_) {
         return emptyString;
diff --git a/lib/MessageBuilder.cc b/lib/MessageBuilder.cc
index a9e61d4..86ff475 100644
--- a/lib/MessageBuilder.cc
+++ b/lib/MessageBuilder.cc
@@ -60,29 +60,35 @@ void MessageBuilder::checkMetadata() {
 MessageBuilder& MessageBuilder::setContent(const void* data, size_t size) {
     checkMetadata();
     impl_->payload = SharedBuffer::copy((char*)data, size);
+    impl_->metadata.clear_null_value();
     return *this;
 }
 
 MessageBuilder& MessageBuilder::setAllocatedContent(void* data, size_t size) {
     checkMetadata();
     impl_->payload = SharedBuffer::wrap((char*)data, size);
+    impl_->metadata.clear_null_value();
     return *this;
 }
 
 MessageBuilder& MessageBuilder::setContent(const std::string& data) {
     checkMetadata();
     impl_->payload = SharedBuffer::copy((char*)data.c_str(), data.length());
+    impl_->metadata.clear_null_value();
     return *this;
 }
 
 MessageBuilder& MessageBuilder::setContent(std::string&& data) {
     checkMetadata();
     impl_->payload = SharedBuffer::take(std::move(data));
+    impl_->metadata.clear_null_value();
     return *this;
 }
 
 MessageBuilder& MessageBuilder::setContent(const KeyValue& data) {
+    checkMetadata();
     impl_->keyValuePtr = data.impl_;
+    impl_->metadata.clear_null_value();
     return *this;
 }
 
@@ -157,6 +163,13 @@ MessageBuilder& MessageBuilder::disableReplication(bool flag) {
     return *this;
 }
 
+MessageBuilder& MessageBuilder::setNullValue() {
+    checkMetadata();
+    impl_->metadata.set_null_value(true);
+    impl_->payload = SharedBuffer();
+    return *this;
+}
+
 const char* MessageBuilder::data() const {
     assert(impl_->payload.data());
     return impl_->payload.data();
diff --git a/lib/c/c_Message.cc b/lib/c/c_Message.cc
index cca0460..51afa8e 100644
--- a/lib/c/c_Message.cc
+++ b/lib/c/c_Message.cc
@@ -81,6 +81,8 @@ void pulsar_message_disable_replication(pulsar_message_t *message, int flag) {
     message->builder.disableReplication(flag);
 }
 
+void pulsar_message_set_null_value(pulsar_message_t *message) { message->builder.setNullValue(); }
+
 int pulsar_message_has_property(pulsar_message_t *message, const char *name) {
     return message->message.hasProperty(name);
 }
@@ -148,3 +150,5 @@ void pulsar_message_set_schema_version(pulsar_message_t *message, const char *sc
 const char *pulsar_message_get_producer_name(pulsar_message_t *message) {
     return message->message.getProducerName().c_str();
 }
+
+int pulsar_message_has_null_value(pulsar_message_t *message) { return message->message.hasNullValue(); }
diff --git a/tests/BatchMessageTest.cc b/tests/BatchMessageTest.cc
index 0b61de1..0b786c3 100644
--- a/tests/BatchMessageTest.cc
+++ b/tests/BatchMessageTest.cc
@@ -988,6 +988,36 @@ TEST(BatchMessageTest, testParseMessageBatchEntry) {
     }
 }
 
+TEST(BatchMessageTest, testParseMessageBatchEntryWithNullValue) {
+    std::vector<Message> msgs;
+    msgs.emplace_back(MessageBuilder().setPartitionKey("key1").setNullValue().build());
+    msgs.emplace_back(MessageBuilder().setContent("content2").setPartitionKey("key2").build());
+    msgs.emplace_back(MessageBuilder().setPartitionKey("key3").setNullValue().build());
+
+    SharedBuffer payload;
+    Commands::serializeSingleMessagesToBatchPayload(payload, msgs);
+    ASSERT_EQ(payload.writableBytes(), 0);
+
+    MessageBatch messageBatch;
+    auto fakeId = MessageIdBuilder().ledgerId(6000L).entryId(20L).partition(0).build();
+    messageBatch.withMessageId(fakeId).parseFrom(payload, static_cast<uint32_t>(msgs.size()));
+    const std::vector<Message>& messages = messageBatch.messages();
+
+    ASSERT_EQ(messages.size(), 3);
+
+    ASSERT_TRUE(messages[0].hasNullValue());
+    ASSERT_EQ(messages[0].getPartitionKey(), "key1");
+    ASSERT_EQ(messages[0].getLength(), 0);
+
+    ASSERT_FALSE(messages[1].hasNullValue());
+    ASSERT_EQ(messages[1].getPartitionKey(), "key2");
+    ASSERT_EQ(messages[1].getDataAsString(), "content2");
+
+    ASSERT_TRUE(messages[2].hasNullValue());
+    ASSERT_EQ(messages[2].getPartitionKey(), "key3");
+    ASSERT_EQ(messages[2].getLength(), 0);
+}
+
 TEST(BatchMessageTest, testSendCallback) {
     const std::string topicName = "persistent://public/default/BasicMessageTest-testSendCallback";
 
diff --git a/tests/MessageTest.cc b/tests/MessageTest.cc
index 688cb33..0ffcc41 100644
--- a/tests/MessageTest.cc
+++ b/tests/MessageTest.cc
@@ -153,3 +153,46 @@ TEST(MessageTest, testGetTopicNameOnProducerMessage) {
     auto msg = MessageBuilder().setContent("test").build();
     ASSERT_TRUE(msg.getTopicName().empty());
 }
+
+TEST(MessageTest, testNullValueMessage) {
+    {
+        auto msg = MessageBuilder().setContent("test").build();
+        ASSERT_FALSE(msg.hasNullValue());
+    }
+
+    {
+        auto msg = MessageBuilder().setNullValue().setPartitionKey("key1").build();
+        ASSERT_TRUE(msg.hasNullValue());
+        ASSERT_EQ(msg.getLength(), 0);
+        ASSERT_EQ(msg.getPartitionKey(), "key1");
+    }
+
+    {
+        auto msg = MessageBuilder().setPartitionKey("key2").setNullValue().build();
+        ASSERT_TRUE(msg.hasNullValue());
+        ASSERT_EQ(msg.getPartitionKey(), "key2");
+    }
+}
+
+TEST(MessageTest, testEmptyMessage) {
+    auto msg = MessageBuilder().build();
+    ASSERT_FALSE(msg.hasNullValue());
+    ASSERT_EQ(msg.getLength(), 0);
+}
+
+TEST(MessageTest, testEmptyStringNotNullValue) {
+    // Empty string message - has content set to ""
+    auto emptyStringMsg = MessageBuilder().setContent("").build();
+    ASSERT_FALSE(emptyStringMsg.hasNullValue());
+    ASSERT_EQ(emptyStringMsg.getLength(), 0);
+    ASSERT_EQ(emptyStringMsg.getDataAsString(), "");
+
+    // Null value message - explicitly marked as null
+    auto nullValueMsg = MessageBuilder().setNullValue().setPartitionKey("key").build();
+    ASSERT_TRUE(nullValueMsg.hasNullValue());
+    ASSERT_EQ(nullValueMsg.getLength(), 0);
+
+    // Both have length 0, but they are semantically different
+    // Empty string: the value IS an empty string
+    // Null value: the value does not exist (tombstone for compaction)
+}
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index af833ce..302df1f 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -1045,5 +1045,120 @@ TEST(ReaderTest, testReaderWithZeroMessageListenerThreads) {
     client.close();
 }
 
+TEST(ReaderTest, testReadCompactedWithNullValue) {
+    Client client(serviceUrl);
+
+    const std::string topicName =
+        "persistent://public/default/testReadCompactedWithNullValue-" + std::to_string(time(nullptr));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+    // Send messages with keys
+    ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1").build()));
+    ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setPartitionKey("key2").setContent("value2").build()));
+    ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setPartitionKey("key3").setContent("value3").build()));
+
+    // Send a tombstone (null value) for key2
+    auto tombstone = MessageBuilder().setPartitionKey("key2").setNullValue().build();
+    ASSERT_TRUE(tombstone.hasNullValue());
+    ASSERT_EQ(tombstone.getLength(), 0);
+    ASSERT_EQ(ResultOk, producer.send(tombstone));
+
+    // Update key1 with a new value
+    ASSERT_EQ(ResultOk,
+              producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1-updated").build()));
+
+    // Create a reader with readCompacted enabled to read all messages (before compaction runs)
+    ReaderConfiguration readerConf;
+    readerConf.setReadCompacted(true);
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
+
+    // Read all messages and verify we can detect null values
+    std::map<std::string, std::string> keyValues;
+    std::set<std::string> nullValueKeys;
+    int messageCount = 0;
+
+    bool hasMessageAvailable = false;
+    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+    while (hasMessageAvailable) {
+        Message msg;
+        ASSERT_EQ(ResultOk, reader.readNext(msg, 3000));
+        messageCount++;
+
+        std::string key = msg.getPartitionKey();
+        if (msg.hasNullValue()) {
+            nullValueKeys.insert(key);
+            LOG_INFO("Received null value (tombstone) for key: " << key);
+        } else {
+            keyValues[key] = msg.getDataAsString();
+            LOG_INFO("Received message for key: " << key << ", value: " << msg.getDataAsString());
+        }
+
+        ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+    }
+
+    // Verify we read all 5 messages
+    ASSERT_EQ(messageCount, 5) << "Expected to read 5 messages";
+
+    // Verify the null value message was received and detected
+    ASSERT_EQ(nullValueKeys.size(), 1) << "Expected exactly one null value message";
+    ASSERT_TRUE(nullValueKeys.count("key2") > 0) << "key2 should have a null value (tombstone)";
+
+    // Verify key1 has the latest value (value1-updated overwrites value1)
+    ASSERT_EQ(keyValues["key1"], "value1-updated") << "key1 should have the updated value";
+    ASSERT_EQ(keyValues["key3"], "value3") << "key3 should have value3";
+
+    producer.close();
+    reader.close();
+    client.close();
+}
+
+TEST(ReaderTest, testNullValueMessageProperties) {
+    Client client(serviceUrl);
+
+    const std::string topicName =
+        "persistent://public/default/testNullValueMessageProperties-" + std::to_string(time(nullptr));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+    // Send a null value message with properties
+    auto tombstone = MessageBuilder()
+                         .setPartitionKey("user-123")
+                         .setNullValue()
+                         .setProperty("reason", "account-deleted")
+                         .setProperty("deleted-by", "admin")
+                         .build();
+
+    ASSERT_TRUE(tombstone.hasNullValue());
+    ASSERT_EQ(tombstone.getPartitionKey(), "user-123");
+    ASSERT_EQ(tombstone.getProperty("reason"), "account-deleted");
+    ASSERT_EQ(tombstone.getProperty("deleted-by"), "admin");
+    ASSERT_EQ(tombstone.getLength(), 0);
+
+    ASSERT_EQ(ResultOk, producer.send(tombstone));
+
+    // Create a reader and verify the message
+    ReaderConfiguration readerConf;
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
+
+    Message msg;
+    ASSERT_EQ(ResultOk, reader.readNext(msg, 5000));
+
+    // Verify all properties are preserved
+    ASSERT_TRUE(msg.hasNullValue());
+    ASSERT_EQ(msg.getPartitionKey(), "user-123");
+    ASSERT_EQ(msg.getProperty("reason"), "account-deleted");
+    ASSERT_EQ(msg.getProperty("deleted-by"), "admin");
+    ASSERT_EQ(msg.getLength(), 0);
+
+    producer.close();
+    reader.close();
+    client.close();
+}
+
 INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
 INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true, false));
diff --git a/tests/TableViewTest.cc b/tests/TableViewTest.cc
index b51b2c8..c1e648f 100644
--- a/tests/TableViewTest.cc
+++ b/tests/TableViewTest.cc
@@ -157,6 +157,93 @@ TEST(TableViewTest, testPublishEmptyValue) {
     client.close();
 }
 
+TEST(TableViewTest, testNullValueTombstone) {
+    const std::string topic = "testNullValueTombstone" + std::to_string(time(nullptr));
+    Client client(lookupUrl);
+
+    ProducerConfiguration producerConfiguration;
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfiguration, producer));
+
+    // Send initial messages with keys
+    auto count = 10;
+    for (int i = 0; i < count; ++i) {
+        auto msg = MessageBuilder()
+                       .setPartitionKey("key" + std::to_string(i))
+                       .setContent("value" + std::to_string(i))
+                       .build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    // Create table view and verify all keys are present
+    TableView tableView;
+    ASSERT_EQ(ResultOk, client.createTableView(topic, {}, tableView));
+    ASSERT_EQ(tableView.size(), count);
+
+    std::string value;
+    ASSERT_TRUE(tableView.containsKey("key5"));
+    ASSERT_TRUE(tableView.getValue("key5", value));
+    ASSERT_EQ(value, "value5");
+
+    // Send a null value (tombstone) for key5 using setNullValue()
+    auto tombstone = MessageBuilder().setPartitionKey("key5").setNullValue().build();
+    ASSERT_TRUE(tombstone.hasNullValue());
+    ASSERT_EQ(ResultOk, producer.send(tombstone));
+
+    // Wait for table view to process the tombstone and remove the key
+    waitUntil(
+        std::chrono::seconds(2), [&] { return !tableView.containsKey("key5"); }, 100);
+
+    // Verify key5 was removed by the tombstone
+    ASSERT_FALSE(tableView.containsKey("key5"));
+    ASSERT_EQ(tableView.size(), count - 1);
+
+    // Verify other keys are still present
+    ASSERT_TRUE(tableView.containsKey("key0"));
+    ASSERT_TRUE(tableView.containsKey("key9"));
+
+    client.close();
+}
+
+TEST(TableViewTest, testNullValueVsEmptyString) {
+    const std::string topic = "testNullValueVsEmptyString" + std::to_string(time(nullptr));
+    Client client(lookupUrl);
+
+    ProducerConfiguration producerConfiguration;
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfiguration, producer));
+
+    // Send messages for two keys
+    ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setPartitionKey("keyA").setContent("valueA").build()));
+    ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setPartitionKey("keyB").setContent("valueB").build()));
+
+    TableView tableView;
+    ASSERT_EQ(ResultOk, client.createTableView(topic, {}, tableView));
+    ASSERT_EQ(tableView.size(), 2);
+
+    // Send empty string for keyA - this should also remove it from TableView
+    // (TableView treats empty payload as deletion)
+    auto emptyMsg = MessageBuilder().setPartitionKey("keyA").setContent("").build();
+    ASSERT_FALSE(emptyMsg.hasNullValue());
+    ASSERT_EQ(ResultOk, producer.send(emptyMsg));
+
+    // Send null value (tombstone) for keyB using setNullValue()
+    auto nullMsg = MessageBuilder().setPartitionKey("keyB").setNullValue().build();
+    ASSERT_TRUE(nullMsg.hasNullValue());
+    ASSERT_EQ(ResultOk, producer.send(nullMsg));
+
+    // Wait for both to be processed
+    waitUntil(
+        std::chrono::seconds(2), [&] { return tableView.size() == 0; }, 100);
+
+    // Both keys should be removed
+    ASSERT_FALSE(tableView.containsKey("keyA"));
+    ASSERT_FALSE(tableView.containsKey("keyB"));
+    ASSERT_EQ(tableView.size(), 0);
+
+    client.close();
+}
+
 TEST(TableViewTest, testNotSupportNonPersistentTopic) {
     const std::string topic = TopicDomain::NonPersistent +
                               "://public/default/testNotSupportNonPersistentTopic" +

Reply via email to