BewareMyPower commented on code in PR #556:
URL: https://github.com/apache/pulsar-client-cpp/pull/556#discussion_r2963571080


##########
lib/ConsumerImpl.cc:
##########
@@ -1642,26 +1642,33 @@ void ConsumerImpl::hasMessageAvailableAsync(const 
HasMessageAvailableCallback& c
                 callback(result, {});
                 return;
             }
-            auto handleResponse = [self, response, callback] {
+            bool lastSeekIsByTimestamp = false;
+            {
+                LockGuard lock{self->mutex_};
+                if (self->lastSeekArg_.has_value() &&
+                    
std::holds_alternative<SeekTimestampType>(self->lastSeekArg_.value())) {
+                    lastSeekIsByTimestamp = true;
+                }
+            }
+            auto handleResponse = [self, lastSeekIsByTimestamp, response, 
callback] {
                 if (response.hasMarkDeletePosition() && 
response.getLastMessageId().entryId() >= 0) {
                     // We only care about comparing ledger ids and entry ids 
as mark delete position
                     // doesn't have other ids such as batch index
                     auto compareResult = 
compareLedgerAndEntryId(response.getMarkDeletePosition(),
                                                                  
response.getLastMessageId());
-                    callback(ResultOk, 
self->config_.isStartMessageIdInclusive() ? compareResult <= 0
-                                                                               
  : compareResult < 0);
+                    // When the consumer has sought by timestamp that is later 
than the last message, the
+                    // mark-delete position will still be the same with the 
last message id's position. But
+                    // broker won't dispatch messages even if startMessageId 
is inclusive, so we should return
+                    // false in this case.
+                    if (lastSeekIsByTimestamp || 
!self->config_.isStartMessageIdInclusive()) {
+                        callback(ResultOk, compareResult < 0);
+                    } else {
+                        callback(ResultOk, compareResult <= 0);
+                    }

Review Comment:
   We don't need to handle the case for concurrent `seek` and 
`hasMessageAvailable` operations



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to