This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cefccaf0e56 [enhancement](plugin) logstash: add retry queue without blocking tasks (#44999)
cefccaf0e56 is described below

commit cefccaf0e56a68bbc5f5c3a6672fe7db50085c5d
Author: Mingxi <71588583+joker-sta...@users.noreply.github.com>
AuthorDate: Mon Feb 17 08:14:45 2025 +0800

    [enhancement](plugin) logstash: add retry queue without blocking tasks (#44999)
---
 extension/logstash/lib/logstash/outputs/doris.rb   | 140 +++++++++++++--------
 .../logstash/lib/logstash/util/delay_event.rb      |  54 ++++++++
 extension/logstash/logstash-output-doris.gemspec   |   2 +-
 3 files changed, 143 insertions(+), 53 deletions(-)

diff --git a/extension/logstash/lib/logstash/outputs/doris.rb b/extension/logstash/lib/logstash/outputs/doris.rb
index 21d3ee6e752..971d28889c4 100644
--- a/extension/logstash/lib/logstash/outputs/doris.rb
+++ b/extension/logstash/lib/logstash/outputs/doris.rb
@@ -22,6 +22,7 @@ require "logstash/outputs/base"
 require "logstash/namespace"
 require "logstash/json"
 require 'logstash/util/formater'
+require 'logstash/util/delay_event'
 require "uri"
 require "securerandom"
 require "json"
@@ -43,7 +44,7 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
    config :db, :validate => :string, :required => true
    # the table which data is loaded to
    config :table, :validate => :string, :required => true
-   # label prefix of a stream load requst.
+   # label prefix of a stream load request.
    config :label_prefix, :validate => :string, :default => "logstash"
    # user name
    config :user, :validate => :string, :required => true
@@ -72,6 +73,8 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
 
    config :log_progress_interval, :validate => :number, :default => 10
 
+   # max retry queue size in MB; defaults to 20% of the JVM's max memory
+   config :max_retry_queue_mb, :validate => :number, :default => 
java.lang.Runtime.get_runtime.max_memory / 1024 / 1024 / 5
 
    def print_plugin_info()
       @plugins = Gem::Specification.find_all{|spec| spec.name =~ 
/logstash-output-doris/ }
@@ -131,9 +134,36 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
          end
       end
 
+      if @max_retry_queue_mb <= 0
+         @max_retry_queue_mb = java.lang.Runtime.get_runtime.max_memory / 1024 
/ 1024 / 5
+      end
+      @logger.info("max retry queue size: #{@max_retry_queue_mb}MB")
+
+      @retry_queue = java.util.concurrent.DelayQueue.new
+      # retry queue size in bytes
+      @retry_queue_bytes = java.util.concurrent.atomic.AtomicLong.new(0)
+      retry_thread = Thread.new do
+         while popped = @retry_queue.take
+            documents, http_headers, event_num, req_count = popped.event
+            handle_request(documents, http_headers, event_num, req_count)
+         end
+      end
+
       print_plugin_info()
    end # def register
 
+   private
+   def add_event_to_retry_queue(delay_event)
+      event_size = delay_event.documents.size
+      if delay_event.first_retry
+         while @retry_queue_bytes.get + event_size > @max_retry_queue_mb * 
1024 * 1024
+            sleep(1)
+         end
+         @retry_queue_bytes.addAndGet(event_size)
+      end
+      @retry_queue.add(delay_event)
+   end
+
    def multi_receive(events)
       return if events.empty?
       send_events(events)
@@ -141,12 +171,8 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
 
    private
    def send_events(events)
-      documents = ""
-      event_num = 0
-      events.each do |event|
-         documents << event_body(event) << "\n"
-         event_num += 1
-      end
+      documents = events.map { |event| event_body(event) }.join("\n")
+      event_num = events.size
 
       # @logger.info("get event num: #{event_num}")
       @logger.debug("get documents: #{documents}")
@@ -157,50 +183,64 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
          http_headers["label"] = @label_prefix + "_" + @db + "_" + @table + 
"_" + Time.now.strftime('%Y%m%d_%H%M%S_%L_' + SecureRandom.uuid)
       end
 
-      req_count = 0
-      sleep_for = 1
-      while true
-         response = make_request(documents, http_headers, @http_query, 
@http_hosts.sample)
-
-         req_count += 1
-         response_json = {}
-         begin
-            response_json = JSON.parse(response.body)
-         rescue => e
-            @logger.warn("doris stream load response: #{response} is not a 
valid JSON")
-         end
+      handle_request(documents, http_headers, event_num, 1)
+   end
+
+   def sleep_for_attempt(attempt)
+      sleep_for = attempt**2
+      sleep_for = sleep_for <= 60 ? sleep_for : 60
+      (sleep_for/2) + (rand(0..sleep_for)/2)
+   end
+
+   private
+   def handle_request(documents, http_headers, event_num, req_count)
+      response = make_request(documents, http_headers, @http_query, 
@http_hosts.sample)
+      response_json = {}
+      begin
+         response_json = JSON.parse(response.body)
+      rescue => _
+         @logger.warn("doris stream load response is not a valid 
JSON:\n#{response}")
+      end
+
+      status = response_json["Status"]
+
+      need_retry = true
 
-         status = response_json["Status"]
+      if status == 'Label Already Exists'
+         @logger.warn("Label already exists: #{response_json['Label']}, skip 
#{event_num} records:\n#{response}")
+         need_retry = false
 
-         if status == 'Label Already Exists'
-           @logger.warn("Label already exists: #{response_json['Label']}, skip 
#{event_num} records.")
-           break
+      elsif status == "Success" || status == "Publish Timeout"
+         @total_bytes.addAndGet(documents.size)
+         @total_rows.addAndGet(event_num)
+         if @log_request or @logger.debug?
+            @logger.info("doris stream load response:\n#{response}")
          end
+         need_retry = false
+
+      elsif @max_retries >= 0 && req_count - 1 > @max_retries
+         @logger.warn("FAILED doris stream load response:\n#{response}")
+         @logger.warn("DROP this batch after failed #{req_count} times.")
+         if @save_on_failure
+            @logger.warn("Try save to disk.Disk file path : 
#{@save_dir}/#{@table}_#{@save_file}")
+            save_to_disk(documents)
+         end
+         need_retry = false
+      end
 
-         if status == "Success" || status == "Publish Timeout"
-            @total_bytes.addAndGet(documents.size)
-            @total_rows.addAndGet(event_num)
-            break
-         else
-            @logger.warn("FAILED doris stream load response:\n#{response}")
-
-            if @max_retries >= 0 && req_count > @max_retries
-               @logger.warn("DROP this batch after failed #{req_count} times.")
-               if @save_on_failure
-                  @logger.warn("Try save to disk.Disk file path : 
#{@save_dir}/#{@table}_#{@save_file}")
-                  save_to_disk(documents)
-               end
-               break
-            end
-
-            # sleep and then retry
-            sleep_for = sleep_for * 2
-            sleep_for = sleep_for <= 60 ? sleep_for : 60
-            sleep_rand = (sleep_for / 2) + (rand(0..sleep_for) / 2)
-            @logger.warn("Will do retry #{req_count} after sleep #{sleep_rand} 
secs.")
-            sleep(sleep_rand)
+      if !need_retry
+         if req_count > 1
+            @retry_queue_bytes.addAndGet(-documents.size)
          end
+         return
       end
+
+      # add to retry_queue
+      sleep_for = sleep_for_attempt(req_count)
+      @logger.warn("FAILED doris stream load response:\n#{response}")
+      @logger.warn("Will do the #{req_count}th retry after #{sleep_for} secs.")
+      delay_event = DelayEvent.new(sleep_for, [documents, http_headers, 
event_num, req_count+1])
+      add_event_to_retry_queue(delay_event)
    end
 
    private
@@ -227,11 +267,7 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
          log_failure("doris stream load request error: #{e}")
       end
 
-      if @log_request or @logger.debug?
-         @logger.info("doris stream load response:\n#{response}")
-      end
-
-      return response
+      response
    end # def make_request
 
    # Format the HTTP body
@@ -284,8 +320,8 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
    end
 
     # This is split into a separate method mostly to help testing
-   def log_failure(message)
-      @logger.warn("[Doris Output Failure] #{message}")
+   def log_failure(message, data = {})
+      @logger.warn("[Doris Output Failure] #{message}", data)
    end
 
    def make_request_headers()
diff --git a/extension/logstash/lib/logstash/util/delay_event.rb b/extension/logstash/lib/logstash/util/delay_event.rb
new file mode 100644
index 00000000000..86f59ef457f
--- /dev/null
+++ b/extension/logstash/lib/logstash/util/delay_event.rb
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+require 'java'
+
+class DelayEvent
+   include java.util.concurrent.Delayed
+
+   def initialize(delay, event)
+      @start_time = Time.now.to_i + delay
+      @event = event # event style: [documents, http_headers, event_num, 
req_count]
+   end
+
+   def get_delay(unit)
+      delay = @start_time - Time.now.to_i
+      unit.convert(delay, java.util.concurrent.TimeUnit::SECONDS)
+   end
+
+   def compare_to(other)
+      d = self.start_time - other.start_time
+      return 0 if d == 0
+      d < 0 ? -1 : 1
+   end
+
+   def start_time
+      @start_time
+   end
+
+   def event
+      @event
+   end
+
+   def documents
+      @event[0]
+   end
+
+   def first_retry
+      @event[3] == 2
+   end
+end
diff --git a/extension/logstash/logstash-output-doris.gemspec b/extension/logstash/logstash-output-doris.gemspec
index 30341b83156..f44d57d0511 100644
--- a/extension/logstash/logstash-output-doris.gemspec
+++ b/extension/logstash/logstash-output-doris.gemspec
@@ -18,7 +18,7 @@ under the License.
 =end
 Gem::Specification.new do |s|
   s.name            = 'logstash-output-doris'
-  s.version         = '1.0.1'
+  s.version         = '1.1.0'
   s.author          = 'Apache Doris'
   s.email           = 'd...@doris.apache.org'
   s.homepage        = 'http://doris.apache.org'


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to