This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 21e1d6d8a7c [fix](logstash) remove ShortNameResolver to solve thread race problem (#44598) 21e1d6d8a7c is described below commit 21e1d6d8a7ccb08848e7be61ae574c7ec4b27e69 Author: Mingxi <71588583+joker-sta...@users.noreply.github.com> AuthorDate: Mon Dec 2 20:27:10 2024 +0800 [fix](logstash) remove ShortNameResolver to solve thread race problem (#44598) remove ShortNameResolver to solve thread race problem --- extension/logstash/lib/logstash/outputs/doris.rb | 50 +------------------ .../lib/logstash/util/shortname_resolver.rb | 58 ---------------------- extension/logstash/logstash-output-doris.gemspec | 1 - 3 files changed, 2 insertions(+), 107 deletions(-) diff --git a/extension/logstash/lib/logstash/outputs/doris.rb b/extension/logstash/lib/logstash/outputs/doris.rb index 02e7591b0a3..21d3ee6e752 100644 --- a/extension/logstash/lib/logstash/outputs/doris.rb +++ b/extension/logstash/lib/logstash/outputs/doris.rb @@ -21,7 +21,6 @@ under the License. require "logstash/outputs/base" require "logstash/namespace" require "logstash/json" -require "logstash/util/shortname_resolver" require 'logstash/util/formater' require "uri" require "securerandom" @@ -67,8 +66,6 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base config :save_file, :validate => :string, :default => "failed.data" - config :host_resolve_ttl_sec, :validate => :number, :default => 120 - config :max_retries, :validate => :number, :default => -1 config :log_request, :validate => :boolean, :default => true @@ -92,10 +89,6 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base def register @http_query = "/api/#{@db}/#{@table}/_stream_load" - @hostnames_pool = - parse_http_hosts(@http_hosts, - ShortNameResolver.new(ttl: @host_resolve_ttl_sec, logger: @logger)) - @request_headers = make_request_headers @logger.info("request headers: ", @request_headers) @@ -141,39 +134,6 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base print_plugin_info() end # def register - private - - def parse_http_hosts(hosts, resolver) - ip_re = /^[\d]+\.[\d]+\.[\d]+\.[\d]+$/ - - lambda { - hosts.flat_map { |h| - scheme = URI(h).scheme - host = URI(h).host - port = URI(h).port - path = URI(h).path - - if ip_re !~ host - resolver.get_addresses(host).map { |ip| - "#{scheme}://#{ip}:#{port}#{path}" - } - else - [h] - end - } - } - end - - private - - def get_host_addresses() - begin - @hostnames_pool.call - rescue Exception => ex - @logger.error('Error while resolving host', :error => ex.to_s) - end - end - def multi_receive(events) return if events.empty? send_events(events) @@ -191,8 +151,6 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base # @logger.info("get event num: #{event_num}") @logger.debug("get documents: #{documents}") - hosts = get_host_addresses() - http_headers = @request_headers.dup if !@group_commit # only set label if group_commit is off_mode or not set, since lable can not be used with group_commit @@ -202,7 +160,7 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base req_count = 0 sleep_for = 1 while true - response = make_request(documents, http_headers, hosts, @http_query, hosts.sample) + response = make_request(documents, http_headers, @http_query, @http_hosts.sample) req_count += 1 response_json = {} @@ -246,11 +204,7 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base end private - def make_request(documents, http_headers, hosts, query, host = "") - if host == "" - host = hosts.pop - end - + def make_request(documents, http_headers, query, host) url = host + query if @log_request or @logger.debug? diff --git a/extension/logstash/lib/logstash/util/shortname_resolver.rb b/extension/logstash/lib/logstash/util/shortname_resolver.rb deleted file mode 100644 index 1437ccba007..00000000000 --- a/extension/logstash/lib/logstash/util/shortname_resolver.rb +++ /dev/null @@ -1,58 +0,0 @@ -=begin -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -=end -require 'resolv' -require 'mini_cache' - -class ShortNameResolver - def initialize(ttl:, logger:) - @ttl = ttl - @store = MiniCache::Store.new - @logger = logger - end - - private - def resolve_cached(shortname) - @store.get_or_set(shortname) do - addresses = resolve(shortname) - raise "Bad shortname '#{shortname}'" if addresses.empty? - MiniCache::Data.new(addresses, expires_in: @ttl) - end - end - - private - def resolve(shortname) - addresses = Resolv::DNS.open do |dns| - dns.getaddresses(shortname).map { |r| r.to_s } - end - - @logger.info("Resolved shortname '#{shortname}' to addresses #{addresses}") - - return addresses - end - - public - def get_address(shortname) - return resolve_cached(shortname).sample - end - - public - def get_addresses(shortname) - return resolve_cached(shortname) - end -end diff --git a/extension/logstash/logstash-output-doris.gemspec b/extension/logstash/logstash-output-doris.gemspec index 689b93503f6..30341b83156 100644 --- a/extension/logstash/logstash-output-doris.gemspec +++ b/extension/logstash/logstash-output-doris.gemspec @@ -38,7 +38,6 @@ Gem::Specification.new do |s| # Gem dependencies s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" - s.add_runtime_dependency 'mini_cache', ">= 1.0.0", "< 2.0.0" s.add_runtime_dependency "rest-client", '~> 2.1' s.add_development_dependency 'logstash-devutils', '~> 1.3' --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org