This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit fbbb7c5b85543cbc942b805274f69436bae1b8c6
Author: Kang <kxiao.ti...@gmail.com>
AuthorDate: Mon Apr 22 15:00:40 2024 +0800

    improve logstash doris output plugin (#33135)
    
    1. support multi-threaded concurrency for better performance
    2. support a configurable retry count as well as infinite retry
    3. add a config to log the doris stream load request headers and response
    4. add a config to log load speed for better observability
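    
    A minimal pipeline sketch exercising the new options (the hosts,
    credentials, db and table values below are placeholders):
    
        output {
            doris {
                http_hosts => ["http://fe1:8030"]
                user => "root"
                password => "password"
                db => "example_db"
                table => "example_tbl"
                # -1 (the default) retries a failed batch forever; N >= 0 drops it after N retries
                retry_count => -1
                # log every stream load request header and response at INFO level
                log_request => true
                # report load speed every 10 seconds; a value <= 0 disables the reporter
                log_speed_interval => 10
            }
        }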
---
 extension/logstash/lib/logstash/outputs/doris.rb | 316 ++++++++++++-----------
 extension/logstash/logstash-output-doris.gemspec |   4 +-
 2 files changed, 167 insertions(+), 153 deletions(-)

diff --git a/extension/logstash/lib/logstash/outputs/doris.rb b/extension/logstash/lib/logstash/outputs/doris.rb
index b7334aefb5f..34124f446bb 100644
--- a/extension/logstash/lib/logstash/outputs/doris.rb
+++ b/extension/logstash/lib/logstash/outputs/doris.rb
@@ -23,21 +23,21 @@ require "logstash/namespace"
 require "logstash/json"
 require "logstash/util/shortname_resolver"
 require "uri"
-require "stud/buffer"
 require "logstash/plugin_mixins/http_client"
 require "securerandom"
 require "json"
 require "base64"
 require "restclient"
+require 'thread'
 
 
 class LogStash::Outputs::Doris < LogStash::Outputs::Base
-   include LogStash::PluginMixins::HttpClient
-   include Stud::Buffer
-
-   concurrency :single
+   # support multi-threaded concurrency for performance,
+   # so multi_receive() and the functions it calls are all stateless and thread safe
+   concurrency :shared
 
    config_name "doris"
+
    # hosts array of Doris Frontends. eg ["http://fe1:8030", "http://fe2:8030"]
    config :http_hosts, :validate => :array, :required => true
    # the database which data is loaded to
@@ -45,61 +45,35 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
    # the table which data is loaded to
    config :table, :validate => :string, :required => true
    # label prefix of a stream load request.
-   config :label_prefix, :validate => :string, :required => true
-   # user
+   config :label_prefix, :validate => :string, :default => "logstash"
+   # user name
    config :user, :validate => :string, :required => true
    # password
    config :password, :validate => :password, :required => true
-   # column separator
-   config :column_separator, :validate => :string, :default => ""
-   # column mappings. eg: "k1, k2, tmpk3, k3 = tmpk3 + 1"
-   config :columns, :validate => :string, :default => ""
-   # where predicate to filter data. eg: "k1 > 1 and k3 < 100"
-   config :where, :validate => :string, :default => ""
-   # max filter ratio
-   config :max_filter_ratio, :validate => :number, :default => -1
-   # partition which data is loaded to. eg: "p1, p2"
-   config :partition, :validate => :array, :default => {}
-   # timeout of a stream load, in second
-   config :timeout, :validate => :number, :default => -1
-   # switch off or on of strict mode
-   config :strict_mode, :validate => :string, :default => "false"
-   # timezone
-   config :timezone, :validate => :string, :default => ""
-   # memory limit of a stream load
-   config :exec_mem_limit, :validate => :number, :default => -1
-   # Specify the format of imported data, csv and json are supported.
-   config :format, :validate => ['csv', 'json', 'csv_with_names', 'csv_with_names_and_types', 'parquet', 'orc'], :default => "csv"
-   # jsonpaths example: jsonpaths => ["$.id", "$.type", "$.actor.id", "$.actor.login"]
-   config :jsonpaths, :validate => :array, :default => []
-   # Specify the root node of the json document
-   config :json_root, :validate => :string, :default => ""
-   # Boolean, true means the json will be parsed in the first row of the schema, turn on this option to improve the efficiency of json importing.
-   config :fuzzy_parse, :validate => :boolean, :default => false
-   # Parse json data converts numeric types to strings.
-   config :num_as_string, :validate => :boolean, :default => false
-   # true means support for reading one json object per line
-   config :read_json_by_line, :validate => :boolean, :default => false
-   #  
+
+   # use message field only
+   config :message_only, :validate => :boolean, :default => false
+   # field mapping
+   config :mapping, :validate => :hash
 
 
    # Custom headers to use
    # format is `headers => ["X-My-Header", "%{host}"]`
    config :headers, :validate => :hash
 
-   config :batch_size, :validate => :number, :default => 100000
-
-   config :idle_flush_time, :validate => :number, :default => 20
-
-   config :save_on_failure, :validate => :boolean, :default => true
+   config :save_on_failure, :validate => :boolean, :default => false
 
-   config :save_dir, :validate => :string, :default => "/tmp"
+   config :save_dir, :validate => :string, :default => "./"
 
    config :save_file, :validate => :string, :default => "failed.data"
 
    config :host_resolve_ttl_sec, :validate => :number, :default => 120
 
-   config :automatic_retries, :validate => :number, :default => 3
+   config :retry_count, :validate => :number, :default => -1
+
+   config :log_request, :validate => :boolean, :default => false
+
+   config :log_speed_interval, :validate => :number, :default => 10
 
 
    def print_plugin_info()
@@ -112,23 +86,10 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
       :db => @db,
       :table => @table,
       :label_prefix => @label_prefix,
-      :batch_size => @batch_size,
-      :idle_flush_time => @idle_flush_time,
       :http_hosts => @http_hosts)
    end
 
    def register
-      # Handle this deprecated option. TODO: remove the option
-      #@ssl_certificate_validation = @verify_ssl if @verify_ssl
-
-      # We count outstanding requests with this queue
-      # This queue tracks the requests to create backpressure
-      # When this queue is empty no new requests may be sent,
-      # tokens must be added back by the client on success
-      #@request_tokens = SizedQueue.new(@pool_max)
-      #@pool_max.times {|t| @request_tokens << true }
-      #@requests = Array.new
-
       @http_query = "/api/#{db}/#{table}/_stream_load"
 
       @hostnames_pool =
@@ -138,11 +99,38 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
       @request_headers = make_request_headers
       @logger.info("request headers: ", @request_headers)
 
-      buffer_initialize(
-      :max_items => @batch_size,
-      :max_interval => @idle_flush_time,
-      :logger => @logger
-      )
+      @init_time = Time.now.to_i # seconds
+      @total_bytes = java.util.concurrent.atomic.AtomicLong.new(0)
+      @total_rows = java.util.concurrent.atomic.AtomicLong.new(0)
+
+      report_thread = Thread.new do
+         last_time = @init_time
+         last_bytes = @total_bytes.get
+         last_rows = @total_rows.get
+         @logger.info("will report speed every #{@log_speed_interval} seconds")
+         while @log_speed_interval > 0
+            sleep(@log_speed_interval)
+
+            cur_time = Time.now.to_i # seconds
+            cur_bytes = @total_bytes.get
+            cur_rows = @total_rows.get
+            total_time = cur_time - @init_time
+            total_speed_mbps = cur_bytes / 1024 / 1024 / total_time
+            total_speed_rps = cur_rows / total_time
+
+            inc_bytes = cur_bytes - last_bytes
+            inc_rows = cur_rows - last_rows
+            inc_time = cur_time - last_time
+            inc_speed_mbps = inc_bytes / 1024 / 1024 / inc_time
+            inc_speed_rps = inc_rows / inc_time
+
+            @logger.info("total #{cur_bytes/1024/1024} MB #{cur_rows} ROWS, 
total speed #{total_speed_mbps} MB/s #{total_speed_rps} R/s, last #{inc_time} 
seconds speed #{inc_speed_mbps} MB/s #{inc_speed_rps} R/s")
+
+            last_time = cur_time
+            last_bytes = cur_bytes
+            last_rows = cur_rows
+         end
+      end
 
       print_plugin_info()
    end # def register
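
The register hunk above starts a reporter thread that derives cumulative and
incremental throughput from two atomic counters. A standalone sketch of the
same arithmetic in plain Ruby (the plugin itself uses JRuby's
java.util.concurrent.atomic.AtomicLong; the counters and fake traffic below
are illustrative):

    require 'thread'

    counter = { bytes: 0, rows: 0 }
    lock    = Mutex.new
    start   = Time.now.to_i

    reporter = Thread.new do
       last_time, last_bytes, last_rows = start, 0, 0
       loop do
          sleep 10
          now = Time.now.to_i
          bytes, rows = lock.synchronize { [counter[:bytes], counter[:rows]] }
          # cumulative speed since start, incremental speed since the last report
          total_mbps = bytes / 1024.0 / 1024.0 / (now - start)
          inc_mbps   = (bytes - last_bytes) / 1024.0 / 1024.0 / (now - last_time)
          puts format('total %.2f MB/s, last %ds %.2f MB/s (%d rows)',
                      total_mbps, now - last_time, inc_mbps, rows - last_rows)
          last_time, last_bytes, last_rows = now, bytes, rows
       end
    end

    # drive it with fake traffic for three intervals, then stop
    3.times do
       sleep 10
       lock.synchronize { counter[:bytes] += 50 * 1024 * 1024; counter[:rows] += 100_000 }
    end
    reporter.kill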
@@ -180,58 +168,83 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
       end
    end
 
-   # This module currently does not support parallel requests as that would circumvent the batching
-   def receive(event)
-      buffer_receive(event)
+   def multi_receive(events)
+      return if events.empty?
+      send_events(events)
    end
 
-   public
-   def flush(events, close=false)
+   private
+   def send_events(events)
       documents = ""
       event_num = 0
       events.each do |event|
-         documents << event.get("[message]") << "\n"
+         documents << event_body(event) << "\n"
          event_num += 1
       end
 
-      @logger.info("get event num: #{event_num}")
+      # @logger.info("get event num: #{event_num}")
       @logger.debug("get documents: #{documents}")
 
       hosts = get_host_addresses()
 
-      @request_headers["label"] = label_prefix + "_" + @db + "_" + @table + 
"_" + Time.now.strftime('%Y%m%d%H%M%S_%L')
-      make_request(documents, hosts, @http_query, 1, hosts.sample)
-   end
-
-   private
+      http_headers = @request_headers.dup
+      http_headers["label"] = label_prefix + "_" + @db + "_" + @table + "_" + 
Time.now.strftime('%Y%m%d_%H%M%S_%L_' + SecureRandom.uuid)
+
+      # @request_headers["label"] = label_prefix + "_" + @db + "_" + @table + 
"_" + Time.now.strftime('%Y%m%d%H%M%S_%L')
+      req_count = 0
+      sleep_for = 1
+      while true
+         response = make_request(documents, http_headers, hosts, @http_query, hosts.sample)
+
+         req_count += 1
+         response_json = {}
+         begin
+            response_json = JSON.parse(response.body)
+         rescue => e
+            @logger.warn("doris stream load response: #{response} is not a 
valid JSON")
+         end
+         if response_json["Status"] == "Success"
+            @total_bytes.addAndGet(documents.size)
+            @total_rows.addAndGet(event_num)
+            break
+         else
+            @logger.warn("FAILED doris stream load response:\n#{response}")
+
+            if @retry_count >= 0 && req_count > @retry_count
+               @logger.warn("DROP this batch after failed #{req_count} times.")
+               if @save_on_failure
+                  @logger.warn("Try save to disk.Disk file path : 
#{save_dir}/#{table}_#{save_file}")
+                  save_to_disk(documents)
+               end
+               break
+            end
 
-   def save_to_disk(documents)
-      begin
-         file = File.open("#{save_dir}/#{db}_#{table}_#{save_file}", "a")
-         file.write(documents)
-      rescue IOError => e
-         log_failure("An error occurred while saving file to disk: #{e}",
-         :file_name => file_name)
-      ensure
-         file.close unless file.nil?
+            # sleep and then retry
+            sleep_for = sleep_for * 2
+            sleep_for = sleep_for <= 60 ? sleep_for : 60
+            sleep_rand = (sleep_for / 2) + (rand(0..sleep_for) / 2)
+            @logger.warn("Will do retry #{req_count} after sleep #{sleep_rand} 
secs.")
+            sleep(sleep_rand)
+         end
       end
    end
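
The retry loop in send_events doubles the delay on every failure, caps it at
60 seconds, and applies equal jitter (half fixed, half random) so that
concurrent workers do not retry in lockstep. A standalone sketch of that
schedule; backoff_seconds is a hypothetical helper name, not part of the
plugin:

    # exponential backoff capped at 60s with equal jitter, mirroring the loop above
    def backoff_seconds(base)
       doubled = [base * 2, 60].min
       # integer arithmetic keeps the sleep in [doubled/2, doubled]
       [doubled, (doubled / 2) + (rand(0..doubled) / 2)]
    end

    delay = 1
    6.times do |i|
       delay, jittered = backoff_seconds(delay)
       puts "retry #{i + 1}: sleep #{jittered}s (base #{delay}s)"
    end
    # base delays grow 2, 4, 8, 16, 32, 60; each sleep lands in [base/2, base]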
 
-
    private
-
-   def make_request(documents, hosts, query, req_count = 1,host = "", uuid = SecureRandom.hex)
-
+   def make_request(documents, http_headers, hosts, query, host = "")
       if host == ""
          host = hosts.pop
       end
 
-      url = host+query
-      @logger.debug("req count: #{req_count}. get url: #{url}")
-      @logger.debug("request headers: ", @request_headers)
-      
+      url = host + query
 
-      result = RestClient.put(url, documents,@request_headers) { |response, request, result|
+      if @log_request or @logger.debug?
+         @logger.info("doris stream load request url: #{url}  headers: 
#{http_headers}  body size: #{documents.size}")
+      end
+      @logger.debug("doris stream load request body: #{documents}")
+
+      response = ""
+      begin
+         response = RestClient.put(url, documents, http_headers) { |response, request, result|
                 case response.code
                 when 301, 302, 307
                     @logger.debug("redirect to: 
#{response.headers[:location]}")
@@ -239,77 +252,78 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
                 else
                     response.return!
                 end
-            }
-
-      @logger.info("response : \n #{result}" )
-      result_body = JSON.parse(result.body)
-      if result_body['Status'] != "Success"
-         if req_count < @automatic_retries
-            @logger.warn("Response Status : #{result_body['Status']} . 
Retrying...... #{req_count}")
-            make_request(documents,hosts,query,req_count + 1,host,uuid)
-            return
-         end
-         @logger.warn("Load failed ! Try #{req_count} times.")
-         if @save_on_failure
-            @logger.warn("Retry times over #{req_count} times.Try save to 
disk.Disk file path : #{save_dir}/#{table}_#{save_file}")
-            save_to_disk(documents)
-         end
+         }
+      rescue => e
+         log_failure("doris stream load request error: #{e}")
       end
 
+      if @log_request or @logger.debug?
+         @logger.info("doris stream load response:\n#{response}")
+      end
+
+      return response
    end # def make_request
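
send_events treats a batch as loaded only when the parsed body reports
Status == "Success"; anything else goes down the retry path. For reference, a
successful stream load response body has roughly the following shape (field
values are illustrative, not taken from a real run):

    require 'json'

    response_json = JSON.parse(<<~BODY)
       {
          "TxnId": 1003,
          "Label": "logstash_mydb_mytable_20240422_150040_000_...",
          "Status": "Success",
          "NumberTotalRows": 100000,
          "NumberLoadedRows": 100000,
          "LoadBytes": 52428800,
          "LoadTimeMs": 1500
       }
    BODY
    puts response_json["Status"] == "Success"   # => true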
 
+   # Format the HTTP body
+   private
+   def event_body(event)
+      if @message_only
+         event.get("[message]")
+      else
+         LogStash::Json.dump(map_event(event))
+      end
+   end
+
+   private
+   def map_event(event)
+      if @mapping
+        # only get fields in mapping
+        convert_mapping(@mapping, event)
+      else
+        # get all fields
+        event.to_hash
+      end
+   end
+
+   private
+   def convert_mapping(mapping, event)
+      if mapping.is_a?(Hash)
+        mapping.reduce({}) do |acc, kv|
+          k, v = kv
+          acc[k] = convert_mapping(v, event)
+          acc
+        end
+      elsif mapping.is_a?(Array)
+        mapping.map { |elem| convert_mapping(elem, event) }
+      else
+        event.sprintf(mapping)
+      end
+   end
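
convert_mapping above walks the mapping hash recursively and resolves each
leaf through event.sprintf, so "%{field}" references are expanded per event. A
self-contained sketch with a stub event (StubEvent models only the "%{field}"
substitution and is not the real Logstash event API):

    class StubEvent
       def initialize(fields)
          @fields = fields
       end

       # minimal stand-in for LogStash::Event#sprintf
       def sprintf(template)
          template.gsub(/%\{([^}]+)\}/) { @fields[Regexp.last_match(1)].to_s }
       end
    end

    def convert_mapping(mapping, event)
       case mapping
       when Hash  then mapping.transform_values { |v| convert_mapping(v, event) }
       when Array then mapping.map { |e| convert_mapping(e, event) }
       else event.sprintf(mapping)
       end
    end

    event = StubEvent.new('message' => 'hello', 'level' => 'INFO')
    p convert_mapping({ 'msg' => '%{message}', 'lvl' => '%{level}' }, event)
    # => {"msg"=>"hello", "lvl"=>"INFO"}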
+
+   private
+   def save_to_disk(documents)
+      begin
+         file = File.open("#{save_dir}/#{db}_#{table}_#{save_file}", "a")
+         file.write(documents)
+      rescue IOError => e
+         log_failure("An error occurred while saving file to disk: #{e}",
+         :file_name => file_name)
+      ensure
+         file.close unless file.nil?
+      end
+   end
+
     # This is split into a separate method mostly to help testing
-   def log_failure(message, opts)
-      @logger.warn("[HTTP Output Failure] #{message}", opts)
+   def log_failure(message)
+      @logger.warn("[Doris Output Failure] #{message}")
    end
 
    def make_request_headers()
       headers = @headers || {}
       headers["Expect"] ||= "100-continue"
       headers["Content-Type"] ||= "text/plain;charset=utf-8"
-      headers["strict_mode"] ||= @strict_mode
       headers["Authorization"] = "Basic " + 
Base64.strict_encode64("#{user}:#{password.value}")
-      # column_separator
-      if @column_separator != ""
-         headers["column_separator"] = @column_separator
-      end
-      # timezone
-      if @timezone != ""
-           headers["timezone"] = @timezone
-      end
-      # partition
-      if @partition.size > 0
-           headers["partition"] ||= @partition
-      end
-      # where
-      if @where != ""
-           headers["where"] ||= @where
-      end
-      # timeout
-      if @timeout != -1
-           headers["timeout"] ||= @timeout
-      end
-      # max_filter_ratio
-      if @max_filter_ratio != -1
-           headers["max_filter_ratio"] ||= @max_filter_ratio
-      end
-      # exec_mem_limit
-      if @exec_mem_limit != -1
-           headers["exec_mem_limit"] ||= @exec_mem_limit
-      end
-      # columns
-      if @columns != ""
-          headers["columns"] ||= @columns
-      end
-      headers["format"] = @format if @format != ""
-      headers["jsonpaths"] = @jsonpaths if @jsonpaths != []
-      headers["json_root"] = @json_root if @json_root != ""
-      headers["fuzzy_parse"] = @fuzzy_parse if @fuzzy_parse != ""
-      headers["num_as_string"] = @num_as_string if @num_as_string != ""
-      headers["read_json_by_line"] = @read_json_by_line if @read_json_by_line 
!= ""
   
       headers
    end
 end # end of class LogStash::Outputs::Doris
-
-
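
The Authorization header built in make_request_headers is standard HTTP Basic
auth. A quick check of the encoding (placeholder credentials):

    require 'base64'

    # same construction as make_request_headers, with placeholder credentials
    puts "Basic " + Base64.strict_encode64("root:password")
    # => Basic cm9vdDpwYXNzd29yZA==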
diff --git a/extension/logstash/logstash-output-doris.gemspec b/extension/logstash/logstash-output-doris.gemspec
index ee456a0e42d..163ba260f07 100644
--- a/extension/logstash/logstash-output-doris.gemspec
+++ b/extension/logstash/logstash-output-doris.gemspec
@@ -18,8 +18,8 @@ under the License.
 =end
 Gem::Specification.new do |s|
   s.name            = 'logstash-output-doris'
-  s.version         = '0.2.0'
-  s.author          = 'wfjcmcb'
+  s.version         = '1.0.0'
+  s.author          = 'Apache Doris'
   s.email           = 'd...@doris.apache.org'
   s.homepage        = 'http://doris.apache.org'
   s.licenses        = ['Apache-2.0']

