This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
commit fbbb7c5b85543cbc942b805274f69436bae1b8c6
Author: Kang <kxiao.ti...@gmail.com>
AuthorDate: Mon Apr 22 15:00:40 2024 +0800

    improve logstash doris output plugin (#33135)

    1. support multi thread concurrency for performance
    2. support retry count and infinite retry
    3. add a config to log doris stream load request header and response
    4. add a config to log speed for better observability
---
 extension/logstash/lib/logstash/outputs/doris.rb | 316 ++++++++++++-----------
 extension/logstash/logstash-output-doris.gemspec |   4 +-
 2 files changed, 167 insertions(+), 153 deletions(-)
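For orientation before the diff: all four improvements are driven from the Logstash pipeline configuration. A minimal output block exercising the new options might look like the sketch below; the FE addresses, credentials, table names, and mapped fields are illustrative placeholders, not part of this commit.

    output {
        doris {
            http_hosts => ["http://fe1:8030", "http://fe2:8030"]
            db => "example_db"
            table => "example_table"
            user => "root"
            password => "example_password"
            # -1 (the new default) retries a failed stream load forever;
            # a value >= 0 bounds the retries before the batch is dropped
            retry_count => -1
            # log every stream load request header and response
            log_request => true
            # report total and incremental load speed every 10 seconds
            log_speed_interval => 10
            # stream load headers are passed through as-is; with mapping below,
            # each event becomes one JSON object per line
            headers => {
                "format" => "json"
                "read_json_by_line" => "true"
            }
            # render selected event fields via sprintf references
            mapping => {
                "ts" => "%{@timestamp}"
                "host" => "%{host}"
                "message" => "%{message}"
            }
        }
    }

Because the plugin now declares concurrency :shared, several pipeline workers can call multi_receive at once, which is where the throughput gain comes from.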
diff --git a/extension/logstash/lib/logstash/outputs/doris.rb b/extension/logstash/lib/logstash/outputs/doris.rb
index b7334aefb5f..34124f446bb 100644
--- a/extension/logstash/lib/logstash/outputs/doris.rb
+++ b/extension/logstash/lib/logstash/outputs/doris.rb
@@ -23,21 +23,21 @@ require "logstash/namespace"
 require "logstash/json"
 require "logstash/util/shortname_resolver"
 require "uri"
-require "stud/buffer"
 require "logstash/plugin_mixins/http_client"
 require "securerandom"
 require "json"
 require "base64"
 require "restclient"
+require 'thread'
 
 class LogStash::Outputs::Doris < LogStash::Outputs::Base
-  include LogStash::PluginMixins::HttpClient
-  include Stud::Buffer
-
-  concurrency :single
+  # support multi thread concurrency for performance,
+  # so multi_receive() and the functions it calls are all stateless and thread safe
+  concurrency :shared
 
   config_name "doris"
 
+  # hosts array of Doris Frontends. eg ["http://fe1:8030", "http://fe2:8030"]
   config :http_hosts, :validate => :array, :required => true
 
   # the database which data is loaded to
@@ -45,61 +45,35 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
   # the table which data is loaded to
   config :table, :validate => :string, :required => true
 
   # label prefix of a stream load request
-  config :label_prefix, :validate => :string, :required => true
-  # user
+  config :label_prefix, :validate => :string, :default => "logstash"
+  # user name
   config :user, :validate => :string, :required => true
   # password
   config :password, :validate => :password, :required => true
 
-  # column separator
-  config :column_separator, :validate => :string, :default => ""
-  # column mappings. eg: "k1, k2, tmpk3, k3 = tmpk3 + 1"
-  config :columns, :validate => :string, :default => ""
-  # where predicate to filter data. eg: "k1 > 1 and k3 < 100"
-  config :where, :validate => :string, :default => ""
-  # max filter ratio
-  config :max_filter_ratio, :validate => :number, :default => -1
-  # partition which data is loaded to. eg: "p1, p2"
-  config :partition, :validate => :array, :default => {}
-  # timeout of a stream load, in second
-  config :timeout, :validate => :number, :default => -1
-  # switch off or on of strict mode
-  config :strict_mode, :validate => :string, :default => "false"
-  # timezone
-  config :timezone, :validate => :string, :default => ""
-  # memory limit of a stream load
-  config :exec_mem_limit, :validate => :number, :default => -1
-  # Specify the format of imported data, csv and json are supported.
-  config :format, :validate => ['csv', 'json', 'csv_with_names', 'csv_with_names_and_types', 'parquet', 'orc'], :default => "csv"
-  # jsonpaths example: jsonpaths => ["$.id", "$.type", "$.actor.id", "$.actor.login"]
-  config :jsonpaths, :validate => :array, :default => []
-  # Specify the root node of the json document
-  config :json_root, :validate => :string, :default => ""
-  # Boolean, true means the json will be parsed in the first row of the schema, turn on this option to improve the efficiency of json importing.
-  config :fuzzy_parse, :validate => :boolean, :default => false
-  # Parse json data converts numeric types to strings.
-  config :num_as_string, :validate => :boolean, :default => false
-  # true means support for reading one json object per line
-  config :read_json_by_line, :validate => :boolean, :default => false
-  #
+
+  # use message field only
+  config :message_only, :validate => :boolean, :default => false
+  # field mapping
+  config :mapping, :validate => :hash
   # Custom headers to use
   # format is `headers => ["X-My-Header", "%{host}"]`
   config :headers, :validate => :hash
 
-  config :batch_size, :validate => :number, :default => 100000
-
-  config :idle_flush_time, :validate => :number, :default => 20
-
-  config :save_on_failure, :validate => :boolean, :default => true
+  config :save_on_failure, :validate => :boolean, :default => false
 
-  config :save_dir, :validate => :string, :default => "/tmp"
+  config :save_dir, :validate => :string, :default => "./"
 
   config :save_file, :validate => :string, :default => "failed.data"
 
   config :host_resolve_ttl_sec, :validate => :number, :default => 120
 
-  config :automatic_retries, :validate => :number, :default => 3
+  config :retry_count, :validate => :number, :default => -1
+
+  config :log_request, :validate => :boolean, :default => false
+
+  config :log_speed_interval, :validate => :number, :default => 10
 
   def print_plugin_info()
@@ -112,23 +86,10 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
       :db => @db,
       :table => @table,
       :label_prefix => @label_prefix,
-      :batch_size => @batch_size,
-      :idle_flush_time => @idle_flush_time,
       :http_hosts => @http_hosts)
   end
 
   def register
-    # Handle this deprecated option. TODO: remove the option
-    #@ssl_certificate_validation = @verify_ssl if @verify_ssl
-
-    # We count outstanding requests with this queue
-    # This queue tracks the requests to create backpressure
-    # When this queue is empty no new requests may be sent,
-    # tokens must be added back by the client on success
-    #@request_tokens = SizedQueue.new(@pool_max)
-    #@pool_max.times {|t| @request_tokens << true }
-    #@requests = Array.new
-
     @http_query = "/api/#{db}/#{table}/_stream_load"
 
     @hostnames_pool =
@@ -138,11 +99,38 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
     @request_headers = make_request_headers
     @logger.info("request headers: ", @request_headers)
-    buffer_initialize(
-      :max_items => @batch_size,
-      :max_interval => @idle_flush_time,
-      :logger => @logger
-    )
+    @init_time = Time.now.to_i # seconds
+    @total_bytes = java.util.concurrent.atomic.AtomicLong.new(0)
+    @total_rows = java.util.concurrent.atomic.AtomicLong.new(0)
+
+    report_thread = Thread.new do
+      last_time = @init_time
+      last_bytes = @total_bytes.get
+      last_rows = @total_rows.get
+      @logger.info("will report speed every #{@log_speed_interval} seconds")
+      while @log_speed_interval > 0
+        sleep(@log_speed_interval)
+
+        cur_time = Time.now.to_i # seconds
+        cur_bytes = @total_bytes.get
+        cur_rows = @total_rows.get
+        total_time = cur_time - @init_time
+        total_speed_mbps = cur_bytes / 1024 / 1024 / total_time
+        total_speed_rps = cur_rows / total_time
+
+        inc_bytes = cur_bytes - last_bytes
+        inc_rows = cur_rows - last_rows
+        inc_time = cur_time - last_time
+        inc_speed_mbps = inc_bytes / 1024 / 1024 / inc_time
+        inc_speed_rps = inc_rows / inc_time
+
+        @logger.info("total #{cur_bytes/1024/1024} MB #{cur_rows} ROWS, total speed #{total_speed_mbps} MB/s #{total_speed_rps} R/s, last #{inc_time} seconds speed #{inc_speed_mbps} MB/s #{inc_speed_rps} R/s")
+
+        last_time = cur_time
+        last_bytes = cur_bytes
+        last_rows = cur_rows
+      end
+    end
 
     print_plugin_info()
   end # def register
@@ -180,58 +168,83 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
     end
   end
 
-  # This module currently does not support parallel requests as that would circumvent the batching
-  def receive(event)
-    buffer_receive(event)
+  def multi_receive(events)
+    return if events.empty?
+    send_events(events)
   end
 
-  public
-  def flush(events, close=false)
+  private
+  def send_events(events)
     documents = ""
     event_num = 0
     events.each do |event|
-      documents << event.get("[message]") << "\n"
+      documents << event_body(event) << "\n"
       event_num += 1
     end
-    @logger.info("get event num: #{event_num}")
+    # @logger.info("get event num: #{event_num}")
     @logger.debug("get documents: #{documents}")
 
     hosts = get_host_addresses()
-    @request_headers["label"] = label_prefix + "_" + @db + "_" + @table + "_" + Time.now.strftime('%Y%m%d%H%M%S_%L')
-    make_request(documents, hosts, @http_query, 1, hosts.sample)
-  end
-
-  private
+    http_headers = @request_headers.dup
+    http_headers["label"] = label_prefix + "_" + @db + "_" + @table + "_" + Time.now.strftime('%Y%m%d_%H%M%S_%L_' + SecureRandom.uuid)
+
+    # @request_headers["label"] = label_prefix + "_" + @db + "_" + @table + "_" + Time.now.strftime('%Y%m%d%H%M%S_%L')
+    req_count = 0
+    sleep_for = 1
+    while true
+      response = make_request(documents, http_headers, hosts, @http_query, hosts.sample)
+
+      req_count += 1
+      response_json = {}
+      begin
+        response_json = JSON.parse(response.body)
+      rescue => e
+        @logger.warn("doris stream load response: #{response} is not a valid JSON")
+      end
+      if response_json["Status"] == "Success"
+        @total_bytes.addAndGet(documents.size)
+        @total_rows.addAndGet(event_num)
+        break
+      else
+        @logger.warn("FAILED doris stream load response:\n#{response}")
+
+        if @retry_count >= 0 && req_count > @retry_count
+          @logger.warn("DROP this batch after failed #{req_count} times.")
+          if @save_on_failure
+            @logger.warn("Try save to disk. Disk file path: #{save_dir}/#{table}_#{save_file}")
+            save_to_disk(documents)
+          end
+          break
+        end
 
-  def save_to_disk(documents)
-    begin
-      file = File.open("#{save_dir}/#{db}_#{table}_#{save_file}", "a")
-      file.write(documents)
-    rescue IOError => e
-      log_failure("An error occurred while saving file to disk: #{e}",
-                  :file_name => file_name)
-    ensure
-      file.close unless file.nil?
+        # sleep and then retry
+        sleep_for = sleep_for * 2
+        sleep_for = sleep_for <= 60 ? sleep_for : 60
+        sleep_rand = (sleep_for / 2) + (rand(0..sleep_for) / 2)
+        @logger.warn("Will do retry #{req_count} after sleep #{sleep_rand} secs.")
+        sleep(sleep_rand)
+      end
     end
   end
 
-  private
-
-  def make_request(documents, hosts, query, req_count = 1, host = "", uuid = SecureRandom.hex)
-
+  def make_request(documents, http_headers, hosts, query, host = "")
     if host == ""
       host = hosts.pop
    end
-    url = host+query
-    @logger.debug("req count: #{req_count}. get url: #{url}")
-    @logger.debug("request headers: ", @request_headers)
-
+    url = host + query
 
-    result = RestClient.put(url, documents, @request_headers) { |response, request, result|
+    if @log_request or @logger.debug?
+      @logger.info("doris stream load request url: #{url} headers: #{http_headers} body size: #{documents.size}")
+    end
+    @logger.debug("doris stream load request body: #{documents}")
+
+    response = ""
+    begin
+      response = RestClient.put(url, documents, http_headers) { |response, request, result|
       case response.code
       when 301, 302, 307
         @logger.debug("redirect to: #{response.headers[:location]}")
@@ -239,77 +252,78 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
       else
         response.return!
       end
-    }
-
-    @logger.info("response : \n #{result}" )
-    result_body = JSON.parse(result.body)
-    if result_body['Status'] != "Success"
-      if req_count < @automatic_retries
#{req_count}") - make_request(documents,hosts,query,req_count + 1,host,uuid) - return - end - @logger.warn("Load failed ! Try #{req_count} times.") - if @save_on_failure - @logger.warn("Retry times over #{req_count} times.Try save to disk.Disk file path : #{save_dir}/#{table}_#{save_file}") - save_to_disk(documents) - end + } + rescue => e + log_failure("doris stream load request error: #{e}") end + if @log_request or @logger.debug? + @logger.info("doris stream load response:\n#{response}") + end + + return response end # def make_request + # Format the HTTP body + private + def event_body(event) + if @message_only + event.get("[message]") + else + LogStash::Json.dump(map_event(event)) + end + end + + private + def map_event(event) + if @mapping + # only get fields in mapping + convert_mapping(@mapping, event) + else + # get all fields + event.to_hash + end + end + + private + def convert_mapping(mapping, event) + if mapping.is_a?(Hash) + mapping.reduce({}) do |acc, kv| + k, v = kv + acc[k] = convert_mapping(v, event) + acc + end + elsif mapping.is_a?(Array) + mapping.map { |elem| convert_mapping(elem, event) } + else + event.sprintf(mapping) + end + end + + private + def save_to_disk(documents) + begin + file = File.open("#{save_dir}/#{db}_#{table}_#{save_file}", "a") + file.write(documents) + rescue IOError => e + log_failure("An error occurred while saving file to disk: #{e}", + :file_name => file_name) + ensure + file.close unless file.nil? + end + end + # This is split into a separate method mostly to help testing - def log_failure(message, opts) - @logger.warn("[HTTP Output Failure] #{message}", opts) + def log_failure(message) + @logger.warn("[Doris Output Failure] #{message}") end def make_request_headers() headers = @headers || {} headers["Expect"] ||= "100-continue" headers["Content-Type"] ||= "text/plain;charset=utf-8" - headers["strict_mode"] ||= @strict_mode headers["Authorization"] = "Basic " + Base64.strict_encode64("#{user}:#{password.value}") - # column_separator - if @column_separator != "" - headers["column_separator"] = @column_separator - end - # timezone - if @timezone != "" - headers["timezone"] = @timezone - end - # partition - if @partition.size > 0 - headers["partition"] ||= @partition - end - # where - if @where != "" - headers["where"] ||= @where - end - # timeout - if @timeout != -1 - headers["timeout"] ||= @timeout - end - # max_filter_ratio - if @max_filter_ratio != -1 - headers["max_filter_ratio"] ||= @max_filter_ratio - end - # exec_mem_limit - if @exec_mem_limit != -1 - headers["exec_mem_limit"] ||= @exec_mem_limit - end - # columns - if @columns != "" - headers["columns"] ||= @columns - end - headers["format"] = @format if @format != "" - headers["jsonpaths"] = @jsonpaths if @jsonpaths != [] - headers["json_root"] = @json_root if @json_root != "" - headers["fuzzy_parse"] = @fuzzy_parse if @fuzzy_parse != "" - headers["num_as_string"] = @num_as_string if @num_as_string != "" - headers["read_json_by_line"] = @read_json_by_line if @read_json_by_line != "" headers end end # end of class LogStash::Outputs::Doris - - diff --git a/extension/logstash/logstash-output-doris.gemspec b/extension/logstash/logstash-output-doris.gemspec index ee456a0e42d..163ba260f07 100644 --- a/extension/logstash/logstash-output-doris.gemspec +++ b/extension/logstash/logstash-output-doris.gemspec @@ -18,8 +18,8 @@ under the License. 
diff --git a/extension/logstash/logstash-output-doris.gemspec b/extension/logstash/logstash-output-doris.gemspec
index ee456a0e42d..163ba260f07 100644
--- a/extension/logstash/logstash-output-doris.gemspec
+++ b/extension/logstash/logstash-output-doris.gemspec
@@ -18,8 +18,8 @@ under the License.
 =end
 Gem::Specification.new do |s|
   s.name = 'logstash-output-doris'
-  s.version = '0.2.0'
-  s.author = 'wfjcmcb'
+  s.version = '1.0.0'
+  s.author = 'Apache Doris'
   s.email = 'd...@doris.apache.org'
   s.homepage = 'http://doris.apache.org'
   s.licenses = ['Apache-2.0']

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org