xiaokang commented on code in PR #48040: URL: https://github.com/apache/doris/pull/48040#discussion_r1960782512
########## extension/logstash/lib/logstash/outputs/doris.rb: ########## @@ -89,8 +96,20 @@ def print_plugin_info() :http_hosts => @http_hosts) end + class DorisRedirectStrategy < Java::org.apache.hc.client5.http.impl.DefaultRedirectStrategy Review Comment: What's the difference between this class and DefaultRedirectStrategy? ########## extension/logstash/lib/logstash/outputs/doris.rb: ########## @@ -143,18 +162,24 @@ def register # retry queue size in bytes @retry_queue_bytes = java.util.concurrent.atomic.AtomicLong.new(0) retry_thread = Thread.new do - while popped = @retry_queue.take - documents, http_headers, event_num, req_count = popped.event - handle_request(documents, http_headers, event_num, req_count) + while (popped = @retry_queue.take) + table_events_map = popped.event + handle_request(table_events_map) end end - print_plugin_info() + @const_table = @table.index("%").nil? + + print_plugin_info end # def register private def add_event_to_retry_queue(delay_event) - event_size = delay_event.documents.size + event_size = 0 + delay_event.event.each do |_, es| Review Comment: table_events ########## extension/logstash/logstash-output-doris.gemspec: ########## @@ -18,7 +18,7 @@ under the License. 
=end Gem::Specification.new do |s| s.name = 'logstash-output-doris' - s.version = '1.1.0' + s.version = '1.1.1' Review Comment: feature added, so use version 1.2.0 ########## extension/logstash/lib/logstash/outputs/doris.rb: ########## @@ -169,21 +194,47 @@ def multi_receive(events) send_events(events) end + def create_http_headers(table) + http_headers = @request_headers.dup + unless @group_commit + # only set label if group_commit is off_mode or not set, since lable can not be used with group_commit + http_headers["label"] = @label_prefix + "_" + @db + "_" + table + "_" + Time.now.strftime('%Y%m%d_%H%M%S_%L_' + SecureRandom.uuid) + end + http_headers + end + private def send_events(events) - documents = events.map { |event| event_body(event) }.join("\n") - event_num = events.size - - # @logger.info("get event num: #{event_num}") - @logger.debug("get documents: #{documents}") + table_events_map = Hash.new + if @const_table + table_events = TableEvents.new(@table, create_http_headers(@table)) + table_events.events = events + table_events_map[@table] = table_events + else + events.each do |event| + table = event.sprintf(@table) + if table == "" || !table.index("%").nil? 
+ table = @default_table + if table == "" + @logger.warn("table format error, the default table is not set, the data will be dropped") + else + @logger.warn("table format error, use the default table: #{table}") + end + end + table_events = table_events_map[table] + if table_events == nil + table_events = TableEvents.new(table, create_http_headers(table)) + table_events_map[table] = table_events + end + table_events.events << event + end + end - http_headers = @request_headers.dup - if !@group_commit - # only set label if group_commit is off_mode or not set, since lable can not be used with group_commit - http_headers["label"] = @label_prefix + "_" + @db + "_" + @table + "_" + Time.now.strftime('%Y%m%d_%H%M%S_%L_' + SecureRandom.uuid) + table_events_map.each do |_, es| Review Comment: The name `es` is ambiguous. Use `table_events` instead. ########## extension/logstash/lib/logstash/outputs/doris.rb: ########## @@ -169,21 +194,47 @@ def multi_receive(events) send_events(events) end + def create_http_headers(table) + http_headers = @request_headers.dup + unless @group_commit Review Comment: if !@group_commit ########## extension/logstash/lib/logstash/outputs/doris.rb: ########## @@ -27,11 +27,16 @@ require "securerandom" require "json" require "base64" -require "restclient" require 'thread' +require 'java' +require Dir["#{File.dirname(__FILE__)}/../../*_jars.rb"].first Review Comment: What's this? 
########## extension/logstash/lib/logstash/outputs/doris.rb: ########## @@ -192,82 +243,111 @@ def sleep_for_attempt(attempt) (sleep_for/2) + (rand(0..sleep_for)/2) end + STAT_SUCCESS = 0 + STAT_FAIL = 1 + STAT_RETRY = 2 + private - def handle_request(documents, http_headers, event_num, req_count) - response = make_request(documents, http_headers, @http_query, @http_hosts.sample) - response_json = {} - begin - response_json = JSON.parse(response.body) - rescue => _ - @logger.warn("doris stream load response is not a valid JSON:\n#{response}") - end + def handle_request(table_events_map) + make_request(table_events_map) + retry_map = Hash.new + table_events_map.each do |table, table_events| + stat = STAT_SUCCESS + + if table == "" + @logger.warn("drop #{table_events.events_count} records because of empty table") + stat = STAT_FAIL + end - status = response_json["Status"] + response = "" + if stat == STAT_SUCCESS + begin + response = table_events.response_future.get.getBodyText + rescue => e + log_failure("doris stream load request error: #{e}") + stat = STAT_RETRY + end + end - need_retry = true + response_json = {} + if stat == STAT_SUCCESS + begin + response_json = JSON.parse(response) + rescue => _ + @logger.warn("doris stream load response is not a valid JSON:\n#{response}") + stat = STAT_RETRY + end + end - if status == 'Label Already Exists' - @logger.warn("Label already exists: #{response_json['Label']}, skip #{event_num} records:\n#{response}") - need_retry = false + if stat == STAT_SUCCESS + status = response_json["Status"] + + if status == 'Label Already Exists' + @logger.warn("Label already exists: #{response_json['Label']}, skip #{table_events.events_count} records:\n#{response}") + + elsif status == "Success" || status == "Publish Timeout" + @total_bytes.addAndGet(table_events.documents.size) + @total_rows.addAndGet(table_events.events_count) + if @log_request or @logger.debug? 
+          @logger.info("doris stream load response:\n#{response}") +        end + +      else +        @logger.warn("FAILED doris stream load response:\n#{response}") +        if @max_retries >= 0 && table_events.req_count - 1 >= @max_retries +          @logger.warn("DROP this batch after failed #{table_events.req_count} times.") +          stat = STAT_FAIL +        else +          stat = STAT_RETRY +        end +      end +    end -    elsif status == "Success" || status == "Publish Timeout" -      @total_bytes.addAndGet(documents.size) -      @total_rows.addAndGet(event_num) -      if @log_request or @logger.debug? -        @logger.info("doris stream load response:\n#{response}") +      if stat == STAT_FAIL && @save_on_failure +        @logger.warn("Try save to disk.Disk file path : #{@save_dir}/#{table}_#{@save_file}") +        save_to_disk(table_events.documents, table)        end -      need_retry = false - -    elsif @max_retries >= 0 && req_count - 1 > @max_retries -      @logger.warn("FAILED doris stream load response:\n#{response}") -      @logger.warn("DROP this batch after failed #{req_count} times.") -      if @save_on_failure -        @logger.warn("Try save to disk.Disk file path : #{@save_dir}/#{@table}_#{@save_file}") -        save_to_disk(documents) + +      if stat != STAT_RETRY && table_events.req_count > 1 +        @retry_queue_bytes.addAndGet(-table_events.documents.size)        end -      need_retry = false -    end -    if !need_retry -      if req_count > 1 -        @retry_queue_bytes.addAndGet(-documents.size) +      if stat == STAT_RETRY +        table_events.prepare_retry +        retry_map[table] = table_events        end -      return      end -    # add to retry_queue -    sleep_for = sleep_for_attempt(req_count) -    @logger.warn("FAILED doris stream load response:\n#{response}") -    @logger.warn("Will do the #{req_count}th retry after #{sleep_for} secs.") -    delay_event = DelayEvent.new(sleep_for, [documents, http_headers, event_num, req_count+1]) -    add_event_to_retry_queue(delay_event) +    if retry_map.size > 0 +      # add to retry_queue +      req_count = retry_map.values[0].req_count Review Comment: Is it safe to use `values[0].req_count` as the `req_count` for all entries in `retry_map`? Can different tables in the same batch have different retry counts?
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org