Copilot commented on code in PR #563:
URL: https://github.com/apache/pulsar-client-cpp/pull/563#discussion_r3011110803


##########
tests/ReaderTest.cc:
##########
@@ -1045,5 +1045,130 @@ TEST(ReaderTest, 
testReaderWithZeroMessageListenerThreads) {
     client.close();
 }
 
+TEST(ReaderTest, testReadCompactedWithNullValue) {
+    Client client(serviceUrl);
+
+    const std::string topicName =
+        "persistent://public/default/testReadCompactedWithNullValue-" + 
std::to_string(time(nullptr));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+    // Send messages with keys
+    ASSERT_EQ(ResultOk,
+              
producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1").build()));
+    ASSERT_EQ(ResultOk,
+              
producer.send(MessageBuilder().setPartitionKey("key2").setContent("value2").build()));
+    ASSERT_EQ(ResultOk,
+              
producer.send(MessageBuilder().setPartitionKey("key3").setContent("value3").build()));
+
+    // Send a tombstone (null value) for key2
+    auto tombstone = 
MessageBuilder().setPartitionKey("key2").setNullValue().build();
+    ASSERT_TRUE(tombstone.hasNullValue());
+    ASSERT_EQ(tombstone.getLength(), 0);
+    ASSERT_EQ(ResultOk, producer.send(tombstone));
+
+    // Update key1 with a new value
+    ASSERT_EQ(ResultOk,
+              
producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1-updated").build()));
+
+    // Trigger compaction via admin API
+    {
+        std::string compactUrl =
+            adminUrl + 
"admin/v2/persistent/public/default/testReadCompactedWithNullValue-" +
+            std::to_string(time(nullptr)) + "/compaction";
+        // Note: Compaction is async, we just trigger it
+        makePutRequest(compactUrl, "");
+    }
+
+    // Create a reader with readCompacted enabled
+    ReaderConfiguration readerConf;
+    readerConf.setReadCompacted(true);
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), 
readerConf, reader));
+
+    // Read all messages and verify we can detect null values
+    std::map<std::string, std::string> keyValues;
+    std::set<std::string> nullValueKeys;
+
+    for (int i = 0; i < 10; i++) {
+        bool hasMessageAvailable = false;
+        ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+        if (!hasMessageAvailable) {
+            break;
+        }
+
+        Message msg;
+        Result res = reader.readNext(msg, 3000);
+        if (res != ResultOk) {
+            break;
+        }

Review Comment:
   This read loop can silently succeed without validating behavior: it breaks 
when `hasMessageAvailable` is false or `readNext` fails, and the final 
assertion can still pass even if no messages were read at all. Prefer asserting 
`readNext` succeeds while `hasMessageAvailable==true` (or reading a known 
number of messages), and then assert on concrete outcomes (e.g., key1 == 
"value1-updated" and key2 is tombstone/absent after compaction).
   ```suggestion
           ASSERT_EQ(ResultOk, reader.readNext(msg, 3000));
   ```



##########
lib/MessageBuilder.cc:
##########
@@ -157,6 +157,12 @@ MessageBuilder& MessageBuilder::disableReplication(bool 
flag) {
     return *this;
 }
 
+MessageBuilder& MessageBuilder::setNullValue() {
+    checkMetadata();

Review Comment:
   `MessageBuilder::setNullValue()` only flips the metadata flag; it doesn't 
clear any previously-set payload, and the various `setContent(...)` overloads 
don't clear `null_value`. This allows building messages with non-empty payload 
but `null_value=true` (or vice versa), which will be interpreted incorrectly by 
consumers/table views. Make the two states mutually exclusive by clearing the 
payload/key-value when setting null value, and clearing `null_value` whenever 
content is set.
   ```suggestion
       checkMetadata();
   
       // Ensure null-value messages do not carry any payload or key-related 
metadata.
       // Clear the payload buffer.
       impl_->payload = SharedBuffer();
   
       // Clear key-related metadata so that the null value state is 
self-consistent.
       impl_->metadata.clear_partition_key();
       impl_->metadata.clear_ordering_key();
   ```



##########
tests/ReaderTest.cc:
##########
@@ -1045,5 +1045,130 @@ TEST(ReaderTest, 
testReaderWithZeroMessageListenerThreads) {
     client.close();
 }
 
+TEST(ReaderTest, testReadCompactedWithNullValue) {
+    Client client(serviceUrl);
+
+    const std::string topicName =
+        "persistent://public/default/testReadCompactedWithNullValue-" + 
std::to_string(time(nullptr));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+    // Send messages with keys
+    ASSERT_EQ(ResultOk,
+              
producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1").build()));
+    ASSERT_EQ(ResultOk,
+              
producer.send(MessageBuilder().setPartitionKey("key2").setContent("value2").build()));
+    ASSERT_EQ(ResultOk,
+              
producer.send(MessageBuilder().setPartitionKey("key3").setContent("value3").build()));
+
+    // Send a tombstone (null value) for key2
+    auto tombstone = 
MessageBuilder().setPartitionKey("key2").setNullValue().build();
+    ASSERT_TRUE(tombstone.hasNullValue());
+    ASSERT_EQ(tombstone.getLength(), 0);
+    ASSERT_EQ(ResultOk, producer.send(tombstone));
+
+    // Update key1 with a new value
+    ASSERT_EQ(ResultOk,
+              
producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1-updated").build()));
+
+    // Trigger compaction via admin API
+    {
+        std::string compactUrl =
+            adminUrl + 
"admin/v2/persistent/public/default/testReadCompactedWithNullValue-" +
+            std::to_string(time(nullptr)) + "/compaction";
+        // Note: Compaction is async, we just trigger it
+        makePutRequest(compactUrl, "");

Review Comment:
   The compaction admin URL is built using a fresh `time(nullptr)` value, which 
can differ from the one used to construct `topicName`. This can trigger 
compaction on a different (nonexistent) topic than the one you produced to. 
Build `compactUrl` directly from `topicName` (or extract the topic suffix once) 
and assert the returned HTTP status from `makePutRequest` (e.g., 204/409) so 
the test fails if compaction isn't triggered.
   ```suggestion
           // Build compaction URL directly from topicName to avoid mismatches
           std::string topicPath = topicName;
           std::size_t schemePos = topicPath.find("://");
           if (schemePos != std::string::npos) {
               topicPath.erase(schemePos, 3);  // remove "://", e.g., 
"persistent://public/..." -> "persistent/public/..."
           }
           std::string compactUrl = adminUrl + "admin/v2/" + topicPath + 
"/compaction";
           // Note: Compaction is async, we just trigger it, but assert that 
the request is accepted
           int res = makePutRequest(compactUrl, "");
           ASSERT_FALSE(res != 204 && res != 409);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to