Copilot commented on code in PR #556:
URL: https://github.com/apache/pulsar-client-cpp/pull/556#discussion_r2960502893
##########
tests/ReaderTest.cc:
##########
@@ -983,6 +978,26 @@ TEST_F(ReaderSeekTest, testSeekInclusiveChunkMessage) {
assertStartMessageId(false, secondMsgId);
}
+TEST_P(ReaderSeekTest, testSeekToEndByTimestamp) {
+ auto topic = "test-seek-to-end-by-timestamp-" + std::to_string(time(nullptr));
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+
+ ReaderConfiguration readerConf;
+ readerConf.setStartMessageIdInclusive(GetParam());
+
+ Reader reader;
+ ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), readerConf, reader));
+
+ ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg").build()));
+ auto now = TimeUtils::currentTimeMillis() + 1000;
+ ASSERT_EQ(ResultOk, reader.seek(now));
+
Review Comment:
`testSeekToEndByTimestamp` uses `TimeUtils::currentTimeMillis() + 1000` to
generate a timestamp “after” the last message. This can be flaky when the
broker’s clock is more than 1 second ahead of the client’s: the seek timestamp
may still land on or before the last message’s publish time, which makes
`hasMessageAvailable` return true. To make the test deterministic, use a
timestamp guaranteed to be beyond any publish time (e.g., a much larger offset,
or `std::numeric_limits<uint64_t>::max()`, given that `SeekTimestampType` is
`uint64_t`).
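
A minimal sketch of the clock-skew problem described above, under a deliberately simplified model: the broker stamps messages with its own clock, and a message stays readable after a seek when its publish time is at or after the seek timestamp. The names `brokerPublishTime`, `messageStillReadable`, and `farFutureTimestamp` are hypothetical and exist only for this illustration; they are not part of the Pulsar client. The sketch does not show how a real broker treats extreme values such as `std::numeric_limits<uint64_t>::max()`.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical model: the broker assigns publish times from its own clock,
// which may run ahead of the client's clock by skewMs milliseconds.
constexpr uint64_t brokerPublishTime(uint64_t clientNowMs, uint64_t skewMs) {
    return clientNowMs + skewMs;
}

// Simplified assumption: after a seek by timestamp, a message is still
// readable when its publish time is at or after the seek timestamp.
constexpr bool messageStillReadable(uint64_t publishTimeMs, uint64_t seekTimestampMs) {
    return publishTimeMs >= seekTimestampMs;
}

// Hypothetical helper: a seek target one day ahead of the client clock, so
// that realistic clock skew cannot push a publish time past it.
constexpr uint64_t farFutureTimestamp(uint64_t clientNowMs) {
    constexpr uint64_t kOneDayMs = 24ULL * 60 * 60 * 1000;
    return clientNowMs + kOneDayMs;
}

// Usage example: the broker clock is 2.5 s ahead of the client clock.
inline void clockSkewExample() {
    const uint64_t clientNow = 1700000000000ULL;
    const uint64_t publishTime = brokerPublishTime(clientNow, 2500);

    // A 1 s offset is too small: the message is still at or after the target.
    assert(messageStillReadable(publishTime, clientNow + 1000));
    // A much larger offset places the seek target beyond the publish time.
    assert(!messageStillReadable(publishTime, farFutureTimestamp(clientNow)));
}
```

Under this model, any offset smaller than the skew leaves the last message readable, which is why a fixed 1 second margin depends on how well the two clocks agree.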
##########
tests/ReaderTest.cc:
##########
@@ -865,13 +866,7 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
}
ASSERT_EQ(ResultOk, reader.seek(MessageId::latest()));
- // After seek-to-end the broker may close the consumer and trigger reconnect; allow a short
- // delay for hasMessageAvailable to become false (avoids flakiness when reconnect completes).
- for (int i = 0; i < 50; i++) {
- ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
- if (!hasMessageAvailable) break;
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- }
+ ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
Review Comment:
`testHasMessageAvailableAfterSeekToEnd`: the check after the first
`seek(MessageId::latest())` used to retry `hasMessageAvailable` to avoid
flakiness during broker-triggered reconnects. This change removes that retry
loop, while the second seek still keeps its own. This can reintroduce
intermittent failures in which `hasMessageAvailable` is briefly true right
after the seek. Consider restoring the same wait/retry (or using the existing
`waitUntil(...)` helper) for the first seek as well, or otherwise making both
seeks consistent and robust against reconnect timing.
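
As a rough illustration of the wait/retry pattern mentioned above, here is a self-contained polling helper. `pollUntil` is a hypothetical name introduced for this sketch; it is not the repository's `waitUntil(...)` helper, whose exact signature is not shown in this thread.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper: calls `condition` repeatedly until it returns true or
// `timeout` elapses, sleeping `interval` between attempts. Returns true if
// the condition was observed to hold.
inline bool pollUntil(std::chrono::milliseconds timeout, std::chrono::milliseconds interval,
                      const std::function<bool()>& condition) {
    assert(interval.count() > 0);
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (std::chrono::steady_clock::now() < deadline) {
        if (condition()) {
            return true;
        }
        std::this_thread::sleep_for(interval);
    }
    // One final check once the deadline has passed.
    return condition();
}

// Usage example: the condition becomes true on the third attempt, loosely
// modelling a state that settles shortly after a reconnect.
inline int pollUntilExample() {
    int attempts = 0;
    const bool settled = pollUntil(std::chrono::milliseconds(1000), std::chrono::milliseconds(1),
                                   [&attempts] { return ++attempts >= 3; });
    assert(settled);
    return attempts;
}
```

In the test, the condition would presumably wrap the `reader.hasMessageAvailable(hasMessageAvailable)` call and return `!hasMessageAvailable`, with a final assertion on the returned flag.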
##########
lib/ConsumerImpl.cc:
##########
@@ -1642,26 +1642,33 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c
callback(result, {});
return;
}
- auto handleResponse = [self, response, callback] {
+ bool lastSeekIsByTimestamp = false;
+ {
+ LockGuard lock{self->mutex_};
+ if (self->lastSeekArg_.has_value() &&
+ std::holds_alternative<SeekTimestampType>(self->lastSeekArg_.value())) {
+ lastSeekIsByTimestamp = true;
+ }
+ }
+ auto handleResponse = [self, lastSeekIsByTimestamp, response, callback] {
if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) {
// We only care about comparing ledger ids and entry ids as mark delete position
// doesn't have other ids such as batch index
auto compareResult =
compareLedgerAndEntryId(response.getMarkDeletePosition(), response.getLastMessageId());
- callback(ResultOk, self->config_.isStartMessageIdInclusive() ? compareResult <= 0
- : compareResult < 0);
+ // When the consumer has sought by timestamp that is later than the last message, the
+ // mark-delete position will still be the same with the last message id's position. But
+ // broker won't dispatch messages even if startMessageId is inclusive, so we should return
+ // false in this case.
+ if (lastSeekIsByTimestamp || !self->config_.isStartMessageIdInclusive()) {
+ callback(ResultOk, compareResult < 0);
+ } else {
+ callback(ResultOk, compareResult <= 0);
+ }
Review Comment:
`hasMessageAvailableAsync`: `lastSeekIsByTimestamp` is read once under
`mutex_` and then captured into `handleResponse`. If another thread performs a
seek while the async `getLastMessageIdAsync` / optional `seekAsync` is in
flight, the captured value can become stale, so `hasMessageAvailable` may
wrongly ignore (or wrongly honor) `startMessageIdInclusive`. Consider
re-checking the current `lastSeekArg_` under the lock inside `handleResponse`
(or gating on a seek generation/version counter) so the callback reflects the
consumer’s current seek mode.
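
One possible shape for the re-check suggested above, as a standalone sketch rather than a patch for `ConsumerImpl`. `FakeMessageId`, `SeekArg`, `SeekState`, and the free function `hasMessageAvailable` are hypothetical stand-ins; only `SeekTimestampType`, `lastSeekArg_`, and `mutex_` echo names from the diff. The seek mode is read under the lock at the moment the result is computed instead of being captured earlier.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <optional>
#include <variant>

// Hypothetical stand-ins for the types referenced in the diff.
struct FakeMessageId {
    int64_t ledgerId;
    int64_t entryId;
};
using SeekTimestampType = uint64_t;
using SeekArg = std::variant<FakeMessageId, SeekTimestampType>;

class SeekState {
   public:
    // Records the argument of the most recent seek.
    void recordSeek(const SeekArg& arg) {
        std::lock_guard<std::mutex> lock{mutex_};
        lastSeekArg_ = arg;
    }

    // Reads the current seek mode under the lock on every call, so a
    // concurrent seek is reflected in the next evaluation.
    bool lastSeekIsByTimestamp() const {
        std::lock_guard<std::mutex> lock{mutex_};
        return lastSeekArg_.has_value() && std::holds_alternative<SeekTimestampType>(*lastSeekArg_);
    }

   private:
    mutable std::mutex mutex_;
    std::optional<SeekArg> lastSeekArg_;
};

// Mirrors the branch in the diff. `compareResult` stands for the comparison
// of the mark-delete position with the last message id.
inline bool hasMessageAvailable(const SeekState& state, bool startMessageIdInclusive,
                                int compareResult) {
    if (state.lastSeekIsByTimestamp() || !startMessageIdInclusive) {
        return compareResult < 0;
    }
    return compareResult <= 0;
}

// Usage example: a seek by message id that lands after a seek by timestamp
// changes the outcome for the same comparison result.
inline void seekStateExample() {
    SeekState state;
    state.recordSeek(SeekArg(SeekTimestampType{1000}));
    assert(!hasMessageAvailable(state, true, 0));
    state.recordSeek(SeekArg(FakeMessageId{1, 2}));
    assert(hasMessageAvailable(state, true, 0));
}
```

A generation counter incremented on each seek would be an alternative: the callback could compare the generation it captured with the current one and recompute when they differ.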
##########
lib/ConsumerImpl.cc:
##########
@@ -1642,26 +1642,33 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c
callback(result, {});
return;
}
- auto handleResponse = [self, response, callback] {
+ bool lastSeekIsByTimestamp = false;
+ {
+ LockGuard lock{self->mutex_};
+ if (self->lastSeekArg_.has_value() &&
+ std::holds_alternative<SeekTimestampType>(self->lastSeekArg_.value())) {
+ lastSeekIsByTimestamp = true;
+ }
+ }
+ auto handleResponse = [self, lastSeekIsByTimestamp, response, callback] {
if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) {
// We only care about comparing ledger ids and entry ids as mark delete position
// doesn't have other ids such as batch index
auto compareResult =
compareLedgerAndEntryId(response.getMarkDeletePosition(), response.getLastMessageId());
- callback(ResultOk, self->config_.isStartMessageIdInclusive() ? compareResult <= 0
- : compareResult < 0);
+ // When the consumer has sought by timestamp that is later than the last message, the
+ // mark-delete position will still be the same with the last message id's position. But
Review Comment:
Comment grammar: “the same with the last message id's position” should be
“the same as the last message id’s position” for clarity.