Copilot commented on code in PR #563:
URL: https://github.com/apache/pulsar-client-cpp/pull/563#discussion_r3011110803
##########
tests/ReaderTest.cc:
##########
@@ -1045,5 +1045,130 @@ TEST(ReaderTest,
testReaderWithZeroMessageListenerThreads) {
client.close();
}
+TEST(ReaderTest, testReadCompactedWithNullValue) {
+ Client client(serviceUrl);
+
+ const std::string topicName =
+ "persistent://public/default/testReadCompactedWithNullValue-" +
std::to_string(time(nullptr));
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+ // Send messages with keys
+ ASSERT_EQ(ResultOk,
+
producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1").build()));
+ ASSERT_EQ(ResultOk,
+
producer.send(MessageBuilder().setPartitionKey("key2").setContent("value2").build()));
+ ASSERT_EQ(ResultOk,
+
producer.send(MessageBuilder().setPartitionKey("key3").setContent("value3").build()));
+
+ // Send a tombstone (null value) for key2
+ auto tombstone =
MessageBuilder().setPartitionKey("key2").setNullValue().build();
+ ASSERT_TRUE(tombstone.hasNullValue());
+ ASSERT_EQ(tombstone.getLength(), 0);
+ ASSERT_EQ(ResultOk, producer.send(tombstone));
+
+ // Update key1 with a new value
+ ASSERT_EQ(ResultOk,
+
producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1-updated").build()));
+
+ // Trigger compaction via admin API
+ {
+ std::string compactUrl =
+ adminUrl +
"admin/v2/persistent/public/default/testReadCompactedWithNullValue-" +
+ std::to_string(time(nullptr)) + "/compaction";
+ // Note: Compaction is async, we just trigger it
+ makePutRequest(compactUrl, "");
+ }
+
+ // Create a reader with readCompacted enabled
+ ReaderConfiguration readerConf;
+ readerConf.setReadCompacted(true);
+ Reader reader;
+ ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(),
readerConf, reader));
+
+ // Read all messages and verify we can detect null values
+ std::map<std::string, std::string> keyValues;
+ std::set<std::string> nullValueKeys;
+
+ for (int i = 0; i < 10; i++) {
+ bool hasMessageAvailable = false;
+ ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+ if (!hasMessageAvailable) {
+ break;
+ }
+
+ Message msg;
+ Result res = reader.readNext(msg, 3000);
+ if (res != ResultOk) {
+ break;
+ }
Review Comment:
This read loop can pass without validating any behavior: it exits via `break`
when `hasMessageAvailable` is false or when `readNext` fails, so the final
assertion can still pass even if no messages were read at all. Prefer asserting
that `readNext` succeeds whenever `hasMessageAvailable == true` (or reading a
known number of messages), and then assert on concrete outcomes (e.g., key1 ==
"value1-updated" and key2 is either a tombstone or absent after compaction).
```suggestion
ASSERT_EQ(ResultOk, reader.readNext(msg, 3000));
```
##########
lib/MessageBuilder.cc:
##########
@@ -157,6 +157,12 @@ MessageBuilder& MessageBuilder::disableReplication(bool
flag) {
return *this;
}
+MessageBuilder& MessageBuilder::setNullValue() {
+ checkMetadata();
Review Comment:
`MessageBuilder::setNullValue()` only flips the metadata flag; it doesn't
clear any previously-set payload, and the various `setContent(...)` overloads
don't clear `null_value`. This allows building messages with non-empty payload
but `null_value=true` (or vice versa), which will be interpreted incorrectly by
consumers/table views. Make the two states mutually exclusive by clearing the
payload/key-value when setting null value, and clearing `null_value` whenever
content is set.
```suggestion
checkMetadata();
// A null-value (tombstone) message must not carry a payload, so drop any
// content that was set earlier on this builder.
impl_->payload = SharedBuffer();
// The partition key and ordering key are intentionally left untouched: a
// tombstone is addressed by its key, e.g.
// MessageBuilder().setPartitionKey("key2").setNullValue(), so clearing the
// key here would leave the tombstone with nothing to delete.
##########
tests/ReaderTest.cc:
##########
@@ -1045,5 +1045,130 @@ TEST(ReaderTest,
testReaderWithZeroMessageListenerThreads) {
client.close();
}
+TEST(ReaderTest, testReadCompactedWithNullValue) {
+ Client client(serviceUrl);
+
+ const std::string topicName =
+ "persistent://public/default/testReadCompactedWithNullValue-" +
std::to_string(time(nullptr));
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+ // Send messages with keys
+ ASSERT_EQ(ResultOk,
+
producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1").build()));
+ ASSERT_EQ(ResultOk,
+
producer.send(MessageBuilder().setPartitionKey("key2").setContent("value2").build()));
+ ASSERT_EQ(ResultOk,
+
producer.send(MessageBuilder().setPartitionKey("key3").setContent("value3").build()));
+
+ // Send a tombstone (null value) for key2
+ auto tombstone =
MessageBuilder().setPartitionKey("key2").setNullValue().build();
+ ASSERT_TRUE(tombstone.hasNullValue());
+ ASSERT_EQ(tombstone.getLength(), 0);
+ ASSERT_EQ(ResultOk, producer.send(tombstone));
+
+ // Update key1 with a new value
+ ASSERT_EQ(ResultOk,
+
producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1-updated").build()));
+
+ // Trigger compaction via admin API
+ {
+ std::string compactUrl =
+ adminUrl +
"admin/v2/persistent/public/default/testReadCompactedWithNullValue-" +
+ std::to_string(time(nullptr)) + "/compaction";
+ // Note: Compaction is async, we just trigger it
+ makePutRequest(compactUrl, "");
Review Comment:
The compaction admin URL is built from a fresh `time(nullptr)` value, which
can differ from the one used to construct `topicName` if the clock advances to
the next second between the two calls. When that happens, compaction is
triggered on a different (nonexistent) topic than the one you produced to.
Build `compactUrl` directly from `topicName` (or extract the topic suffix once)
and assert the HTTP status returned by `makePutRequest` (e.g., 204 when
compaction is triggered, or 409 when a compaction is already running) so the
test fails if compaction isn't triggered.
```suggestion
// Build compaction URL directly from topicName to avoid mismatches
std::string topicPath = topicName;
std::size_t schemePos = topicPath.find("://");
if (schemePos != std::string::npos) {
topicPath.erase(schemePos, 3); // remove "://", e.g.,
"persistent://public/..." -> "persistent/public/..."
}
std::string compactUrl = adminUrl + "admin/v2/" + topicPath +
"/compaction";
// Note: Compaction is async, we just trigger it, but assert that
the request is accepted
int res = makePutRequest(compactUrl, "");
ASSERT_FALSE(res != 204 && res != 409);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]