This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 64fbda0 Fix connection leak by request timers not cancelled in time (#555)
64fbda0 is described below
commit 64fbda03c8de318d20a782e7016bd02779110671
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Mar 19 16:01:21 2026 +0800
Fix connection leak by request timers not cancelled in time (#555)
* Fix connection leak by request timers not cancelled in time
* fix
* fix include style
* revert consumer stats changes and speed up tests
* abstract a common method to insert a request and add tests
* remove duplicated emplace
* remove unexpected error log
---
lib/ClientConnection.cc | 235 +++++++++++++++++++++---------------------------
lib/ClientConnection.h | 101 ++++++++-------------
lib/ExecutorService.h | 9 ++
lib/MockServer.h | 20 +++++
lib/PendingRequest.h | 76 ++++++++++++++++
tests/ClientTest.cc | 97 ++++++++++++++++++++
tests/PulsarFriend.h | 35 ++++++++
7 files changed, 376 insertions(+), 197 deletions(-)
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index cc7e1f6..a135ade 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -33,6 +33,7 @@
#include "ConnectionPool.h"
#include "ConsumerImpl.h"
#include "ExecutorService.h"
+#include "Future.h"
#include "LogUtils.h"
#include "MockServer.h"
#include "OpSendMsg.h"
@@ -1029,8 +1030,6 @@ void ClientConnection::newPartitionedMetadataLookup(const
std::string& topicName
void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId,
const char* requestType,
const LookupDataResultPromisePtr& promise) {
Lock lock(mutex_);
- std::shared_ptr<LookupDataResultPtr> lookupDataResult;
- lookupDataResult = std::make_shared<LookupDataResultPtr>();
if (isClosed()) {
lock.unlock();
promise->setFailed(ResultNotConnected);
@@ -1040,18 +1039,30 @@ void ClientConnection::newLookup(const SharedBuffer&
cmd, uint64_t requestId, co
promise->setFailed(ResultTooManyLookupRequestException);
return;
}
- LookupRequestData requestData;
- requestData.promise = promise;
- requestData.timer = executor_->createDeadlineTimer();
- requestData.timer->expires_after(operationsTimeout_);
- requestData.timer->async_wait([this, self{shared_from_this()},
requestData](const ASIO_ERROR& ec) {
- handleLookupTimeout(ec, requestData);
+
+ auto request = insertRequest(
+ pendingLookupRequests_, requestId, [weakSelf{weak_from_this()},
requestId, requestType]() {
+ if (auto self = weakSelf.lock()) {
+ LOG_WARN(self->cnxString()
+ << requestType << " request timeout to broker,
req_id: " << requestId);
+ self->numOfPendingLookupRequest_--;
+ }
+ });
+ request->getFuture().addListener([promise](Result result, const
LookupDataResultPtr& lookupDataResult) {
+ if (result == ResultOk) {
+ promise->setValue(lookupDataResult);
+ } else {
+ promise->setFailed(result);
+ }
});
- pendingLookupRequests_.insert(std::make_pair(requestId, requestData));
numOfPendingLookupRequest_++;
lock.unlock();
LOG_DEBUG(cnxString() << "Inserted lookup request " << requestType << "
(req_id: " << requestId << ")");
+ if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ !=
nullptr &&
+ mockServer_->sendRequest(requestType, requestId)) {
+ return;
+ }
sendCommand(cmd);
}
@@ -1159,21 +1170,19 @@ Future<Result, ResponseData>
ClientConnection::sendRequestWithId(const SharedBuf
if (isClosed()) {
lock.unlock();
- Promise<Result, ResponseData> promise;
LOG_DEBUG(cnxString() << "Fail " << requestType << "(req_id: " <<
requestId
<< ") to a closed connection");
+ Promise<Result, ResponseData> promise;
promise.setFailed(ResultNotConnected);
return promise.getFuture();
}
- PendingRequestData requestData;
- requestData.timer = executor_->createDeadlineTimer();
- requestData.timer->expires_after(operationsTimeout_);
- requestData.timer->async_wait([this, self{shared_from_this()},
requestData](const ASIO_ERROR& ec) {
- handleRequestTimeout(ec, requestData);
- });
-
- pendingRequests_.insert(std::make_pair(requestId, requestData));
+ auto request = insertRequest(
+ pendingRequests_, requestId,
+ [cnxString{cnxString()}, physicalAddress{physicalAddress_}, requestId,
requestType]() {
+ LOG_WARN(cnxString << "Network request timeout to broker, remote:
" << physicalAddress
+ << ", req_id: " << requestId << ", request: "
<< requestType);
+ });
lock.unlock();
LOG_DEBUG(cnxString() << "Inserted request " << requestType << " (req_id:
" << requestId << ")");
@@ -1187,31 +1196,7 @@ Future<Result, ResponseData>
ClientConnection::sendRequestWithId(const SharedBuf
} else {
sendCommand(cmd);
}
- return requestData.promise.getFuture();
-}
-
-void ClientConnection::handleRequestTimeout(const ASIO_ERROR& ec,
- const PendingRequestData&
pendingRequestData) {
- if (!ec && !pendingRequestData.hasGotResponse->load()) {
- LOG_WARN(cnxString() << "Network request timeout to broker, remote: "
<< physicalAddress_);
- pendingRequestData.promise.setFailed(ResultTimeout);
- }
-}
-
-void ClientConnection::handleLookupTimeout(const ASIO_ERROR& ec,
- const LookupRequestData&
pendingRequestData) {
- if (!ec) {
- LOG_WARN(cnxString() << "Lookup request timeout to broker, remote: "
<< physicalAddress_);
- pendingRequestData.promise->setFailed(ResultTimeout);
- }
-}
-
-void ClientConnection::handleGetLastMessageIdTimeout(const ASIO_ERROR& ec,
- const
ClientConnection::LastMessageIdRequestData& data) {
- if (!ec) {
- LOG_WARN(cnxString() << "GetLastMessageId request timeout to broker,
remote: " << physicalAddress_);
- data.promise->setFailed(ResultTimeout);
- }
+ return request->getFuture();
}
void ClientConnection::handleKeepAliveTimeout(const ASIO_ERROR& ec) {
@@ -1294,7 +1279,7 @@ const std::future<void>& ClientConnection::close(Result
result, bool switchClust
cancelTimer(*connectTimer_);
lock.unlock();
int refCount = weak_from_this().use_count();
- if (!isResultRetryable(result)) {
+ if (result != ResultAlreadyClosed /* closed by the pool */ &&
!isResultRetryable(result)) {
LOG_ERROR(cnxString() << "Connection closed with " << result << "
(refCnt: " << refCount << ")");
} else {
LOG_INFO(cnxString() << "Connection disconnected (refCnt: " <<
refCount << ")");
@@ -1344,25 +1329,25 @@ const std::future<void>& ClientConnection::close(Result
result, bool switchClust
connectPromise_.setFailed(result);
- // Fail all pending requests, all these type are map whose value type
contains the Promise object
+ // Fail all pending requests after releasing the lock.
for (auto& kv : pendingRequests) {
- kv.second.fail(result);
+ kv.second->fail(result);
}
for (auto& kv : pendingLookupRequests) {
- kv.second.fail(result);
+ kv.second->fail(result);
}
for (auto& kv : pendingConsumerStatsMap) {
LOG_ERROR(cnxString() << " Closing Client Connection, please try again
later");
kv.second.setFailed(result);
}
for (auto& kv : pendingGetLastMessageIdRequests) {
- kv.second.fail(result);
+ kv.second->fail(result);
}
for (auto& kv : pendingGetNamespaceTopicsRequests) {
- kv.second.setFailed(result);
+ kv.second->fail(result);
}
for (auto& kv : pendingGetSchemaRequests) {
- kv.second.fail(result);
+ kv.second->fail(result);
}
return *closeFuture_;
}
@@ -1406,77 +1391,76 @@ Commands::ChecksumType
ClientConnection::getChecksumType() const {
Future<Result, GetLastMessageIdResponse>
ClientConnection::newGetLastMessageId(uint64_t consumerId,
uint64_t requestId) {
Lock lock(mutex_);
- auto promise =
std::make_shared<GetLastMessageIdResponsePromisePtr::element_type>();
if (isClosed()) {
lock.unlock();
LOG_ERROR(cnxString() << " Client is not connected to the broker");
+ auto promise =
std::make_shared<GetLastMessageIdResponsePromisePtr::element_type>();
promise->setFailed(ResultNotConnected);
return promise->getFuture();
}
- LastMessageIdRequestData requestData;
- requestData.promise = promise;
- requestData.timer = executor_->createDeadlineTimer();
- requestData.timer->expires_after(operationsTimeout_);
- requestData.timer->async_wait([this, self{shared_from_this()},
requestData](const ASIO_ERROR& ec) {
- handleGetLastMessageIdTimeout(ec, requestData);
- });
- pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId,
requestData));
+ auto request =
+ insertRequest(pendingGetLastMessageIdRequests_, requestId, [cnxString
= cnxString(), requestId]() {
+ LOG_WARN(cnxString << "GetLastMessageId request timeout to broker,
req_id: " << requestId);
+ });
lock.unlock();
+
+ if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ !=
nullptr &&
+ mockServer_->sendRequest("GET_LAST_MESSAGE_ID", requestId)) {
+ return request->getFuture();
+ }
sendCommand(Commands::newGetLastMessageId(consumerId, requestId));
- return promise->getFuture();
+ return request->getFuture();
}
Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(
const std::string& nsName, CommandGetTopicsOfNamespace_Mode mode, uint64_t
requestId) {
Lock lock(mutex_);
- Promise<Result, NamespaceTopicsPtr> promise;
if (isClosed()) {
lock.unlock();
LOG_ERROR(cnxString() << "Client is not connected to the broker");
+ Promise<Result, NamespaceTopicsPtr> promise;
promise.setFailed(ResultNotConnected);
return promise.getFuture();
}
- pendingGetNamespaceTopicsRequests_.insert(std::make_pair(requestId,
promise));
+ auto request =
+ insertRequest(pendingGetNamespaceTopicsRequests_, requestId,
[cnxString = cnxString(), requestId]() {
+ LOG_WARN(cnxString << "GetTopicsOfNamespace request timeout to
broker, req_id: " << requestId);
+ });
lock.unlock();
+ if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ !=
nullptr &&
+ mockServer_->sendRequest("GET_TOPICS_OF_NAMESPACE", requestId)) {
+ return request->getFuture();
+ }
sendCommand(Commands::newGetTopicsOfNamespace(nsName, mode, requestId));
- return promise.getFuture();
+ return request->getFuture();
}
Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string&
topicName,
const std::string&
version, uint64_t requestId) {
Lock lock(mutex_);
- Promise<Result, SchemaInfo> promise;
if (isClosed()) {
lock.unlock();
LOG_ERROR(cnxString() << "Client is not connected to the broker");
+ Promise<Result, SchemaInfo> promise;
promise.setFailed(ResultNotConnected);
return promise.getFuture();
}
- auto timer = executor_->createDeadlineTimer();
- pendingGetSchemaRequests_.emplace(requestId, GetSchemaRequest{promise,
timer});
+ auto request =
+ insertRequest(pendingGetSchemaRequests_, requestId, [cnxString =
cnxString(), requestId]() {
+ LOG_WARN(cnxString << "GetSchema request timeout to broker,
req_id: " << requestId);
+ });
lock.unlock();
- timer->expires_after(operationsTimeout_);
- timer->async_wait([this, self{shared_from_this()}, requestId](const
ASIO_ERROR& ec) {
- if (ec) {
- return;
- }
- Lock lock(mutex_);
- auto it = pendingGetSchemaRequests_.find(requestId);
- if (it != pendingGetSchemaRequests_.end()) {
- auto promise = std::move(it->second.promise);
- pendingGetSchemaRequests_.erase(it);
- lock.unlock();
- promise.setFailed(ResultTimeout);
- }
- });
-
+ if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ !=
nullptr &&
+ mockServer_->sendRequest("GET_SCHEMA", requestId)) {
+ return request->getFuture();
+ }
sendCommand(Commands::newGetSchema(topicName, version, requestId));
- return promise.getFuture();
+ return request->getFuture();
}
void ClientConnection::checkServerError(ServerError error, const std::string&
message) {
@@ -1541,12 +1525,11 @@ void ClientConnection::handleSuccess(const
proto::CommandSuccess& success) {
Lock lock(mutex_);
auto it = pendingRequests_.find(success.request_id());
if (it != pendingRequests_.end()) {
- PendingRequestData requestData = it->second;
+ auto request = std::move(it->second);
pendingRequests_.erase(it);
lock.unlock();
- requestData.promise.setValue({});
- cancelTimer(*requestData.timer);
+ request->complete({});
}
}
@@ -1558,9 +1541,7 @@ void ClientConnection::handlePartitionedMetadataResponse(
Lock lock(mutex_);
auto it =
pendingLookupRequests_.find(partitionMetadataResponse.request_id());
if (it != pendingLookupRequests_.end()) {
- cancelTimer(*it->second.timer);
-
- LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
+ auto request = std::move(it->second);
pendingLookupRequests_.erase(it);
numOfPendingLookupRequest_--;
lock.unlock();
@@ -1574,17 +1555,17 @@ void
ClientConnection::handlePartitionedMetadataResponse(
<< " error: " <<
partitionMetadataResponse.error()
<< " msg: " <<
partitionMetadataResponse.message());
checkServerError(partitionMetadataResponse.error(),
partitionMetadataResponse.message());
- lookupDataPromise->setFailed(
+ request->fail(
getResult(partitionMetadataResponse.error(),
partitionMetadataResponse.message()));
} else {
LOG_ERROR(cnxString() << "Failed partition-metadata lookup
req_id: "
<<
partitionMetadataResponse.request_id() << " with empty response: ");
- lookupDataPromise->setFailed(ResultConnectError);
+ request->fail(ResultConnectError);
}
} else {
LookupDataResultPtr lookupResultPtr =
std::make_shared<LookupDataResult>();
lookupResultPtr->setPartitions(partitionMetadataResponse.partitions());
- lookupDataPromise->setValue(lookupResultPtr);
+ request->complete(lookupResultPtr);
}
} else {
@@ -1600,7 +1581,7 @@ void ClientConnection::handleConsumerStatsResponse(
Lock lock(mutex_);
auto it =
pendingConsumerStatsMap_.find(consumerStatsResponse.request_id());
if (it != pendingConsumerStatsMap_.end()) {
- Promise<Result, BrokerConsumerStatsImpl> consumerStatsPromise =
it->second;
+ auto request = std::move(it->second);
pendingConsumerStatsMap_.erase(it);
lock.unlock();
@@ -1609,7 +1590,7 @@ void ClientConnection::handleConsumerStatsResponse(
LOG_ERROR(cnxString()
<< " Failed to get consumer stats - " <<
consumerStatsResponse.error_message());
}
- consumerStatsPromise.setFailed(
+ request.setFailed(
getResult(consumerStatsResponse.error_code(),
consumerStatsResponse.error_message()));
} else {
LOG_DEBUG(cnxString() << "ConsumerStatsResponse command - Received
consumer stats "
@@ -1622,7 +1603,7 @@ void ClientConnection::handleConsumerStatsResponse(
consumerStatsResponse.blockedconsumeronunackedmsgs(),
consumerStatsResponse.address(),
consumerStatsResponse.connectedsince(),
consumerStatsResponse.type(),
consumerStatsResponse.msgrateexpired(),
consumerStatsResponse.msgbacklog());
- consumerStatsPromise.setValue(brokerStats);
+ request.setValue(brokerStats);
}
} else {
LOG_WARN("ConsumerStatsResponse command - Received unknown request id
from server: "
@@ -1635,8 +1616,7 @@ void ClientConnection::handleLookupTopicRespose(
Lock lock(mutex_);
auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id());
if (it != pendingLookupRequests_.end()) {
- cancelTimer(*it->second.timer);
- LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
+ auto request = std::move(it->second);
pendingLookupRequests_.erase(it);
numOfPendingLookupRequest_--;
lock.unlock();
@@ -1648,12 +1628,11 @@ void ClientConnection::handleLookupTopicRespose(
<< " error: " <<
lookupTopicResponse.error()
<< " msg: " <<
lookupTopicResponse.message());
checkServerError(lookupTopicResponse.error(),
lookupTopicResponse.message());
- lookupDataPromise->setFailed(
- getResult(lookupTopicResponse.error(),
lookupTopicResponse.message()));
+ request->fail(getResult(lookupTopicResponse.error(),
lookupTopicResponse.message()));
} else {
LOG_ERROR(cnxString() << "Failed lookup req_id: " <<
lookupTopicResponse.request_id()
<< " with empty response: ");
- lookupDataPromise->setFailed(ResultConnectError);
+ request->fail(ResultConnectError);
}
} else {
LOG_DEBUG(cnxString() << "Received lookup response from server.
req_id: "
@@ -1676,7 +1655,7 @@ void ClientConnection::handleLookupTopicRespose(
lookupResultPtr->setRedirect(lookupTopicResponse.response() ==
proto::CommandLookupTopicResponse::Redirect);
lookupResultPtr->setShouldProxyThroughServiceUrl(lookupTopicResponse.proxy_through_service_url());
- lookupDataPromise->setValue(lookupResultPtr);
+ request->complete(lookupResultPtr);
}
} else {
@@ -1692,12 +1671,12 @@ void ClientConnection::handleProducerSuccess(const
proto::CommandProducerSuccess
Lock lock(mutex_);
auto it = pendingRequests_.find(producerSuccess.request_id());
if (it != pendingRequests_.end()) {
- PendingRequestData requestData = it->second;
+ auto request = it->second;
if (!producerSuccess.producer_ready()) {
LOG_INFO(cnxString() << " Producer " <<
producerSuccess.producer_name()
<< " has been queued up at broker. req_id: "
<< producerSuccess.request_id());
- requestData.hasGotResponse->store(true);
+ request->disableTimeout();
lock.unlock();
} else {
pendingRequests_.erase(it);
@@ -1713,8 +1692,7 @@ void ClientConnection::handleProducerSuccess(const
proto::CommandProducerSuccess
} else {
data.topicEpoch = std::nullopt;
}
- requestData.promise.setValue(data);
- cancelTimer(*requestData.timer);
+ request->complete(data);
}
}
}
@@ -1729,30 +1707,27 @@ void ClientConnection::handleError(const
proto::CommandError& error) {
auto it = pendingRequests_.find(error.request_id());
if (it != pendingRequests_.end()) {
- PendingRequestData requestData = it->second;
+ auto request = std::move(it->second);
pendingRequests_.erase(it);
lock.unlock();
- requestData.promise.setFailed(result);
- cancelTimer(*requestData.timer);
+ request->fail(result);
} else {
- PendingGetLastMessageIdRequestsMap::iterator it =
- pendingGetLastMessageIdRequests_.find(error.request_id());
+ auto it = pendingGetLastMessageIdRequests_.find(error.request_id());
if (it != pendingGetLastMessageIdRequests_.end()) {
- auto getLastMessageIdPromise = it->second.promise;
+ auto request = std::move(it->second);
pendingGetLastMessageIdRequests_.erase(it);
lock.unlock();
- getLastMessageIdPromise->setFailed(result);
+ request->fail(result);
} else {
- PendingGetNamespaceTopicsMap::iterator it =
- pendingGetNamespaceTopicsRequests_.find(error.request_id());
+ auto it =
pendingGetNamespaceTopicsRequests_.find(error.request_id());
if (it != pendingGetNamespaceTopicsRequests_.end()) {
- Promise<Result, NamespaceTopicsPtr> getNamespaceTopicsPromise
= it->second;
+ auto request = std::move(it->second);
pendingGetNamespaceTopicsRequests_.erase(it);
lock.unlock();
- getNamespaceTopicsPromise.setFailed(result);
+ request->fail(result);
} else {
lock.unlock();
}
@@ -1904,16 +1879,15 @@ void ClientConnection::handleGetLastMessageIdResponse(
auto it =
pendingGetLastMessageIdRequests_.find(getLastMessageIdResponse.request_id());
if (it != pendingGetLastMessageIdRequests_.end()) {
- auto getLastMessageIdPromise = it->second.promise;
+ auto request = std::move(it->second);
pendingGetLastMessageIdRequests_.erase(it);
lock.unlock();
if (getLastMessageIdResponse.has_consumer_mark_delete_position()) {
- getLastMessageIdPromise->setValue(
- {toMessageId(getLastMessageIdResponse.last_message_id()),
-
toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())});
+
request->complete({toMessageId(getLastMessageIdResponse.last_message_id()),
+
toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())});
} else {
-
getLastMessageIdPromise->setValue({toMessageId(getLastMessageIdResponse.last_message_id())});
+
request->complete({toMessageId(getLastMessageIdResponse.last_message_id())});
}
} else {
lock.unlock();
@@ -1931,7 +1905,7 @@ void ClientConnection::handleGetTopicOfNamespaceResponse(
auto it = pendingGetNamespaceTopicsRequests_.find(response.request_id());
if (it != pendingGetNamespaceTopicsRequests_.end()) {
- Promise<Result, NamespaceTopicsPtr> getTopicsPromise = it->second;
+ auto request = std::move(it->second);
pendingGetNamespaceTopicsRequests_.erase(it);
lock.unlock();
@@ -1953,7 +1927,7 @@ void ClientConnection::handleGetTopicOfNamespaceResponse(
NamespaceTopicsPtr topicsPtr =
std::make_shared<std::vector<std::string>>(topicSet.begin(),
topicSet.end());
- getTopicsPromise.setValue(topicsPtr);
+ request->complete(topicsPtr);
} else {
lock.unlock();
LOG_WARN(
@@ -1968,7 +1942,7 @@ void ClientConnection::handleGetSchemaResponse(const
proto::CommandGetSchemaResp
Lock lock(mutex_);
auto it = pendingGetSchemaRequests_.find(response.request_id());
if (it != pendingGetSchemaRequests_.end()) {
- Promise<Result, SchemaInfo> getSchemaPromise = it->second.promise;
+ auto request = std::move(it->second);
pendingGetSchemaRequests_.erase(it);
lock.unlock();
@@ -1981,7 +1955,7 @@ void ClientConnection::handleGetSchemaResponse(const
proto::CommandGetSchemaResp
: "")
<< " -- req_id: " <<
response.request_id());
}
- getSchemaPromise.setFailed(result);
+ request->fail(result);
return;
}
@@ -1992,7 +1966,7 @@ void ClientConnection::handleGetSchemaResponse(const
proto::CommandGetSchemaResp
properties[kv->key()] = kv->value();
}
SchemaInfo schemaInfo(static_cast<SchemaType>(schema.type()), "",
schema.schema_data(), properties);
- getSchemaPromise.setValue(schemaInfo);
+ request->complete(schemaInfo);
} else {
lock.unlock();
LOG_WARN(
@@ -2013,24 +1987,23 @@ void ClientConnection::handleAckResponse(const
proto::CommandAckResponse& respon
return;
}
- auto promise = it->second.promise;
+ auto request = std::move(it->second);
pendingRequests_.erase(it);
lock.unlock();
if (response.has_error()) {
- promise.setFailed(getResult(response.error(), ""));
+ request->fail(getResult(response.error(), ""));
} else {
- promise.setValue({});
+ request->complete({});
}
}
void ClientConnection::unsafeRemovePendingRequest(long requestId) {
auto it = pendingRequests_.find(requestId);
if (it != pendingRequests_.end()) {
- it->second.promise.setFailed(ResultDisconnected);
- cancelTimer(*it->second.timer);
-
+ auto request = std::move(it->second);
pendingRequests_.erase(it);
+ request->fail(ResultDisconnected);
}
}
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 75e4bca..c8cd86f 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -28,6 +28,7 @@
#include <cstdint>
#include <future>
#include <optional>
+
#ifdef USE_ASIO
#include <asio/bind_executor.hpp>
#include <asio/io_context.hpp>
@@ -52,8 +53,10 @@
#include "AsioTimer.h"
#include "Commands.h"
+#include "ExecutorService.h"
#include "GetLastMessageIdResponse.h"
#include "LookupDataResult.h"
+#include "PendingRequest.h"
#include "SharedBuffer.h"
#include "TimeUtils.h"
#include "UtilAllocator.h"
@@ -66,9 +69,6 @@ class PulsarFriend;
using TcpResolverPtr = std::shared_ptr<ASIO::ip::tcp::resolver>;
-class ExecutorService;
-using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
-
class ConnectionPool;
class ClientConnection;
typedef std::shared_ptr<ClientConnection> ClientConnectionPtr;
@@ -225,47 +225,6 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
void handleKeepAliveTimeout(const ASIO_ERROR& ec);
private:
- struct PendingRequestData {
- Promise<Result, ResponseData> promise;
- DeadlineTimerPtr timer;
- std::shared_ptr<std::atomic_bool>
hasGotResponse{std::make_shared<std::atomic_bool>(false)};
-
- void fail(Result result) {
- cancelTimer(*timer);
- promise.setFailed(result);
- }
- };
-
- struct LookupRequestData {
- LookupDataResultPromisePtr promise;
- DeadlineTimerPtr timer;
-
- void fail(Result result) {
- cancelTimer(*timer);
- promise->setFailed(result);
- }
- };
-
- struct LastMessageIdRequestData {
- GetLastMessageIdResponsePromisePtr promise;
- DeadlineTimerPtr timer;
-
- void fail(Result result) {
- cancelTimer(*timer);
- promise->setFailed(result);
- }
- };
-
- struct GetSchemaRequest {
- Promise<Result, SchemaInfo> promise;
- DeadlineTimerPtr timer;
-
- void fail(Result result) {
- cancelTimer(*timer);
- promise.setFailed(result);
- }
- };
-
/*
* handler for connectAsync
* creates a ConnectionPtr which has a valid ClientConnection object
@@ -303,12 +262,6 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
void newLookup(const SharedBuffer& cmd, uint64_t requestId, const char*
requestType,
const LookupDataResultPromisePtr& promise);
- void handleRequestTimeout(const ASIO_ERROR& ec, const PendingRequestData&
pendingRequestData);
-
- void handleLookupTimeout(const ASIO_ERROR&, const LookupRequestData&);
-
- void handleGetLastMessageIdTimeout(const ASIO_ERROR&, const
LastMessageIdRequestData& data);
-
template <typename Handler>
inline AllocHandler<Handler> customAllocReadHandler(Handler h) {
return AllocHandler<Handler>(readHandlerAllocator_, h);
@@ -385,33 +338,49 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
const std::chrono::milliseconds connectTimeout_;
const DeadlineTimerPtr connectTimer_;
- typedef std::map<long, PendingRequestData> PendingRequestsMap;
- PendingRequestsMap pendingRequests_;
+ template <typename T>
+ using RequestMap = std::unordered_map<uint64_t, PendingRequestPtr<T>>;
- typedef std::map<long, LookupRequestData> PendingLookupRequestsMap;
- PendingLookupRequestsMap pendingLookupRequests_;
+ RequestMap<ResponseData> pendingRequests_;
+ RequestMap<LookupDataResultPtr> pendingLookupRequests_;
+ RequestMap<GetLastMessageIdResponse> pendingGetLastMessageIdRequests_;
+ RequestMap<NamespaceTopicsPtr> pendingGetNamespaceTopicsRequests_;
+ RequestMap<SchemaInfo> pendingGetSchemaRequests_;
- typedef std::map<long, ProducerImplWeakPtr> ProducersMap;
+ typedef std::unordered_map<long, ProducerImplWeakPtr> ProducersMap;
ProducersMap producers_;
- typedef std::map<long, ConsumerImplWeakPtr> ConsumersMap;
+ typedef std::unordered_map<long, ConsumerImplWeakPtr> ConsumersMap;
ConsumersMap consumers_;
typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl>>
PendingConsumerStatsMap;
PendingConsumerStatsMap pendingConsumerStatsMap_;
- typedef std::map<long, LastMessageIdRequestData>
PendingGetLastMessageIdRequestsMap;
- PendingGetLastMessageIdRequestsMap pendingGetLastMessageIdRequests_;
-
- typedef std::map<long, Promise<Result, NamespaceTopicsPtr>>
PendingGetNamespaceTopicsMap;
- PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;
-
- typedef std::unordered_map<uint64_t, GetSchemaRequest> PendingGetSchemaMap;
- PendingGetSchemaMap pendingGetSchemaRequests_;
-
mutable std::mutex mutex_;
typedef std::unique_lock<std::mutex> Lock;
+ // Note: this method must be called when holding `mutex_`
+ template <typename T, typename OnTimeout>
+ auto insertRequest(RequestMap<T>& pendingRequests, uint64_t requestId,
OnTimeout onTimeout) {
+ auto request = std::make_shared<PendingRequest<T>>(
+ executor_->createTimer(operationsTimeout_),
+ [this, self{shared_from_this()}, requestId,
onTimeout{std::move(onTimeout)},
+ &pendingRequests]() mutable {
+ {
+ std::lock_guard lock{mutex_};
+ if (auto it = pendingRequests.find(requestId); it !=
pendingRequests.end()) {
+ pendingRequests.erase(it);
+ }
+ }
+ onTimeout();
+ });
+ auto [iterator, inserted] = pendingRequests.emplace(requestId,
request);
+ if (inserted) {
+ request->initialize();
+ } // else: the request id is duplicated
+ return iterator->second;
+ }
+
// Pending buffers to write on the socket
std::deque<std::any> pendingWriteBuffers_;
int pendingWriteOperations_ = 0;
@@ -435,7 +404,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
void startConsumerStatsTimer(std::vector<uint64_t> consumerStatsRequests);
uint32_t maxPendingLookupRequest_;
- uint32_t numOfPendingLookupRequest_ = 0;
+ std::atomic_uint32_t numOfPendingLookupRequest_{0};
bool isTlsAllowInsecureConnection_ = false;
diff --git a/lib/ExecutorService.h b/lib/ExecutorService.h
index 80659d4..4a36396 100644
--- a/lib/ExecutorService.h
+++ b/lib/ExecutorService.h
@@ -28,12 +28,14 @@
#include <asio/ip/tcp.hpp>
#include <asio/post.hpp>
#include <asio/ssl.hpp>
+#include <asio/steady_timer.hpp>
#else
#include <boost/asio/dispatch.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/ssl.hpp>
+#include <boost/asio/steady_timer.hpp>
#endif
#include <chrono>
#include <condition_variable>
@@ -68,6 +70,13 @@ class PULSAR_PUBLIC ExecutorService : public
std::enable_shared_from_this<Execut
// throws std::runtime_error if failed
DeadlineTimerPtr createDeadlineTimer();
+ template <typename Duration>
+ ASIO::steady_timer createTimer(const Duration &duration) {
+ auto timer = ASIO::steady_timer(io_context_);
+ timer.expires_after(duration);
+ return timer;
+ }
+
// Execute the task in the event loop thread asynchronously, i.e. the task
will be put in the event loop
// queue and executed later.
template <typename T>
diff --git a/lib/MockServer.h b/lib/MockServer.h
index 2d830fc..6f8d139 100644
--- a/lib/MockServer.h
+++ b/lib/MockServer.h
@@ -81,6 +81,26 @@ class MockServer : public
std::enable_shared_from_this<MockServer> {
proto::CommandConsumerStatsResponse response;
response.set_request_id(requestId);
connection->handleConsumerStatsResponse(response);
+ } else if (request == "LOOKUP") {
+ proto::CommandLookupTopicResponse response;
+ response.set_request_id(requestId);
+
response.set_response(proto::CommandLookupTopicResponse_LookupType_Connect);
+
response.set_brokerserviceurl("pulsar://localhost:6650");
+ connection->handleLookupTopicRespose(response);
+ } else if (request == "GET_LAST_MESSAGE_ID") {
+ proto::CommandGetLastMessageIdResponse response;
+ response.set_request_id(requestId);
+ response.mutable_last_message_id();
+
connection->handleGetLastMessageIdResponse(response);
+ } else if (request == "GET_TOPICS_OF_NAMESPACE") {
+ proto::CommandGetTopicsOfNamespaceResponse
response;
+ response.set_request_id(requestId);
+
connection->handleGetTopicOfNamespaceResponse(response);
+ } else if (request == "GET_SCHEMA") {
+ proto::CommandGetSchemaResponse response;
+ response.set_request_id(requestId);
+
response.mutable_schema()->set_type(proto::Schema_Type_String);
+ connection->handleGetSchemaResponse(response);
} else {
proto::CommandSuccess success;
success.set_request_id(requestId);
diff --git a/lib/PendingRequest.h b/lib/PendingRequest.h
new file mode 100644
index 0000000..465073f
--- /dev/null
+++ b/lib/PendingRequest.h
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/Result.h>
+
+#include <atomic>
+#include <functional>
+#include <memory>
+
+#include "AsioDefines.h"
+#include "AsioTimer.h"
+#include "Future.h"
+
+namespace pulsar {
+
+template <typename T>
+class PendingRequest : public std::enable_shared_from_this<PendingRequest<T>> {
+ public:
+ PendingRequest(ASIO::steady_timer timer, std::function<void()>
timeoutCallback)
+ : timer_(std::move(timer)),
timeoutCallback_(std::move(timeoutCallback)) {}
+
+ void initialize() {
+ timer_.async_wait([this, weakSelf{this->weak_from_this()}](const auto&
error) {
+ auto self = weakSelf.lock();
+ if (!self || error ||
timeoutDisabled_.load(std::memory_order_acquire)) {
+ return;
+ }
+ timeoutCallback_();
+ promise_.setFailed(ResultTimeout);
+ });
+ }
+
+ void complete(const T& value) {
+ promise_.setValue(value);
+ cancelTimer(timer_);
+ }
+
+ void fail(Result result) {
+ promise_.setFailed(result);
+ cancelTimer(timer_);
+ }
+
+ void disableTimeout() { timeoutDisabled_.store(true,
std::memory_order_release); }
+
+ auto getFuture() const { return promise_.getFuture(); }
+
+ ~PendingRequest() { cancelTimer(timer_); }
+
+ private:
+ ASIO::steady_timer timer_;
+ Promise<Result, T> promise_;
+ std::function<void()> timeoutCallback_;
+ std::atomic_bool timeoutDisabled_{false};
+};
+
+template <typename T>
+using PendingRequestPtr = std::shared_ptr<PendingRequest<T>>;
+
+} // namespace pulsar
diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc
index 6bd6cc8..63ac9d1 100644
--- a/tests/ClientTest.cc
+++ b/tests/ClientTest.cc
@@ -18,6 +18,7 @@
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>
+#include <pulsar/ServiceInfo.h>
#include <pulsar/Version.h>
#include <algorithm>
@@ -33,8 +34,13 @@
#include "PulsarFriend.h"
#include "WaitUtils.h"
#include "lib/AsioDefines.h"
+#include "lib/AtomicSharedPtr.h"
#include "lib/ClientConnection.h"
+#include "lib/ConnectionPool.h"
+#include "lib/ExecutorService.h"
#include "lib/LogUtils.h"
+#include "lib/MockServer.h"
+#include "lib/TimeUtils.h"
#include "lib/checksum/ChecksumProvider.h"
#include "lib/stats/ProducerStatsImpl.h"
@@ -231,6 +237,97 @@ TEST(ClientTest, testConnectTimeoutAfterTcpConnected) {
server->stop();
}
+TEST(ClientTest, testConnectionNotReferredAfterClose) {
+    // Issues producer/reader requests on a connection, then checks that Client::close() returns quickly.
+    // NOTE(review): presumably guards against pending-request timers keeping the connection referenced.
+    Client client(lookupUrl);
+    // A per-run unique topic: with a reused topic (e.g. --gtest_repeat within the same second) the
+    // message sent by a previous iteration would make the first hasMessageAvailable() check fail.
+    const auto suffix = std::to_string(std::chrono::steady_clock::now().time_since_epoch().count());
+    auto topic = "test-connection-not-referred-after-close-" + suffix;
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader));
+
+    bool available = false;
+    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(available));
+    ASSERT_FALSE(available);
+
+    ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("test").build()));
+    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(available));
+    ASSERT_TRUE(available);
+
+    Message msg;
+    ASSERT_EQ(ResultOk, reader.readNext(msg));
+    ASSERT_EQ("test", msg.getDataAsString());
+
+    constexpr int kMaxCloseTimeMs = 3000;
+    auto start = TimeUtils::currentTimeMillis();
+    ASSERT_EQ(ResultOk, client.close());
+    auto closeTimeMs = TimeUtils::currentTimeMillis() - start;
+    ASSERT_LT(closeTimeMs, kMaxCloseTimeMs) << "close time: " << closeTimeMs << " ms";
+}
+
+TEST(ClientTest, testTimedOutPendingRequestsAreErasedFromConnectionMaps) {
+    // The mock server delays every request type below beyond the operation timeout, so each future must
+    // fail with ResultTimeout and the connection must no longer track the corresponding request.
+    constexpr int kOperationTimeoutSeconds = 1;
+    // Must exceed kOperationTimeoutSeconds so that no response can arrive before the timeout fires
+    constexpr int kResponseDelayMs = 1200;
+    const auto suffix = std::to_string(std::chrono::steady_clock::now().time_since_epoch().count());
+    const auto topic = "persistent://public/default/testTimedOutPendingRequests-" + suffix;
+    ClientConfiguration conf;
+    conf.setOperationTimeoutSeconds(kOperationTimeoutSeconds);
+
+    auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
+    AtomicSharedPtr<ServiceInfo> serviceInfo;
+    serviceInfo.store(std::make_shared<const ServiceInfo>(lookupUrl));
+    ConnectionPool pool(serviceInfo, conf, executorProvider, "");
+    auto connection = std::make_shared<ClientConnection>(lookupUrl, lookupUrl, *serviceInfo.load(),
+                                                         executorProvider->get(), conf, "", pool, 0);
+    PulsarFriend::setServerProtocolVersion(*connection, 8);
+
+    uint64_t requestIdGenerator = 0;
+    auto mockServer = std::make_shared<MockServer>(connection);
+    connection->attachMockServer(mockServer);
+    mockServer->setRequestDelay({{"TEST_PENDING_REQUEST", kResponseDelayMs},
+                                 {"LOOKUP", kResponseDelayMs},
+                                 {"GET_LAST_MESSAGE_ID", kResponseDelayMs},
+                                 {"GET_TOPICS_OF_NAMESPACE", kResponseDelayMs},
+                                 {"GET_SCHEMA", kResponseDelayMs}});
+
+    auto pingFuture =
+        connection->sendRequestWithId(Commands::newPing(), requestIdGenerator++, "TEST_PENDING_REQUEST");
+
+    auto lookupPromise = std::make_shared<LookupDataResultPromise>();
+    auto lookupFuture = lookupPromise->getFuture();
+    connection->newTopicLookup(topic, false, "", requestIdGenerator++, lookupPromise);
+
+    auto lastMessageIdFuture = connection->newGetLastMessageId(0, requestIdGenerator++);
+
+    auto getTopicsOfNamespaceFuture = connection->newGetTopicsOfNamespace(
+        "public/default", CommandGetTopicsOfNamespace_Mode_PERSISTENT, requestIdGenerator++);
+
+    auto getSchemaFuture = connection->newGetSchema(topic, "", requestIdGenerator++);
+
+    ResponseData responseData;
+    ASSERT_EQ(ResultTimeout, pingFuture.get(responseData));
+    ASSERT_EQ(0u, PulsarFriend::getPendingRequests(*connection));
+
+    LookupDataResultPtr lookupData;
+    ASSERT_EQ(ResultTimeout, lookupFuture.get(lookupData));
+    ASSERT_EQ(0u, PulsarFriend::getPendingLookupRequests(*connection));
+    ASSERT_EQ(0u, PulsarFriend::getNumOfPendingLookupRequests(*connection));
+
+    GetLastMessageIdResponse lastMessageIdResponse;
+    ASSERT_EQ(ResultTimeout, lastMessageIdFuture.get(lastMessageIdResponse));
+    ASSERT_EQ(0u, PulsarFriend::getPendingGetLastMessageIdRequests(*connection));
+
+    NamespaceTopicsPtr topics;
+    ASSERT_EQ(ResultTimeout, getTopicsOfNamespaceFuture.get(topics));
+    ASSERT_EQ(0u, PulsarFriend::getPendingGetTopicsOfNamespaceRequests(*connection));
+
+    SchemaInfo schemaInfo;
+    ASSERT_EQ(ResultTimeout, getSchemaFuture.get(schemaInfo));
+    ASSERT_EQ(0u, PulsarFriend::getPendingGetSchemaRequests(*connection));
+
+    mockServer->close();
+    connection->close(ResultDisconnected).wait();
+    executorProvider->close();
+}
+
TEST(ClientTest, testGetNumberOfReferences) {
Client client("pulsar://localhost:6650");
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index 1f351d1..3296953 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -167,6 +167,41 @@ class PulsarFriend {
return cnx.pendingConsumerStatsMap_.size();
}
+    // Size of cnx.pendingRequests_, read while holding cnx.mutex_.
+    static size_t getPendingRequests(const ClientConnection& cnx) {
+        std::lock_guard<std::mutex> guard{cnx.mutex_};
+        const auto& requests = cnx.pendingRequests_;
+        return requests.size();
+    }
+
+    // Size of cnx.pendingLookupRequests_, read while holding cnx.mutex_.
+    static size_t getPendingLookupRequests(const ClientConnection& cnx) {
+        std::lock_guard<std::mutex> guard{cnx.mutex_};
+        const auto& lookups = cnx.pendingLookupRequests_;
+        return lookups.size();
+    }
+
+    // Current value of the cnx.numOfPendingLookupRequest_ counter, read while holding cnx.mutex_.
+    static size_t getNumOfPendingLookupRequests(const ClientConnection& cnx) {
+        std::lock_guard<std::mutex> guard{cnx.mutex_};
+        const size_t pending = cnx.numOfPendingLookupRequest_;
+        return pending;
+    }
+
+    // Size of cnx.pendingGetLastMessageIdRequests_, read while holding cnx.mutex_.
+    static size_t getPendingGetLastMessageIdRequests(const ClientConnection& cnx) {
+        std::lock_guard<std::mutex> guard{cnx.mutex_};
+        const auto& lastMessageIdRequests = cnx.pendingGetLastMessageIdRequests_;
+        return lastMessageIdRequests.size();
+    }
+
+    // Size of cnx.pendingGetNamespaceTopicsRequests_, read while holding cnx.mutex_.
+    static size_t getPendingGetTopicsOfNamespaceRequests(const ClientConnection& cnx) {
+        std::lock_guard<std::mutex> guard{cnx.mutex_};
+        const auto& namespaceTopicsRequests = cnx.pendingGetNamespaceTopicsRequests_;
+        return namespaceTopicsRequests.size();
+    }
+
+    // Size of cnx.pendingGetSchemaRequests_, read while holding cnx.mutex_.
+    static size_t getPendingGetSchemaRequests(const ClientConnection& cnx) {
+        std::lock_guard<std::mutex> guard{cnx.mutex_};
+        const auto& schemaRequests = cnx.pendingGetSchemaRequests_;
+        return schemaRequests.size();
+    }
+
+    // Overwrites cnx.serverProtocolVersion_ with `serverProtocolVersion` while holding cnx.mutex_.
+    static void setServerProtocolVersion(ClientConnection& cnx, int serverProtocolVersion) {
+        std::lock_guard<std::mutex> guard{cnx.mutex_};
+        auto& protocolVersion = cnx.serverProtocolVersion_;
+        protocolVersion = serverProtocolVersion;
+    }
+
// Forwards `enabled` to the consumer implementation's testing-only
// setNegativeAcknowledgeEnabledForTesting() hook.
static void setNegativeAckEnabled(Consumer consumer, bool enabled) {
consumer.impl_->setNegativeAcknowledgeEnabledForTesting(enabled);
}