This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 144930e [ISSUE #494] [Part-A] Add register and unregister MQAdmin in
MQClientFactory (#495)
144930e is described below
commit 144930eafd764fa85a588b7ba00d6a83643b5717
Author: takagi <[email protected]>
AuthorDate: Tue Jun 24 11:01:30 2025 +0800
[ISSUE #494] [Part-A] Add register and unregister MQAdmin in
MQClientFactory (#495)
---
src/MQClientFactory.cpp | 45 ++++++++++++++++++++++++
src/MQClientFactory.h | 13 +++++++
src/client/DefaultMQAdmin.cpp | 80 +++++++++++++++++++++++++++++++++++++++++++
src/client/DefaultMQAdmin.h | 32 +++++++++++++++++
src/common/UtilAll.h | 1 +
src/include/MQAdmin.h | 28 +++++++++++++++
6 files changed, 199 insertions(+)
diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index 69d2543..c4673d8 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -393,6 +393,35 @@ void MQClientFactory::unregisterConsumer(MQConsumer*
pConsumer) {
eraseConsumerFromTable(groupName);
}
+/**
+ * Registers an admin client with this factory and configures name server
+ * access for it.
+ *
+ * The admin is stored in the admin table under its group name. When the admin
+ * carries no name server address, an address is fetched through
+ * m_pClientAPIImpl->fetchNameServerAddr() (after m_nameSrvDomain has been
+ * overwritten with the admin's name server domain, if that is non-empty) and
+ * written back to the admin. Otherwise the admin's own address is handed to
+ * m_pClientAPIImpl->updateNameServerAddr() and m_bFetchNSService is cleared.
+ *
+ * @param pAdmin admin client to register; a null pointer is rejected.
+ * @return true on success; false when pAdmin is null, when its group name is
+ *         empty, or when addAdminToTable() refuses the group name.
+ */
+bool MQClientFactory::registerMQAdmin(MQAdmin* pAdmin) {
+  // Reject a null admin up front instead of dereferencing it below.
+  if (!pAdmin) {
+    return false;
+  }
+  const string groupName = pAdmin->getGroupName();
+  const string namesrvaddr = pAdmin->getNamesrvAddr();
+  if (groupName.empty()) {
+    return false;
+  }
+  if (!addAdminToTable(groupName, pAdmin)) {
+    return false;
+  }
+  LOG_DEBUG("registerAdmin success:%s", groupName.c_str());
+
+  //<!set nameserver;
+  if (namesrvaddr.empty()) {
+    // No explicit address: resolve one, preferring the admin's own domain.
+    const string nameSrvDomain(pAdmin->getNamesrvDomain());
+    if (!nameSrvDomain.empty()) {
+      m_nameSrvDomain = nameSrvDomain;
+    }
+    pAdmin->setNamesrvAddr(m_pClientAPIImpl->fetchNameServerAddr(m_nameSrvDomain));
+  } else {
+    // An explicit address was supplied: use it and turn off m_bFetchNSService.
+    m_bFetchNSService = false;
+    m_pClientAPIImpl->updateNameServerAddr(namesrvaddr);
+    LOG_INFO("user specfied name server address: %s", namesrvaddr.c_str());
+  }
+  return true;
+}
+
+/**
+ * Removes an admin client from the admin table, keyed by its group name.
+ *
+ * @param pAdmin admin client to unregister; a null pointer is ignored.
+ */
+void MQClientFactory::unregisterMQAdmin(MQAdmin* pAdmin) {
+  // Nothing to remove for a null admin; avoid dereferencing it.
+  if (!pAdmin) {
+    return;
+  }
+  eraseAdminFromTable(pAdmin->getGroupName());
+}
+
MQProducer* MQClientFactory::selectProducer(const string& producerName) {
boost::lock_guard<boost::mutex> lock(m_producerTableMutex);
if (m_producerTable.find(producerName) != m_producerTable.end()) {
@@ -493,6 +522,22 @@ void MQClientFactory::eraseConsumerFromTable(const string&
consumerName) {
LOG_WARN("could not find consumer:%s from table", consumerName.c_str());
}
+/**
+ * Adds an admin to the admin table under the given name while holding
+ * m_adminTableMutex.
+ *
+ * @param adminName key under which the admin is stored.
+ * @param pMQAdmin  admin pointer to store.
+ * @return true if a new entry was inserted; false if the name was already
+ *         present, in which case the existing entry is left untouched.
+ */
+bool MQClientFactory::addAdminToTable(const string& adminName, MQAdmin* pMQAdmin) {
+  boost::lock_guard<boost::recursive_mutex> guard(m_adminTableMutex);
+  // map::insert keeps an existing entry and reports the outcome via .second.
+  return m_adminTable.insert(MQAMAP::value_type(adminName, pMQAdmin)).second;
+}
+
+/**
+ * Removes the admin stored under the given name while holding
+ * m_adminTableMutex; logs a warning when no such entry exists.
+ *
+ * @param adminName key of the entry to remove.
+ */
+void MQClientFactory::eraseAdminFromTable(const string& adminName) {
+  boost::lock_guard<boost::recursive_mutex> guard(m_adminTableMutex);
+  // erase-by-key returns the number of removed entries; zero means no match.
+  if (m_adminTable.erase(adminName) == 0) {
+    LOG_WARN("could not find admin:%s from table", adminName.c_str());
+  }
+}
+
int MQClientFactory::getConsumerTableSize() {
boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
return m_consumerTable.size();
diff --git a/src/MQClientFactory.h b/src/MQClientFactory.h
index e071db2..6e79b28 100644
--- a/src/MQClientFactory.h
+++ b/src/MQClientFactory.h
@@ -23,6 +23,7 @@
#include <boost/thread/recursive_mutex.hpp>
#include <boost/thread/thread.hpp>
#include "FindBrokerResult.h"
+#include "MQAdmin.h"
#include "MQClientAPIImpl.h"
#include "MQClientException.h"
#include "MQConsumer.h"
@@ -57,6 +58,8 @@ class MQClientFactory {
virtual void unregisterProducer(MQProducer* pProducer);
virtual bool registerConsumer(MQConsumer* pConsumer);
virtual void unregisterConsumer(MQConsumer* pConsumer);
+ virtual bool registerMQAdmin(MQAdmin* pAdmin);
+ virtual void unregisterMQAdmin(MQAdmin* pAdmin);
void createTopic(const string& key,
const string& newTopic,
@@ -157,6 +160,9 @@ class MQClientFactory {
int getProducerTableSize();
void insertProducerInfoToHeartBeatData(HeartbeatData* pHeartbeatData);
+ // admin related operation
+ void eraseAdminFromTable(const string& adminName);
+
// topicPublishInfo related operation
void addTopicInfoToTable(const string& topic,
boost::shared_ptr<TopicPublishInfo> pTopicPublishInfo);
void eraseTopicInfoFromTable(const string& topic);
@@ -173,6 +179,7 @@ class MQClientFactory {
bool addProducerToTable(const string& producerName, MQProducer* pMQProducer);
bool addConsumerToTable(const string& consumerName, MQConsumer* pMQConsumer);
+ bool addAdminToTable(const string& adminName, MQAdmin* pMQAdmin);
private:
string m_nameSrvDomain; // per clientId
@@ -190,6 +197,12 @@ class MQClientFactory {
boost::recursive_mutex m_consumerTableMutex;
MQCMAP m_consumerTable;
+ //<! group --> MQAdmin;
+ typedef map<string, MQAdmin*> MQAMAP;
+ // A recursive mutex is used to avoid a deadlock issue.
+ boost::recursive_mutex m_adminTableMutex;
+ MQAMAP m_adminTable;
+
//<! Topic---> TopicRouteData
typedef map<string, TopicRouteData*> TRDMAP;
boost::mutex m_topicRouteTableMutex;
diff --git a/src/client/DefaultMQAdmin.cpp b/src/client/DefaultMQAdmin.cpp
new file mode 100644
index 0000000..e58022c
--- /dev/null
+++ b/src/client/DefaultMQAdmin.cpp
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DefaultMQAdmin.h"
+
+namespace rocketmq {
+/**
+ * Creates an admin client and sets its group name.
+ *
+ * @param groupName desired group name; DEFAULT_ADMIN_GROUP is used when empty.
+ */
+DefaultMQAdmin::DefaultMQAdmin(const std::string& groupName) {
+  // Fall back to the default admin group when no name was supplied.
+  if (groupName.empty()) {
+    setGroupName(DEFAULT_ADMIN_GROUP);
+  } else {
+    setGroupName(groupName);
+  }
+}
+
+DefaultMQAdmin::~DefaultMQAdmin() {}  // Empty destructor body; nothing is released here.
+
+/**
+ * Starts the admin client.
+ *
+ * When WIN32 is not defined, SIGPIPE is first set to be ignored. The rest
+ * only acts when the service state is CREATE_JUST: the state is set to
+ * START_FAILED, DefaultMQClient::start() is called, the admin is registered
+ * with the factory, the factory is started, and the state becomes RUNNING.
+ * In every other state the call only performs the signal setup and the
+ * version log line.
+ *
+ * Raises MQClientException through THROW_MQEXCEPTION when the factory refuses
+ * the registration; the state is reset to CREATE_JUST first.
+ */
+void DefaultMQAdmin::start() {
+#ifndef WIN32
+  /* Ignore the SIGPIPE */
+  struct sigaction sa;
+  memset(&sa, 0, sizeof(struct sigaction));
+  sa.sa_handler = SIG_IGN;
+  sa.sa_flags = 0;
+  sigaction(SIGPIPE, &sa, 0);
+#endif
+  LOG_WARN("###Current Admin@%s", getClientVersionString().c_str());
+  switch (m_serviceState) {
+    case CREATE_JUST: {
+      // Marked as failed up front; replaced by RUNNING only after every step
+      // below has completed (or by CREATE_JUST if registration is refused).
+      m_serviceState = START_FAILED;
+      DefaultMQClient::start();
+      LOG_INFO("DefaultMQAdmin:%s start", m_GroupName.c_str());
+      const bool registerOK = getFactory()->registerMQAdmin(this);
+      if (!registerOK) {
+        m_serviceState = CREATE_JUST;
+        THROW_MQEXCEPTION(
+            MQClientException,
+            "The admin group[" + getGroupName() + "] has been created before, specify another name please.", -1);
+      }
+      getFactory()->start();
+      m_serviceState = RUNNING;
+      break;
+    }
+    case RUNNING:
+    case START_FAILED:
+    case SHUTDOWN_ALREADY:
+      break;
+    default:
+      break;
+  }
+}
+
+/**
+ * Shuts the admin client down.
+ *
+ * Acts only when the service state is RUNNING: the admin is unregistered from
+ * the factory, the factory's shutdown() is called, and the state becomes
+ * SHUTDOWN_ALREADY. In any other state the call does nothing.
+ */
+void DefaultMQAdmin::shutdown() {
+  // Every state other than RUNNING is a no-op.
+  if (m_serviceState != RUNNING) {
+    return;
+  }
+  LOG_INFO("DefaultMQAdmin:%s shutdown", m_GroupName.c_str());
+  getFactory()->unregisterMQAdmin(this);
+  getFactory()->shutdown();
+  m_serviceState = SHUTDOWN_ALREADY;
+}
+} // namespace rocketmq
\ No newline at end of file
diff --git a/src/client/DefaultMQAdmin.h b/src/client/DefaultMQAdmin.h
new file mode 100644
index 0000000..baa7e0f
--- /dev/null
+++ b/src/client/DefaultMQAdmin.h
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef DEFAULTMQADMIN_H
+#define DEFAULTMQADMIN_H
+#include "MQAdmin.h"
+#include "MQClientFactory.h"
+
+namespace rocketmq {
+/**
+ * Concrete MQAdmin whose start() and shutdown() register the admin with, and
+ * unregister it from, the client factory.
+ */
+class DefaultMQAdmin : public MQAdmin {
+ public:
+  /// @param groupName group name for this admin; DEFAULT_ADMIN_GROUP is used
+  ///                  when the given name is empty.
+  DefaultMQAdmin(const std::string& groupName);
+  ~DefaultMQAdmin();
+
+  /// Starts the admin; has an effect only from the CREATE_JUST state.
+  void start();
+  /// Shuts the admin down; has an effect only from the RUNNING state.
+  void shutdown();
+};
+}  // namespace rocketmq
+#endif // DEFAULTMQADMIN_H
\ No newline at end of file
diff --git a/src/common/UtilAll.h b/src/common/UtilAll.h
index 58e62ab..b2bc1a3 100644
--- a/src/common/UtilAll.h
+++ b/src/common/UtilAll.h
@@ -54,6 +54,7 @@ const string DEFAULT_TOPIC = "TBW102";
const string BENCHMARK_TOPIC = "BenchmarkTest";
const string DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";
const string DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER";
+const string DEFAULT_ADMIN_GROUP = "DEFAULT_ADMIN";  // Group name DefaultMQAdmin uses when constructed with an empty name.
const string TOOLS_CONSUMER_GROUP = "TOOLS_CONSUMER";
const string CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER";
const string SELF_TEST_TOPIC = "SELF_TEST_TOPIC";
diff --git a/src/include/MQAdmin.h b/src/include/MQAdmin.h
new file mode 100644
index 0000000..545dacb
--- /dev/null
+++ b/src/include/MQAdmin.h
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MQADMIN_H
+#define MQADMIN_H
+#include "DefaultMQClient.h"
+
+namespace rocketmq {
+/**
+ * Base type for admin clients. It adds nothing to DefaultMQClient beyond a
+ * virtual destructor; MQClientFactory registers and unregisters admins
+ * through MQAdmin pointers.
+ */
+class MQAdmin : public DefaultMQClient {
+ public:
+  virtual ~MQAdmin() {}
+};
+}  // namespace rocketmq
+#endif  // MQADMIN_H
\ No newline at end of file