joker-star-l commented on code in PR #48040: URL: https://github.com/apache/doris/pull/48040#discussion_r1960908439
########## extension/logstash/lib/logstash/outputs/doris.rb: ########## @@ -192,82 +243,111 @@ def sleep_for_attempt(attempt) (sleep_for/2) + (rand(0..sleep_for)/2) end + STAT_SUCCESS = 0 + STAT_FAIL = 1 + STAT_RETRY = 2 + private - def handle_request(documents, http_headers, event_num, req_count) - response = make_request(documents, http_headers, @http_query, @http_hosts.sample) - response_json = {} - begin - response_json = JSON.parse(response.body) - rescue => _ - @logger.warn("doris stream load response is not a valid JSON:\n#{response}") - end + def handle_request(table_events_map) + make_request(table_events_map) + retry_map = Hash.new + table_events_map.each do |table, table_events| + stat = STAT_SUCCESS + + if table == "" + @logger.warn("drop #{table_events.events_count} records because of empty table") + stat = STAT_FAIL + end - status = response_json["Status"] + response = "" + if stat == STAT_SUCCESS + begin + response = table_events.response_future.get.getBodyText + rescue => e + log_failure("doris stream load request error: #{e}") + stat = STAT_RETRY + end + end - need_retry = true + response_json = {} + if stat == STAT_SUCCESS + begin + response_json = JSON.parse(response) + rescue => _ + @logger.warn("doris stream load response is not a valid JSON:\n#{response}") + stat = STAT_RETRY + end + end - if status == 'Label Already Exists' - @logger.warn("Label already exists: #{response_json['Label']}, skip #{event_num} records:\n#{response}") - need_retry = false + if stat == STAT_SUCCESS + status = response_json["Status"] + + if status == 'Label Already Exists' + @logger.warn("Label already exists: #{response_json['Label']}, skip #{table_events.events_count} records:\n#{response}") + + elsif status == "Success" || status == "Publish Timeout" + @total_bytes.addAndGet(table_events.documents.size) + @total_rows.addAndGet(table_events.events_count) + if @log_request or @logger.debug? 
+ @logger.info("doris stream load response:\n#{response}") + end + + else + @logger.warn("FAILED doris stream load response:\n#{response}") + if @max_retries >= 0 && table_events.req_count - 1 >= @max_retries + @logger.warn("DROP this batch after failed #{table_events.req_count} times.") + stat = STAT_FAIL + else + stat = STAT_RETRY + end + end + end - elsif status == "Success" || status == "Publish Timeout" - @total_bytes.addAndGet(documents.size) - @total_rows.addAndGet(event_num) - if @log_request or @logger.debug? - @logger.info("doris stream load response:\n#{response}") + if stat == STAT_FAIL && @save_on_failure + @logger.warn("Try save to disk.Disk file path : #{@save_dir}/#{table}_#{@save_file}") + save_to_disk(table_events.documents, table) end - need_retry = false - - elsif @max_retries >= 0 && req_count - 1 > @max_retries - @logger.warn("FAILED doris stream load response:\n#{response}") - @logger.warn("DROP this batch after failed #{req_count} times.") - if @save_on_failure - @logger.warn("Try save to disk.Disk file path : #{@save_dir}/#{@table}_#{@save_file}") - save_to_disk(documents) + + if stat != STAT_RETRY && table_events.req_count > 1 + @retry_queue_bytes.addAndGet(-table_events.documents.size) end - need_retry = false - end - if !need_retry - if req_count > 1 - @retry_queue_bytes.addAndGet(-documents.size) + if stat == STAT_RETRY + table_events.prepare_retry + retry_map[table] = table_events end - return end - # add to retry_queue - sleep_for = sleep_for_attempt(req_count) - @logger.warn("FAILED doris stream load response:\n#{response}") - @logger.warn("Will do the #{req_count}th retry after #{sleep_for} secs.") - delay_event = DelayEvent.new(sleep_for, [documents, http_headers, event_num, req_count+1]) - add_event_to_retry_queue(delay_event) + if retry_map.size > 0 + # add to retry_queue + req_count = retry_map.values[0].req_count Review Comment: Yes, because they are all in the same batch when received from upstream. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org