yiguolei commented on code in PR #25798: URL: https://github.com/apache/doris/pull/25798#discussion_r1371569065
########## be/src/agent/topic_subscriber.cpp: ########## @@ -29,37 +29,41 @@ namespace doris { TopicSubscriber::TopicSubscriber() {} TopicSubscriber::~TopicSubscriber() { - // Delete all listeners in the register - std::map<TTopicType::type, std::vector<TopicListener*>>::iterator it = - _registered_listeners.begin(); - for (; it != _registered_listeners.end(); ++it) { - std::vector<TopicListener*>& listeners = it->second; - std::vector<TopicListener*>::iterator listener_it = listeners.begin(); - for (; listener_it != listeners.end(); ++listener_it) { - delete *listener_it; - } + for (auto iter = _registered_listeners.begin(); iter != _registered_listeners.end(); iter++) { + delete iter->second; } } -void TopicSubscriber::register_listener(TTopicType::type topic_type, TopicListener* listener) { +void TopicSubscriber::register_listener(TTopicInfoType::type topic_type, + TopicListener* topic_listener) { // Unique lock here to prevent access to listeners std::lock_guard<std::shared_mutex> lock(_listener_mtx); - this->_registered_listeners[topic_type].push_back(listener); + this->_registered_listeners.emplace(topic_type, topic_listener); } -void TopicSubscriber::handle_updates(const TAgentPublishRequest& agent_publish_request) { +void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& topic_request) { // Shared lock here in order to avoid updates in listeners' map std::shared_lock lock(_listener_mtx); - // Currently, not deal with protocol version, the listener should deal with protocol version - const std::vector<TTopicUpdate>& topic_updates = agent_publish_request.updates; - std::vector<TTopicUpdate>::const_iterator topic_update_it = topic_updates.begin(); - for (; topic_update_it != topic_updates.end(); ++topic_update_it) { - std::vector<TopicListener*>& listeners = this->_registered_listeners[topic_update_it->type]; - std::vector<TopicListener*>::iterator listener_it = listeners.begin(); - // Send the update to all listeners with protocol version. 
- for (; listener_it != listeners.end(); ++listener_it) { - (*listener_it)->handle_update(agent_publish_request.protocol_version, *topic_update_it); + pthread_t tids[_registered_listeners.size()]; + int i = 0; + for (auto& listener_pair : _registered_listeners) { + ThreadArgsPair* arg_pair = new ThreadArgsPair(); + arg_pair->first = listener_pair.second; + arg_pair->second = new TPublishTopicRequest(topic_request); + int ret = pthread_create( Review Comment: Please do not use pthread directly; use doris::thread instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org