joker-star-l commented on code in PR #48040:
URL: https://github.com/apache/doris/pull/48040#discussion_r1960908439


##########
extension/logstash/lib/logstash/outputs/doris.rb:
##########
@@ -192,82 +243,111 @@ def sleep_for_attempt(attempt)
       (sleep_for/2) + (rand(0..sleep_for)/2)
    end
 
+   STAT_SUCCESS = 0
+   STAT_FAIL = 1
+   STAT_RETRY = 2
+
    private
-   def handle_request(documents, http_headers, event_num, req_count)
-      response = make_request(documents, http_headers, @http_query, 
@http_hosts.sample)
-      response_json = {}
-      begin
-         response_json = JSON.parse(response.body)
-      rescue => _
-         @logger.warn("doris stream load response is not a valid 
JSON:\n#{response}")
-      end
+   def handle_request(table_events_map)
+      make_request(table_events_map)
+      retry_map = Hash.new
+      table_events_map.each do |table, table_events|
+         stat = STAT_SUCCESS
+
+         if table == ""
+            @logger.warn("drop #{table_events.events_count} records because of 
empty table")
+            stat = STAT_FAIL
+         end
 
-      status = response_json["Status"]
+         response = ""
+         if stat == STAT_SUCCESS
+            begin
+               response = table_events.response_future.get.getBodyText
+            rescue => e
+               log_failure("doris stream load request error: #{e}")
+               stat = STAT_RETRY
+            end
+         end
 
-      need_retry = true
+         response_json = {}
+         if stat == STAT_SUCCESS
+            begin
+               response_json = JSON.parse(response)
+            rescue => _
+               @logger.warn("doris stream load response is not a valid 
JSON:\n#{response}")
+               stat = STAT_RETRY
+            end
+         end
 
-      if status == 'Label Already Exists'
-         @logger.warn("Label already exists: #{response_json['Label']}, skip 
#{event_num} records:\n#{response}")
-         need_retry = false
+         if stat == STAT_SUCCESS
+            status = response_json["Status"]
+
+            if status == 'Label Already Exists'
+               @logger.warn("Label already exists: #{response_json['Label']}, 
skip #{table_events.events_count} records:\n#{response}")
+
+            elsif status == "Success" || status == "Publish Timeout"
+               @total_bytes.addAndGet(table_events.documents.size)
+               @total_rows.addAndGet(table_events.events_count)
+               if @log_request or @logger.debug?
+                  @logger.info("doris stream load response:\n#{response}")
+               end
+
+            else
+               @logger.warn("FAILED doris stream load response:\n#{response}")
+               if @max_retries >= 0 && table_events.req_count - 1 >= 
@max_retries
+                  @logger.warn("DROP this batch after failed 
#{table_events.req_count} times.")
+                  stat = STAT_FAIL
+               else
+                  stat = STAT_RETRY
+               end
+            end
+         end
 
-      elsif status == "Success" || status == "Publish Timeout"
-         @total_bytes.addAndGet(documents.size)
-         @total_rows.addAndGet(event_num)
-         if @log_request or @logger.debug?
-            @logger.info("doris stream load response:\n#{response}")
+         if stat == STAT_FAIL && @save_on_failure
+            @logger.warn("Try save to disk.Disk file path : 
#{@save_dir}/#{table}_#{@save_file}")
+            save_to_disk(table_events.documents, table)
          end
-         need_retry = false
-
-      elsif @max_retries >= 0 && req_count - 1 > @max_retries
-         @logger.warn("FAILED doris stream load response:\n#{response}")
-         @logger.warn("DROP this batch after failed #{req_count} times.")
-         if @save_on_failure
-            @logger.warn("Try save to disk.Disk file path : 
#{@save_dir}/#{@table}_#{@save_file}")
-            save_to_disk(documents)
+
+         if stat != STAT_RETRY && table_events.req_count > 1
+            @retry_queue_bytes.addAndGet(-table_events.documents.size)
          end
-         need_retry = false
-      end
 
-      if !need_retry
-         if req_count > 1
-            @retry_queue_bytes.addAndGet(-documents.size)
+         if stat == STAT_RETRY
+            table_events.prepare_retry
+            retry_map[table] = table_events
          end
-         return
       end
 
-      # add to retry_queue
-      sleep_for = sleep_for_attempt(req_count)
-      @logger.warn("FAILED doris stream load response:\n#{response}")
-      @logger.warn("Will do the #{req_count}th retry after #{sleep_for} secs.")
-      delay_event = DelayEvent.new(sleep_for, [documents, http_headers, 
event_num, req_count+1])
-      add_event_to_retry_queue(delay_event)
+      if retry_map.size > 0
+         # add to retry_queue
+         req_count = retry_map.values[0].req_count

Review Comment:
   Yes. Because they are in a same batch when receiving from the upper.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to