This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 69afa1a [feat][client] Support null value messages (#563)
69afa1a is described below
commit 69afa1a800ec53659d68c1c7729d22f424d6813f
Author: Bhargav Kumar Konidena <[email protected]>
AuthorDate: Fri Apr 3 18:09:58 2026 +0530
[feat][client] Support null value messages (#563)
---
include/pulsar/Message.h | 11 ++++
include/pulsar/MessageBuilder.h | 11 ++++
include/pulsar/c/message.h | 19 +++++++
lib/Commands.cc | 4 ++
lib/Message.cc | 13 +++++
lib/MessageBuilder.cc | 13 +++++
lib/c/c_Message.cc | 4 ++
tests/BatchMessageTest.cc | 30 +++++++++++
tests/MessageTest.cc | 43 +++++++++++++++
tests/ReaderTest.cc | 115 ++++++++++++++++++++++++++++++++++++++++
tests/TableViewTest.cc | 87 ++++++++++++++++++++++++++++++
11 files changed, 350 insertions(+)
diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h
index 0c67411..b92ec6a 100644
--- a/include/pulsar/Message.h
+++ b/include/pulsar/Message.h
@@ -159,6 +159,17 @@ class PULSAR_PUBLIC Message {
*/
bool hasOrderingKey() const;
+    /**
+     * Check whether the message was published with a null value.
+     *
+     * A null-value message carries no payload and acts as a tombstone on
+     * compacted topics: it deletes the entry for the message's key.
+     *
+     * A null value is not the same as an empty payload. A message built with
+     * empty content (e.g. setContent("")) or with no content at all also has
+     * length 0, but this method returns false for it.
+     *
+     * @return true if the message has a null value (tombstone),
+     *         false otherwise, including for messages with an empty payload
+     */
+    bool hasNullValue() const;
+
/**
* Get the UTC based timestamp in milliseconds referring to when the
message was published by the client
* producer
diff --git a/include/pulsar/MessageBuilder.h b/include/pulsar/MessageBuilder.h
index c2f089f..8916718 100644
--- a/include/pulsar/MessageBuilder.h
+++ b/include/pulsar/MessageBuilder.h
@@ -156,6 +156,17 @@ class PULSAR_PUBLIC MessageBuilder {
*/
MessageBuilder& disableReplication(bool flag);
+    /**
+     * Mark the message as having a null value (a tombstone).
+     *
+     * On compacted topics a null value acts as a tombstone for the message's
+     * key, removing that key from the compacted view, so this is normally
+     * combined with setPartitionKey().
+     *
+     * Any byte payload previously set on this builder is discarded. Calling a
+     * setContent() or setAllocatedContent() overload afterwards clears the
+     * null-value flag again.
+     *
+     * @return the message builder instance
+     */
+    MessageBuilder& setNullValue();
+
/**
* create a empty message, with no properties or data
*
diff --git a/include/pulsar/c/message.h b/include/pulsar/c/message.h
index 1f1f91f..8aceca5 100644
--- a/include/pulsar/c/message.h
+++ b/include/pulsar/c/message.h
@@ -127,6 +127,15 @@ PULSAR_PUBLIC void
pulsar_message_set_replication_clusters(pulsar_message_t *mes
*/
PULSAR_PUBLIC void pulsar_message_disable_replication(pulsar_message_t
*message, int flag);
+/**
+ * Flag the message as carrying a null value.
+ *
+ * On compacted topics a null-value message serves as a tombstone: it removes
+ * the entry for the message's key from the compacted view.
+ */
+PULSAR_PUBLIC void pulsar_message_set_null_value(pulsar_message_t *message);
+
/// Accessor for built messages
/**
@@ -221,6 +230,16 @@ PULSAR_PUBLIC void
pulsar_message_set_schema_version(pulsar_message_t *message,
*/
PULSAR_PUBLIC const char *pulsar_message_get_producer_name(pulsar_message_t
*message);
+/**
+ * Report whether the message has a null value.
+ *
+ * On compacted topics, null-value messages are tombstones that delete the
+ * entry stored for a given key.
+ *
+ * @return 1 when the message has a null value, otherwise 0
+ */
+PULSAR_PUBLIC int pulsar_message_has_null_value(pulsar_message_t *message);
+
#ifdef __cplusplus
}
#endif
diff --git a/lib/Commands.cc b/lib/Commands.cc
index 08dc718..3dd2259 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -871,6 +871,10 @@ static std::pair<std::unique_ptr<char[]>, size_t>
serializeSingleMessageMetadata
metadata.set_sequence_id(msgMetadata.sequence_id());
}
+ if (msgMetadata.null_value()) {
+ metadata.set_null_value(true);
+ }
+
size_t size = metadata.ByteSizeLong();
std::unique_ptr<char[]> data{new char[size]};
metadata.SerializeToArray(data.get(), size);
diff --git a/lib/Message.cc b/lib/Message.cc
index df6cff9..f4e6d69 100644
--- a/lib/Message.cc
+++ b/lib/Message.cc
@@ -123,6 +123,12 @@ Message::Message(const MessageId& messageID,
proto::BrokerEntryMetadata& brokerE
} else {
impl_->metadata.clear_sequence_id();
}
+
+ if (singleMetadata.null_value()) {
+ impl_->metadata.set_null_value(true);
+ } else {
+ impl_->metadata.clear_null_value();
+ }
}
const MessageId& Message::getMessageId() const {
@@ -177,6 +183,13 @@ const std::string& Message::getOrderingKey() const {
return impl_->getOrderingKey();
}
+bool Message::hasNullValue() const {
+    // A Message without an impl_ has no metadata to consult, so it is never null-valued.
+    return impl_ != nullptr && impl_->metadata.null_value();
+}
+
const std::string& Message::getTopicName() const {
if (!impl_) {
return emptyString;
diff --git a/lib/MessageBuilder.cc b/lib/MessageBuilder.cc
index a9e61d4..86ff475 100644
--- a/lib/MessageBuilder.cc
+++ b/lib/MessageBuilder.cc
@@ -60,29 +60,35 @@ void MessageBuilder::checkMetadata() {
+// Copies `size` bytes from `data` into the payload and clears any null-value flag.
+// NOTE(review): the C-style cast drops const from `data`; presumably SharedBuffer::copy only reads it.
MessageBuilder& MessageBuilder::setContent(const void* data, size_t size) {
checkMetadata();
impl_->payload = SharedBuffer::copy((char*)data, size);
+ // Real content cancels an earlier setNullValue() call.
+ impl_->metadata.clear_null_value();
return *this;
}
+// Uses the caller's buffer via SharedBuffer::wrap (presumably without copying, so the
+// caller must keep it alive - confirm) and clears any null-value flag.
MessageBuilder& MessageBuilder::setAllocatedContent(void* data, size_t size) {
checkMetadata();
impl_->payload = SharedBuffer::wrap((char*)data, size);
+ // Real content cancels an earlier setNullValue() call.
+ impl_->metadata.clear_null_value();
return *this;
}
+// Copies the bytes of `data` into the payload and clears any null-value flag.
MessageBuilder& MessageBuilder::setContent(const std::string& data) {
checkMetadata();
impl_->payload = SharedBuffer::copy((char*)data.c_str(), data.length());
+ // Real content (even an empty string) cancels an earlier setNullValue() call.
+ impl_->metadata.clear_null_value();
return *this;
}
+// Moves `data` into the payload via SharedBuffer::take (presumably avoiding a copy - confirm)
+// and clears any null-value flag.
MessageBuilder& MessageBuilder::setContent(std::string&& data) {
checkMetadata();
impl_->payload = SharedBuffer::take(std::move(data));
+ // Real content cancels an earlier setNullValue() call.
+ impl_->metadata.clear_null_value();
return *this;
}
+// Stores a KeyValue as the message content and clears any null-value flag.
+// NOTE(review): only keyValuePtr is set here; any raw payload set earlier is left in place.
MessageBuilder& MessageBuilder::setContent(const KeyValue& data) {
+ checkMetadata();
impl_->keyValuePtr = data.impl_;
+ // A KeyValue is real content too, so it also cancels setNullValue().
+ impl_->metadata.clear_null_value();
return *this;
}
@@ -157,6 +163,13 @@ MessageBuilder& MessageBuilder::disableReplication(bool
flag) {
return *this;
}
+// Marks the message as a null value (tombstone) and drops every kind of content
+// previously set on this builder, so the flag and the content can never disagree.
+MessageBuilder& MessageBuilder::setNullValue() {
+    checkMetadata();
+    impl_->metadata.set_null_value(true);
+    // Discard the raw payload set by setContent()/setAllocatedContent() ...
+    impl_->payload = SharedBuffer();
+    // ... and any KeyValue set by setContent(const KeyValue&). Without this, keyValuePtr
+    // would survive and could presumably still be serialized later, producing a
+    // null-value message that actually carries data.
+    impl_->keyValuePtr.reset();
+    return *this;
+}
+
const char* MessageBuilder::data() const {
assert(impl_->payload.data());
return impl_->payload.data();
diff --git a/lib/c/c_Message.cc b/lib/c/c_Message.cc
index cca0460..51afa8e 100644
--- a/lib/c/c_Message.cc
+++ b/lib/c/c_Message.cc
@@ -81,6 +81,8 @@ void pulsar_message_disable_replication(pulsar_message_t
*message, int flag) {
message->builder.disableReplication(flag);
}
+void pulsar_message_set_null_value(pulsar_message_t *message) {
+    // Forward to the C++ MessageBuilder held by the C wrapper.
+    message->builder.setNullValue();
+}
+
+// Returns non-zero if the wrapped Message has a property named `name`.
int pulsar_message_has_property(pulsar_message_t *message, const char *name) {
return message->message.hasProperty(name);
}
@@ -148,3 +150,5 @@ void pulsar_message_set_schema_version(pulsar_message_t
*message, const char *sc
+// Returns the wrapped Message's producer name as a C string.
+// NOTE(review): the pointer comes from c_str() on the Message's string; presumably it is
+// only valid while `message` is alive and unmodified - confirm.
const char *pulsar_message_get_producer_name(pulsar_message_t *message) {
return message->message.getProducerName().c_str();
}
+
+int pulsar_message_has_null_value(pulsar_message_t *message) {
+    // Map the C++ bool onto the C convention of 1 / 0.
+    return message->message.hasNullValue() ? 1 : 0;
+}
diff --git a/tests/BatchMessageTest.cc b/tests/BatchMessageTest.cc
index 0b61de1..0b786c3 100644
--- a/tests/BatchMessageTest.cc
+++ b/tests/BatchMessageTest.cc
@@ -988,6 +988,36 @@ TEST(BatchMessageTest, testParseMessageBatchEntry) {
}
}
+TEST(BatchMessageTest, testParseMessageBatchEntryWithNullValue) {
+    // A batch of two tombstones surrounding one regular message.
+    std::vector<Message> msgs;
+    msgs.emplace_back(MessageBuilder().setPartitionKey("key1").setNullValue().build());
+    msgs.emplace_back(MessageBuilder().setContent("content2").setPartitionKey("key2").build());
+    msgs.emplace_back(MessageBuilder().setPartitionKey("key3").setNullValue().build());
+
+    SharedBuffer payload;
+    Commands::serializeSingleMessagesToBatchPayload(payload, msgs);
+    ASSERT_EQ(payload.writableBytes(), 0);
+
+    const auto batchId = MessageIdBuilder().ledgerId(6000L).entryId(20L).partition(0).build();
+    MessageBatch batch;
+    batch.withMessageId(batchId).parseFrom(payload, static_cast<uint32_t>(msgs.size()));
+    const std::vector<Message>& parsed = batch.messages();
+    ASSERT_EQ(parsed.size(), 3);
+
+    // Expected shape of each entry after the round trip through the batch encoding.
+    struct Expected {
+        const char* key;
+        bool isNull;
+        const char* content;
+    };
+    const Expected expected[] = {{"key1", true, ""}, {"key2", false, "content2"}, {"key3", true, ""}};
+
+    for (size_t idx = 0; idx < parsed.size(); ++idx) {
+        const Message& entry = parsed[idx];
+        ASSERT_EQ(entry.hasNullValue(), expected[idx].isNull);
+        ASSERT_EQ(entry.getPartitionKey(), expected[idx].key);
+        if (expected[idx].isNull) {
+            ASSERT_EQ(entry.getLength(), 0);
+        } else {
+            ASSERT_EQ(entry.getDataAsString(), expected[idx].content);
+        }
+    }
+}
+
TEST(BatchMessageTest, testSendCallback) {
const std::string topicName =
"persistent://public/default/BasicMessageTest-testSendCallback";
diff --git a/tests/MessageTest.cc b/tests/MessageTest.cc
index 688cb33..0ffcc41 100644
--- a/tests/MessageTest.cc
+++ b/tests/MessageTest.cc
@@ -153,3 +153,46 @@ TEST(MessageTest, testGetTopicNameOnProducerMessage) {
auto msg = MessageBuilder().setContent("test").build();
ASSERT_TRUE(msg.getTopicName().empty());
}
+
+// Covers the builder-side contract of setNullValue(), including how it interacts
+// with setContent() depending on call order.
+TEST(MessageTest, testNullValueMessage) {
+    {
+        // Regular content is never a null value.
+        auto msg = MessageBuilder().setContent("test").build();
+        ASSERT_FALSE(msg.hasNullValue());
+    }
+
+    {
+        auto msg = MessageBuilder().setNullValue().setPartitionKey("key1").build();
+        ASSERT_TRUE(msg.hasNullValue());
+        ASSERT_EQ(msg.getLength(), 0);
+        ASSERT_EQ(msg.getPartitionKey(), "key1");
+    }
+
+    {
+        auto msg = MessageBuilder().setPartitionKey("key2").setNullValue().build();
+        ASSERT_TRUE(msg.hasNullValue());
+        ASSERT_EQ(msg.getPartitionKey(), "key2");
+    }
+
+    {
+        // setNullValue() after setContent() discards the previously set payload.
+        auto msg = MessageBuilder().setContent("stale").setNullValue().build();
+        ASSERT_TRUE(msg.hasNullValue());
+        ASSERT_EQ(msg.getLength(), 0);
+    }
+
+    {
+        // setContent() after setNullValue() clears the null-value flag again.
+        auto msg = MessageBuilder().setNullValue().setContent("fresh").build();
+        ASSERT_FALSE(msg.hasNullValue());
+        ASSERT_EQ(msg.getDataAsString(), "fresh");
+    }
+}
+
+TEST(MessageTest, testEmptyMessage) {
+    // A builder with nothing set yields an empty message that is not a tombstone.
+    const Message emptyMsg = MessageBuilder().build();
+    ASSERT_FALSE(emptyMsg.hasNullValue());
+    ASSERT_EQ(emptyMsg.getLength(), 0);
+}
+
+TEST(MessageTest, testEmptyStringNotNullValue) {
+    // An empty string is a real, zero-length value ...
+    {
+        const auto emptyValue = MessageBuilder().setContent("").build();
+        ASSERT_FALSE(emptyValue.hasNullValue());
+        ASSERT_EQ(emptyValue.getLength(), 0);
+        ASSERT_EQ(emptyValue.getDataAsString(), "");
+    }
+
+    // ... whereas a null value means no value exists at all (a compaction tombstone).
+    {
+        const auto tombstone = MessageBuilder().setNullValue().setPartitionKey("key").build();
+        ASSERT_TRUE(tombstone.hasNullValue());
+        ASSERT_EQ(tombstone.getLength(), 0);
+    }
+
+    // Both report length 0; only hasNullValue() tells them apart.
+}
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index af833ce..302df1f 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -1045,5 +1045,120 @@ TEST(ReaderTest,
testReaderWithZeroMessageListenerThreads) {
client.close();
}
+// Round-trips a tombstone through the broker. Compaction is never triggered in this
+// test, so it verifies that the null-value flag reaches a readCompacted reader intact
+// (all 5 messages are still delivered); it does not verify that compaction drops key2.
+TEST(ReaderTest, testReadCompactedWithNullValue) {
+ Client client(serviceUrl);
+
+ const std::string topicName =
+ "persistent://public/default/testReadCompactedWithNullValue-" +
std::to_string(time(nullptr));
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+ // Send messages with keys
+ ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1").build()));
+ ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setPartitionKey("key2").setContent("value2").build()));
+ ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setPartitionKey("key3").setContent("value3").build()));
+
+ // Send a tombstone (null value) for key2
+ auto tombstone =
MessageBuilder().setPartitionKey("key2").setNullValue().build();
+ ASSERT_TRUE(tombstone.hasNullValue());
+ ASSERT_EQ(tombstone.getLength(), 0);
+ ASSERT_EQ(ResultOk, producer.send(tombstone));
+
+ // Update key1 with a new value
+ ASSERT_EQ(ResultOk,
+
producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1-updated").build()));
+
+ // Create a reader with readCompacted enabled to read all messages (before
compaction runs)
+ ReaderConfiguration readerConf;
+ readerConf.setReadCompacted(true);
+ Reader reader;
+ ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(),
readerConf, reader));
+
+ // Read all messages and verify we can detect null values
+ std::map<std::string, std::string> keyValues;
+ std::set<std::string> nullValueKeys;
+ int messageCount = 0;
+
+ bool hasMessageAvailable = false;
+ ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+ while (hasMessageAvailable) {
+ Message msg;
+ ASSERT_EQ(ResultOk, reader.readNext(msg, 3000));
+ messageCount++;
+
+ // Tombstones are only recorded in nullValueKeys; they do not erase earlier
+ // entries from keyValues (key2 keeps "value2" there).
+ std::string key = msg.getPartitionKey();
+ if (msg.hasNullValue()) {
+ nullValueKeys.insert(key);
+ LOG_INFO("Received null value (tombstone) for key: " << key);
+ } else {
+ keyValues[key] = msg.getDataAsString();
+ LOG_INFO("Received message for key: " << key << ", value: " <<
msg.getDataAsString());
+ }
+
+ ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+ }
+
+ // Verify we read all 5 messages
+ ASSERT_EQ(messageCount, 5) << "Expected to read 5 messages";
+
+ // Verify the null value message was received and detected
+ ASSERT_EQ(nullValueKeys.size(), 1) << "Expected exactly one null value
message";
+ ASSERT_TRUE(nullValueKeys.count("key2") > 0) << "key2 should have a null
value (tombstone)";
+
+ // Verify key1 has the latest value (value1-updated overwrites value1)
+ ASSERT_EQ(keyValues["key1"], "value1-updated") << "key1 should have the
updated value";
+ ASSERT_EQ(keyValues["key3"], "value3") << "key3 should have value3";
+
+ producer.close();
+ reader.close();
+ client.close();
+}
+
+// Checks that a tombstone keeps its partition key and user properties, both on the
+// locally built message and after a round trip through the broker.
+TEST(ReaderTest, testNullValueMessageProperties) {
+ Client client(serviceUrl);
+
+ const std::string topicName =
+ "persistent://public/default/testNullValueMessageProperties-" +
std::to_string(time(nullptr));
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+ // Send a null value message with properties
+ auto tombstone = MessageBuilder()
+ .setPartitionKey("user-123")
+ .setNullValue()
+ .setProperty("reason", "account-deleted")
+ .setProperty("deleted-by", "admin")
+ .build();
+
+ ASSERT_TRUE(tombstone.hasNullValue());
+ ASSERT_EQ(tombstone.getPartitionKey(), "user-123");
+ ASSERT_EQ(tombstone.getProperty("reason"), "account-deleted");
+ ASSERT_EQ(tombstone.getProperty("deleted-by"), "admin");
+ ASSERT_EQ(tombstone.getLength(), 0);
+
+ ASSERT_EQ(ResultOk, producer.send(tombstone));
+
+ // Create a reader and verify the message
+ ReaderConfiguration readerConf;
+ Reader reader;
+ ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(),
readerConf, reader));
+
+ // Only one message was sent to this topic, so the first read must be the tombstone.
+ Message msg;
+ ASSERT_EQ(ResultOk, reader.readNext(msg, 5000));
+
+ // Verify all properties are preserved
+ ASSERT_TRUE(msg.hasNullValue());
+ ASSERT_EQ(msg.getPartitionKey(), "user-123");
+ ASSERT_EQ(msg.getProperty("reason"), "account-deleted");
+ ASSERT_EQ(msg.getProperty("deleted-by"), "admin");
+ ASSERT_EQ(msg.getLength(), 0);
+
+ producer.close();
+ reader.close();
+ client.close();
+}
+
+// Instantiate the parameterized reader suites once for each value of their bool parameter.
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true,
false));
diff --git a/tests/TableViewTest.cc b/tests/TableViewTest.cc
index b51b2c8..c1e648f 100644
--- a/tests/TableViewTest.cc
+++ b/tests/TableViewTest.cc
@@ -157,6 +157,93 @@ TEST(TableViewTest, testPublishEmptyValue) {
client.close();
}
+// Fills a TableView with key0..key9, then publishes a setNullValue() tombstone for key5
+// and expects only that key to disappear from the view.
+TEST(TableViewTest, testNullValueTombstone) {
+ const std::string topic = "testNullValueTombstone" +
std::to_string(time(nullptr));
+ Client client(lookupUrl);
+
+ ProducerConfiguration producerConfiguration;
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfiguration,
producer));
+
+ // Send initial messages with keys
+ auto count = 10;
+ for (int i = 0; i < count; ++i) {
+ auto msg = MessageBuilder()
+ .setPartitionKey("key" + std::to_string(i))
+ .setContent("value" + std::to_string(i))
+ .build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ }
+
+ // Create table view and verify all keys are present
+ TableView tableView;
+ ASSERT_EQ(ResultOk, client.createTableView(topic, {}, tableView));
+ ASSERT_EQ(tableView.size(), count);
+
+ std::string value;
+ ASSERT_TRUE(tableView.containsKey("key5"));
+ ASSERT_TRUE(tableView.getValue("key5", value));
+ ASSERT_EQ(value, "value5");
+
+ // Send a null value (tombstone) for key5 using setNullValue()
+ auto tombstone =
MessageBuilder().setPartitionKey("key5").setNullValue().build();
+ ASSERT_TRUE(tombstone.hasNullValue());
+ ASSERT_EQ(ResultOk, producer.send(tombstone));
+
+ // Wait for table view to process the tombstone and remove the key
+ // (any result of waitUntil is ignored; the ASSERTs below fail if 2s elapse first)
+ waitUntil(
+ std::chrono::seconds(2), [&] { return !tableView.containsKey("key5");
}, 100);
+
+ // Verify key5 was removed by the tombstone
+ ASSERT_FALSE(tableView.containsKey("key5"));
+ ASSERT_EQ(tableView.size(), count - 1);
+
+ // Verify other keys are still present
+ ASSERT_TRUE(tableView.containsKey("key0"));
+ ASSERT_TRUE(tableView.containsKey("key9"));
+
+ client.close();
+}
+
+// The two deletion messages differ on the producer side (only nullMsg reports
+// hasNullValue()), but this test expects TableView to remove the key for both:
+// a null value must be handled at least like the existing empty-payload deletion.
+TEST(TableViewTest, testNullValueVsEmptyString) {
+ const std::string topic = "testNullValueVsEmptyString" +
std::to_string(time(nullptr));
+ Client client(lookupUrl);
+
+ ProducerConfiguration producerConfiguration;
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfiguration,
producer));
+
+ // Send messages for two keys
+ ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setPartitionKey("keyA").setContent("valueA").build()));
+ ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setPartitionKey("keyB").setContent("valueB").build()));
+
+ TableView tableView;
+ ASSERT_EQ(ResultOk, client.createTableView(topic, {}, tableView));
+ ASSERT_EQ(tableView.size(), 2);
+
+ // Send empty string for keyA - this should also remove it from TableView
+ // (TableView treats empty payload as deletion)
+ auto emptyMsg =
MessageBuilder().setPartitionKey("keyA").setContent("").build();
+ ASSERT_FALSE(emptyMsg.hasNullValue());
+ ASSERT_EQ(ResultOk, producer.send(emptyMsg));
+
+ // Send null value (tombstone) for keyB using setNullValue()
+ auto nullMsg =
MessageBuilder().setPartitionKey("keyB").setNullValue().build();
+ ASSERT_TRUE(nullMsg.hasNullValue());
+ ASSERT_EQ(ResultOk, producer.send(nullMsg));
+
+ // Wait for both to be processed
+ // (any result of waitUntil is ignored; the ASSERTs below fail if 2s elapse first)
+ waitUntil(
+ std::chrono::seconds(2), [&] { return tableView.size() == 0; }, 100);
+
+ // Both keys should be removed
+ ASSERT_FALSE(tableView.containsKey("keyA"));
+ ASSERT_FALSE(tableView.containsKey("keyB"));
+ ASSERT_EQ(tableView.size(), 0);
+
+ client.close();
+}
+
TEST(TableViewTest, testNotSupportNonPersistentTopic) {
const std::string topic = TopicDomain::NonPersistent +
"://public/default/testNotSupportNonPersistentTopic" +