Copilot commented on code in PR #556:
URL: https://github.com/apache/pulsar-client-cpp/pull/556#discussion_r2960502893


##########
tests/ReaderTest.cc:
##########
@@ -983,6 +978,26 @@ TEST_F(ReaderSeekTest, testSeekInclusiveChunkMessage) {
     assertStartMessageId(false, secondMsgId);
 }
 
+TEST_P(ReaderSeekTest, testSeekToEndByTimestamp) {
+    auto topic = "test-seek-to-end-by-timestamp-" + std::to_string(time(nullptr));
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+
+    ReaderConfiguration readerConf;
+    readerConf.setStartMessageIdInclusive(GetParam());
+
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), readerConf, reader));
+
+    ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg").build()));
+    auto now = TimeUtils::currentTimeMillis() + 1000;
+    ASSERT_EQ(ResultOk, reader.seek(now));
+

Review Comment:
   `testSeekToEndByTimestamp` uses `TimeUtils::currentTimeMillis() + 1000` as a 
timestamp “after” the last message. This can be flaky if the broker’s clock is 
more than 1s ahead of the client’s: the seek timestamp may then land on or 
before the last message’s publish time, making `hasMessageAvailable` true. To 
make the test deterministic, use a timestamp guaranteed to be beyond any publish 
time (e.g., a much larger offset, or `std::numeric_limits<uint64_t>::max()`, 
given that `SeekTimestampType` is `uint64_t`).
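
A minimal sketch of the clock-skew problem, assuming the seek only lands past the last message when the seek timestamp is strictly greater than the publish time stamped by the broker. Both `seekLandsAfterLastMessage` and `timestampAfter` are hypothetical helpers written for this illustration, not Pulsar APIs, and the sketch does not check how a real broker handles extreme timestamp values.

```cpp
#include <cstdint>
#include <limits>

// Hypothetical model, not Pulsar code: the seek is treated as landing past the
// last message only if the seek timestamp is strictly greater than the publish
// time the broker stamped on that message (an assumption of this sketch).
inline bool seekLandsAfterLastMessage(uint64_t seekTimestampMs, uint64_t lastPublishTimeMs) {
    return seekTimestampMs > lastPublishTimeMs;
}

// Client-side "now + offset" that saturates at the maximum value instead of
// wrapping around on unsigned overflow.
inline uint64_t timestampAfter(uint64_t clientNowMs, uint64_t offsetMs) {
    const uint64_t maxTs = std::numeric_limits<uint64_t>::max();
    return offsetMs > maxTs - clientNowMs ? maxTs : clientNowMs + offsetMs;
}
```

With a broker clock 1.5s ahead of the client, a 1000 ms offset still lands before the last publish time in this model, while a one-hour offset does not.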



##########
tests/ReaderTest.cc:
##########
@@ -865,13 +866,7 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
     }
 
     ASSERT_EQ(ResultOk, reader.seek(MessageId::latest()));
-    // After seek-to-end the broker may close the consumer and trigger reconnect; allow a short
-    // delay for hasMessageAvailable to become false (avoids flakiness when reconnect completes).
-    for (int i = 0; i < 50; i++) {
-        ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
-        if (!hasMessageAvailable) break;
-        std::this_thread::sleep_for(std::chrono::milliseconds(100));
-    }
+    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));

Review Comment:
   `testHasMessageAvailableAfterSeekToEnd`: after the first 
`seek(MessageId::latest())`, the test used to retry `hasMessageAvailable` to 
avoid flakiness during broker-triggered reconnects. That retry loop was removed, 
while the second seek still keeps one. This can reintroduce intermittent 
failures where `hasMessageAvailable` is briefly true right after the seek. 
Consider restoring the same wait/retry (or using the existing `waitUntil(...)` 
helper) for the first seek as well, or otherwise making both seeks consistent 
and robust against reconnect timing.
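
A minimal sketch of the kind of wait/retry meant here. `pollUntil` is a hypothetical helper written for this illustration; the repository's own `waitUntil(...)` may have a different signature and should be preferred in the actual test.

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper for illustration only. It re-evaluates `condition` until
// it returns true or `timeout` elapses, sleeping `interval` between attempts.
// Returns true if the condition was met, false on timeout.
inline bool pollUntil(std::chrono::milliseconds timeout, const std::function<bool()>& condition,
                      std::chrono::milliseconds interval = std::chrono::milliseconds(10)) {
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (true) {
        if (condition()) return true;
        if (std::chrono::steady_clock::now() >= deadline) return false;
        std::this_thread::sleep_for(interval);
    }
}
```

In the test, the condition would call `reader.hasMessageAvailable(hasMessageAvailable)` and return `!hasMessageAvailable`, so both seeks share one bounded wait instead of one having a loop and the other a single check.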
   



##########
lib/ConsumerImpl.cc:
##########
@@ -1642,26 +1642,33 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c
                 callback(result, {});
                 return;
             }
-            auto handleResponse = [self, response, callback] {
+            bool lastSeekIsByTimestamp = false;
+            {
+                LockGuard lock{self->mutex_};
+                if (self->lastSeekArg_.has_value() &&
+                    std::holds_alternative<SeekTimestampType>(self->lastSeekArg_.value())) {
+                    lastSeekIsByTimestamp = true;
+                }
+            }
+            auto handleResponse = [self, lastSeekIsByTimestamp, response, callback] {
                 if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) {
                     // We only care about comparing ledger ids and entry ids as mark delete position
                     // doesn't have other ids such as batch index
                     auto compareResult = compareLedgerAndEntryId(response.getMarkDeletePosition(),
                                                                  response.getLastMessageId());
-                    callback(ResultOk, self->config_.isStartMessageIdInclusive() ? compareResult <= 0
-                                                                                 : compareResult < 0);
+                    // When the consumer has sought by timestamp that is later than the last message, the
+                    // mark-delete position will still be the same with the last message id's position. But
+                    // broker won't dispatch messages even if startMessageId is inclusive, so we should return
+                    // false in this case.
+                    if (lastSeekIsByTimestamp || !self->config_.isStartMessageIdInclusive()) {
+                        callback(ResultOk, compareResult < 0);
+                    } else {
+                        callback(ResultOk, compareResult <= 0);
+                    }

Review Comment:
   `hasMessageAvailableAsync`: `lastSeekIsByTimestamp` is read once under 
`mutex_` and then captured into `handleResponse`. If another thread performs a 
seek while the async `getLastMessageIdAsync` / optional `seekAsync` is in 
flight, the captured value can become stale, and `hasMessageAvailable` may then 
apply or ignore `startMessageIdInclusive` incorrectly. Consider re-checking the 
current `lastSeekArg_` under lock inside `handleResponse` (or gating on a 
seek-generation/version) so the callback reflects the consumer’s current seek 
mode.
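
A minimal sketch of the re-check-under-lock option, using simplified stand-ins rather than the real `ConsumerImpl`. `SeekState`, `FakeMessageId`, `SeekTimestamp`, and the free function `hasMessageAvailable` are all hypothetical names for this illustration; only the comparison rule is taken from the diff above.

```cpp
#include <cstdint>
#include <mutex>
#include <optional>
#include <variant>

// Hypothetical stand-ins; the real ConsumerImpl types differ.
struct FakeMessageId {
    int64_t ledgerId;
    int64_t entryId;
};
using SeekTimestamp = uint64_t;
using SeekArg = std::variant<FakeMessageId, SeekTimestamp>;

class SeekState {
   public:
    void recordSeek(SeekArg arg) {
        std::lock_guard<std::mutex> lock(mutex_);
        lastSeekArg_ = arg;
    }

    // Reads the current seek mode under the lock each time it is called, so a
    // caller that invokes it while handling the response sees the latest seek.
    bool lastSeekIsByTimestamp() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return lastSeekArg_.has_value() && std::holds_alternative<SeekTimestamp>(*lastSeekArg_);
    }

   private:
    mutable std::mutex mutex_;
    std::optional<SeekArg> lastSeekArg_;
};

// compareResult is the mark-delete position compared with the last message id
// (negative, zero, or positive). The seek mode is evaluated here, at response
// handling time, instead of being captured when the request was issued.
inline bool hasMessageAvailable(const SeekState& state, bool startInclusive, int compareResult) {
    if (state.lastSeekIsByTimestamp() || !startInclusive) {
        return compareResult < 0;
    }
    return compareResult <= 0;
}
```

A seek that completes between this read and the callback can still race, so a generation counter may be the stricter fix; the sketch only narrows the window.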



##########
lib/ConsumerImpl.cc:
##########
@@ -1642,26 +1642,33 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c
                 callback(result, {});
                 return;
             }
-            auto handleResponse = [self, response, callback] {
+            bool lastSeekIsByTimestamp = false;
+            {
+                LockGuard lock{self->mutex_};
+                if (self->lastSeekArg_.has_value() &&
+                    std::holds_alternative<SeekTimestampType>(self->lastSeekArg_.value())) {
+                    lastSeekIsByTimestamp = true;
+                }
+            }
+            auto handleResponse = [self, lastSeekIsByTimestamp, response, callback] {
                 if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) {
                     // We only care about comparing ledger ids and entry ids as mark delete position
                     // doesn't have other ids such as batch index
                     auto compareResult = compareLedgerAndEntryId(response.getMarkDeletePosition(),
                                                                  response.getLastMessageId());
-                    callback(ResultOk, self->config_.isStartMessageIdInclusive() ? compareResult <= 0
-                                                                                 : compareResult < 0);
+                    // When the consumer has sought by timestamp that is later than the last message, the
+                    // mark-delete position will still be the same with the last message id's position. But

Review Comment:
   Comment grammar: “the same with the last message id's position” should be 
“the same as the last message id’s position” for clarity.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
