yiguolei commented on code in PR #25798:
URL: https://github.com/apache/doris/pull/25798#discussion_r1371569065


##########
be/src/agent/topic_subscriber.cpp:
##########
@@ -29,37 +29,41 @@ namespace doris {
 TopicSubscriber::TopicSubscriber() {}
 
 TopicSubscriber::~TopicSubscriber() {
-    // Delete all listeners in the register
-    std::map<TTopicType::type, std::vector<TopicListener*>>::iterator it =
-            _registered_listeners.begin();
-    for (; it != _registered_listeners.end(); ++it) {
-        std::vector<TopicListener*>& listeners = it->second;
-        std::vector<TopicListener*>::iterator listener_it = listeners.begin();
-        for (; listener_it != listeners.end(); ++listener_it) {
-            delete *listener_it;
-        }
+    for (auto iter = _registered_listeners.begin(); iter != 
_registered_listeners.end(); iter++) {
+        delete iter->second;
     }
 }
 
-void TopicSubscriber::register_listener(TTopicType::type topic_type, 
TopicListener* listener) {
+void TopicSubscriber::register_listener(TTopicInfoType::type topic_type,
+                                        TopicListener* topic_listener) {
     // Unique lock here to prevent access to listeners
     std::lock_guard<std::shared_mutex> lock(_listener_mtx);
-    this->_registered_listeners[topic_type].push_back(listener);
+    this->_registered_listeners.emplace(topic_type, topic_listener);
 }
 
-void TopicSubscriber::handle_updates(const TAgentPublishRequest& 
agent_publish_request) {
+void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& 
topic_request) {
     // Shared lock here in order to avoid updates in listeners' map
     std::shared_lock lock(_listener_mtx);
-    // Currently, not deal with protocol version, the listener should deal 
with protocol version
-    const std::vector<TTopicUpdate>& topic_updates = 
agent_publish_request.updates;
-    std::vector<TTopicUpdate>::const_iterator topic_update_it = 
topic_updates.begin();
-    for (; topic_update_it != topic_updates.end(); ++topic_update_it) {
-        std::vector<TopicListener*>& listeners = 
this->_registered_listeners[topic_update_it->type];
-        std::vector<TopicListener*>::iterator listener_it = listeners.begin();
-        // Send the update to all listeners with protocol version.
-        for (; listener_it != listeners.end(); ++listener_it) {
-            
(*listener_it)->handle_update(agent_publish_request.protocol_version, 
*topic_update_it);
+    pthread_t tids[_registered_listeners.size()];
+    int i = 0;
+    for (auto& listener_pair : _registered_listeners) {
+        ThreadArgsPair* arg_pair = new ThreadArgsPair();
+        arg_pair->first = listener_pair.second;
+        arg_pair->second = new TPublishTopicRequest(topic_request);
+        int ret = pthread_create(

Review Comment:
   not use pthread directly. using doris::thread please



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to