xiaokang commented on code in PR #48040:
URL: https://github.com/apache/doris/pull/48040#discussion_r1960782512


##########
extension/logstash/lib/logstash/outputs/doris.rb:
##########
@@ -89,8 +96,20 @@ def print_plugin_info()
       :http_hosts => @http_hosts)
    end
 
+   class DorisRedirectStrategy < Java::org.apache.hc.client5.http.impl.DefaultRedirectStrategy

Review Comment:
   What's the difference between this class and DefaultRedirectStrategy?
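
   For context, a minimal JRuby sketch of subclassing DefaultRedirectStrategy is below. The class name and the no-op override are illustrative only; what the PR's DorisRedirectStrategy actually changes relative to the default is exactly what this comment asks.

```ruby
require 'java'

# Illustrative subclass only: it keeps HttpClient 5's default redirect
# decision unchanged. A real subclass would override this method (or
# getLocationURI) to change redirect behaviour.
class ExampleRedirectStrategy < Java::org.apache.hc.client5.http.impl.DefaultRedirectStrategy
   # isRedirected(request, response, context) decides whether a response
   # should be followed as a redirect at all.
   def isRedirected(request, response, context)
      super(request, response, context)
   end
end
```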



##########
extension/logstash/lib/logstash/outputs/doris.rb:
##########
@@ -143,18 +162,24 @@ def register
       # retry queue size in bytes
       @retry_queue_bytes = java.util.concurrent.atomic.AtomicLong.new(0)
       retry_thread = Thread.new do
-         while popped = @retry_queue.take
-            documents, http_headers, event_num, req_count = popped.event
-            handle_request(documents, http_headers, event_num, req_count)
+         while (popped = @retry_queue.take)
+            table_events_map = popped.event
+            handle_request(table_events_map)
          end
       end
 
-      print_plugin_info()
+      @const_table = @table.index("%").nil?
+
+      print_plugin_info
    end # def register
 
    private
    def add_event_to_retry_queue(delay_event)
-      event_size = delay_event.documents.size
+      event_size = 0
+      delay_event.event.each do |_, es|

Review Comment:
   Rename the block variable `es` to `table_events`.



##########
extension/logstash/logstash-output-doris.gemspec:
##########
@@ -18,7 +18,7 @@ under the License.
 =end
 Gem::Specification.new do |s|
   s.name            = 'logstash-output-doris'
-  s.version         = '1.1.0'
+  s.version         = '1.1.1'

Review Comment:
   A feature was added, so use version 1.2.0.
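
   As a sketch, the suggested gemspec change would be:

```ruby
Gem::Specification.new do |s|
  s.name    = 'logstash-output-doris'
  s.version = '1.2.0'   # minor bump, since the release adds a feature
end
```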



##########
extension/logstash/lib/logstash/outputs/doris.rb:
##########
@@ -169,21 +194,47 @@ def multi_receive(events)
       send_events(events)
    end
 
+   def create_http_headers(table)
+      http_headers = @request_headers.dup
+      unless @group_commit
+         # only set label if group_commit is off_mode or not set, since lable can not be used with group_commit
+         http_headers["label"] = @label_prefix + "_" + @db + "_" + table + "_" + Time.now.strftime('%Y%m%d_%H%M%S_%L_' + SecureRandom.uuid)
+      end
+      http_headers
+   end
+
    private
    def send_events(events)
-      documents = events.map { |event| event_body(event) }.join("\n")
-      event_num = events.size
-
-      # @logger.info("get event num: #{event_num}")
-      @logger.debug("get documents: #{documents}")
+      table_events_map = Hash.new
+      if @const_table
+         table_events = TableEvents.new(@table, create_http_headers(@table))
+         table_events.events = events
+         table_events_map[@table] = table_events
+      else
+         events.each do |event|
+            table = event.sprintf(@table)
+            if table == "" || !table.index("%").nil?
+               table = @default_table
+               if table == ""
+                  @logger.warn("table format error, the default table is not set, the data will be dropped")
+               else
+                  @logger.warn("table format error, use the default table: #{table}")
+               end
+            end
+            table_events = table_events_map[table]
+            if table_events == nil
+               table_events = TableEvents.new(table, create_http_headers(table))
+               table_events_map[table] = table_events
+            end
+            table_events.events << event
+         end
+      end
 
-      http_headers = @request_headers.dup
-      if !@group_commit
-         # only set label if group_commit is off_mode or not set, since lable can not be used with group_commit
-         http_headers["label"] = @label_prefix + "_" + @db + "_" + @table + "_" + Time.now.strftime('%Y%m%d_%H%M%S_%L_' + SecureRandom.uuid)
+      table_events_map.each do |_, es|

Review Comment:
   The name `es` is ambiguous. Use `table_events` instead.
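
   A sketch of the suggested rename (loop body unchanged):

```ruby
# a self-describing block variable instead of `es`
table_events_map.each do |_, table_events|
   # ... same body as in the PR
end
```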



##########
extension/logstash/lib/logstash/outputs/doris.rb:
##########
@@ -169,21 +194,47 @@ def multi_receive(events)
       send_events(events)
    end
 
+   def create_http_headers(table)
+      http_headers = @request_headers.dup
+      unless @group_commit

Review Comment:
   if !@group_commit



##########
extension/logstash/lib/logstash/outputs/doris.rb:
##########
@@ -27,11 +27,16 @@
 require "securerandom"
 require "json"
 require "base64"
-require "restclient"
 require 'thread'
 
+require 'java'
+require Dir["#{File.dirname(__FILE__)}/../../*_jars.rb"].first

Review Comment:
   What's this?
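
   For background, and only as an assumption about the usual Logstash/JRuby packaging convention (not verified against this PR): gems that bundle Java libraries through jar-dependencies ship a generated `<gem>_jars.rb` file that loads the vendored jars, and the glob above presumably picks that file up. Its typical shape is roughly:

```ruby
# Hypothetical shape of a generated *_jars.rb file; the group/artifact/version
# strings are placeholders, not this PR's actual dependencies.
require 'jar_dependencies'

require_jar('org.apache.httpcomponents.client5', 'httpclient5', '5.2.1')
require_jar('org.apache.httpcomponents.core5', 'httpcore5', '5.2')
```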



##########
extension/logstash/lib/logstash/outputs/doris.rb:
##########
@@ -192,82 +243,111 @@ def sleep_for_attempt(attempt)
       (sleep_for/2) + (rand(0..sleep_for)/2)
    end
 
+   STAT_SUCCESS = 0
+   STAT_FAIL = 1
+   STAT_RETRY = 2
+
    private
-   def handle_request(documents, http_headers, event_num, req_count)
-      response = make_request(documents, http_headers, @http_query, @http_hosts.sample)
-      response_json = {}
-      begin
-         response_json = JSON.parse(response.body)
-      rescue => _
-         @logger.warn("doris stream load response is not a valid JSON:\n#{response}")
-      end
+   def handle_request(table_events_map)
+      make_request(table_events_map)
+      retry_map = Hash.new
+      table_events_map.each do |table, table_events|
+         stat = STAT_SUCCESS
+
+         if table == ""
+            @logger.warn("drop #{table_events.events_count} records because of empty table")
+            stat = STAT_FAIL
+         end
 
-      status = response_json["Status"]
+         response = ""
+         if stat == STAT_SUCCESS
+            begin
+               response = table_events.response_future.get.getBodyText
+            rescue => e
+               log_failure("doris stream load request error: #{e}")
+               stat = STAT_RETRY
+            end
+         end
 
-      need_retry = true
+         response_json = {}
+         if stat == STAT_SUCCESS
+            begin
+               response_json = JSON.parse(response)
+            rescue => _
+               @logger.warn("doris stream load response is not a valid JSON:\n#{response}")
+               stat = STAT_RETRY
+            end
+         end
 
-      if status == 'Label Already Exists'
-         @logger.warn("Label already exists: #{response_json['Label']}, skip #{event_num} records:\n#{response}")
-         need_retry = false
+         if stat == STAT_SUCCESS
+            status = response_json["Status"]
+
+            if status == 'Label Already Exists'
+               @logger.warn("Label already exists: #{response_json['Label']}, skip #{table_events.events_count} records:\n#{response}")
+
+            elsif status == "Success" || status == "Publish Timeout"
+               @total_bytes.addAndGet(table_events.documents.size)
+               @total_rows.addAndGet(table_events.events_count)
+               if @log_request or @logger.debug?
+                  @logger.info("doris stream load response:\n#{response}")
+               end
+
+            else
+               @logger.warn("FAILED doris stream load response:\n#{response}")
+               if @max_retries >= 0 && table_events.req_count - 1 >= @max_retries
+                  @logger.warn("DROP this batch after failed #{table_events.req_count} times.")
+                  stat = STAT_FAIL
+               else
+                  stat = STAT_RETRY
+               end
+            end
+         end
 
-      elsif status == "Success" || status == "Publish Timeout"
-         @total_bytes.addAndGet(documents.size)
-         @total_rows.addAndGet(event_num)
-         if @log_request or @logger.debug?
-            @logger.info("doris stream load response:\n#{response}")
+         if stat == STAT_FAIL && @save_on_failure
+            @logger.warn("Try save to disk.Disk file path : #{@save_dir}/#{table}_#{@save_file}")
+            save_to_disk(table_events.documents, table)
          end
-         need_retry = false
-
-      elsif @max_retries >= 0 && req_count - 1 > @max_retries
-         @logger.warn("FAILED doris stream load response:\n#{response}")
-         @logger.warn("DROP this batch after failed #{req_count} times.")
-         if @save_on_failure
-            @logger.warn("Try save to disk.Disk file path : #{@save_dir}/#{@table}_#{@save_file}")
-            save_to_disk(documents)
+
+         if stat != STAT_RETRY && table_events.req_count > 1
+            @retry_queue_bytes.addAndGet(-table_events.documents.size)
          end
-         need_retry = false
-      end
 
-      if !need_retry
-         if req_count > 1
-            @retry_queue_bytes.addAndGet(-documents.size)
+         if stat == STAT_RETRY
+            table_events.prepare_retry
+            retry_map[table] = table_events
          end
-         return
       end
 
-      # add to retry_queue
-      sleep_for = sleep_for_attempt(req_count)
-      @logger.warn("FAILED doris stream load response:\n#{response}")
-      @logger.warn("Will do the #{req_count}th retry after #{sleep_for} secs.")
-      delay_event = DelayEvent.new(sleep_for, [documents, http_headers, event_num, req_count+1])
-      add_event_to_retry_queue(delay_event)
+      if retry_map.size > 0
+         # add to retry_queue
+         req_count = retry_map.values[0].req_count

Review Comment:
   Is it safe to use values[0].req_count to represent the req_count of all values?
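
   To make the concern concrete, a self-contained sketch with hypothetical values (a Struct stands in for TableEvents):

```ruby
# Hypothetical scenario behind the question: if entries ever carry different
# req_count values, only the first entry's count drives the shared backoff.
FakeTableEvents = Struct.new(:req_count)

retry_map = {
   "tbl_a" => FakeTableEvents.new(3),   # already failed three times
   "tbl_b" => FakeTableEvents.new(1)    # first failure
}
req_count = retry_map.values[0].req_count   # => 3, regardless of tbl_b
```

   Whether mixed counts can actually occur on this code path is the question.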



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

