This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 86d235a [Extension] Logstash Doris output plugin (#3800) 86d235a is described below commit 86d235a76a4dddbf8de51301e790d2cf9faf4aed Author: wfjcmcb <33599943+wfjc...@users.noreply.github.com> AuthorDate: Thu Jun 11 08:54:51 2020 +0800 [Extension] Logstash Doris output plugin (#3800) This plugin is used to output data to Doris for logstash Use the HTTP protocol to interact with the Doris FE Http interface Load data through Doris's stream load --- docs/.vuepress/sidebar/en.js | 1 + docs/.vuepress/sidebar/zh-CN.js | 1 + docs/en/extending-doris/logstash.md | 198 ++++++++++++++ docs/zh-CN/extending-doris/logstash.md | 198 ++++++++++++++ extension/logstash/Gemfile | 20 ++ extension/logstash/LICENSE | 16 ++ extension/logstash/README.md | 28 ++ extension/logstash/Rakefile | 25 ++ extension/logstash/lib/logstash/outputs/doris.rb | 294 +++++++++++++++++++++ .../lib/logstash/util/shortname_resolver.rb | 58 ++++ extension/logstash/logstash-output-doris.gemspec | 47 ++++ 11 files changed, 886 insertions(+) diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js index c99defd..7d5fefa 100644 --- a/docs/.vuepress/sidebar/en.js +++ b/docs/.vuepress/sidebar/en.js @@ -126,6 +126,7 @@ module.exports = [ "plugin-development-manual", "user-defined-function", "spark-doris-connector", + "logstash", ], }, { diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js index 9e0ffcc..e1d1508 100644 --- a/docs/.vuepress/sidebar/zh-CN.js +++ b/docs/.vuepress/sidebar/zh-CN.js @@ -137,6 +137,7 @@ module.exports = [ "plugin-development-manual", "user-defined-function", "spark-doris-connector", + "logstash", ], }, { diff --git a/docs/en/extending-doris/logstash.md b/docs/en/extending-doris/logstash.md new file mode 100644 index 0000000..75d5647 --- /dev/null +++ b/docs/en/extending-doris/logstash.md @@ -0,0 +1,198 @@ +--- +{ + "title": "Logstash Doris Output Plugin", + "language": "en" +} 
+--- + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Doris output plugin + +This Logstash output plugin writes data to Doris. It talks to the Doris FE HTTP interface and imports data through Doris's stream load. + +[Learn more about Doris Stream Load](http://doris.apache.org/master/zh-CN/administrator-guide/load-data/stream-load-manual.html) + +[Learn more about Doris](http://doris.apache.org/master/zh-CN/) + + +## Install and compile +### 1.Download source code + +### 2.Compile +Run the following in the extension/logstash/ directory: + +`gem build logstash-output-doris.gemspec` + +This produces the file logstash-output-doris-{version}.gem in the same directory. + +### 3.Plug-in installation +Copy logstash-output-doris-{version}.gem to the logstash installation directory. + +Run the command + +`./bin/logstash-plugin install logstash-output-doris-{version}.gem` + +to install the logstash-output-doris plugin. + +## Configuration +### Example: + +Create a new configuration file named logstash-doris.conf in the config directory. + +The specific configuration is as follows: + + output { + doris { + http_hosts => [ "http://fehost:8030" ] + user => user_name + password => password + db => "db_name" + table => "table_name" + 
label_prefix => "label_prefix" + column_separator => "," + } + } + +Configuration instructions: + +Connection configuration: + +Configuration | Explanation +--- | --- +`http_hosts` | HTTP address(es) of the FE, e.g. `["http://fe1:8030", "http://fe2:8030"]` +`user` | User name; the user needs import permission on the Doris table +`password` | Password +`db` | Database name +`table` | Table name +`label_prefix` | Prefix of the import label; the final generated label is *{label\_prefix}\_{db}\_{table}\_{time_stamp}* + + +Load configuration: ([Reference documents](http://doris.apache.org/master/zh-CN/administrator-guide/load-data/stream-load-manual.html)) + +Configuration | Explanation +--- | --- +`column_separator` | Column separator; the default is \t +`columns` | Specifies the mapping between the columns in the import file and the columns in the table +`where` | The filter conditions specified by the import task +`max_filter_ratio` | The maximum tolerated filter ratio of the import task; the default is zero tolerance +`partition` | Partition information of the table to be imported +`timeout` | Timeout; the default is 600s +`strict_mode` | Strict mode; the default is false +`timezone` | The time zone used for this import; the default is UTC+8 +`exec_mem_limit` | Import memory limit in bytes; the default is 2GB + +Other configuration: + +Configuration | Explanation +--- | --- +`save_on_failure` | Whether to save the data locally if the import fails; the default is true +`save_dir` | Local save directory; the default is /tmp +`automatic_retries` | The maximum number of retries on failure; the default is 3 +`batch_size` | The maximum number of events processed per batch; the default is 100000 +`idle_flush_time` | Maximum interval between flushes; the default is 20 (seconds) + + +## Start Up +Run the command to start the doris output plugin: + +`{logstash-home}/bin/logstash -f {logstash-home}/config/logstash-doris.conf --config.reload.automatic` + + + + +## 
Complete usage example +### 1. Compile doris-output-plugin +1> Download the Ruby tarball from the [Ruby official website](https://www.ruby-lang.org/en/downloads/). Version 2.7.1 is used here + +2> Compile and install Ruby, then configure the Ruby environment variables + +3> Go to the extension/logstash/ directory of the Doris source and run + +`gem build logstash-output-doris.gemspec` + +This produces the file logstash-output-doris-0.1.0.gem, which completes the compilation + +### 2. Install and configure filebeat (filebeat is used as the input here) + +1> Download the filebeat tar package from the [Elastic official website](https://www.elastic.co/) and extract it + +2> Enter the filebeat directory and modify the configuration file filebeat.yml as follows: + + filebeat.inputs: + - type: log + paths: + - /tmp/doris.data + output.logstash: + hosts: ["localhost:5044"] + +/tmp/doris.data is the path of the data file to be loaded into Doris + +3> Start filebeat: + +`./filebeat -e -c filebeat.yml -d "publish"` + + +### 3. Install logstash and doris-output-plugin +1> Download the logstash tar package from the [Elastic official website](https://www.elastic.co/) and extract it + +2> Copy the logstash-output-doris-0.1.0.gem obtained in step 1 to the logstash installation directory + +3> Run + +`./bin/logstash-plugin install logstash-output-doris-0.1.0.gem` + +to install the plugin + +4> Create a new configuration file logstash-doris.conf in the config directory with the following content: + + input { + beats { + port => "5044" + } + } + + output { + doris { + http_hosts => [ "http://127.0.0.1:8030" ] + user => doris + password => doris + db => "logstash_output_test" + table => "output" + label_prefix => "doris" + column_separator => "," + columns => "a,b,c,d,e" + } + } + +Adjust this configuration to your environment according to the configuration instructions + +5> Start logstash: + +`./bin/logstash -f ./config/logstash-doris.conf --config.reload.automatic` + +### 4. Test Load + +Append data to /tmp/doris.data + +`echo 
a,b,c,d,e >> /tmp/doris.data` + +Observe the logstash log. If the status of the returned response is Success, the import was successful. At this time, you can view the imported data in the logstash_output_test.output table + diff --git a/docs/zh-CN/extending-doris/logstash.md b/docs/zh-CN/extending-doris/logstash.md new file mode 100644 index 0000000..467a886 --- /dev/null +++ b/docs/zh-CN/extending-doris/logstash.md @@ -0,0 +1,198 @@ +--- +{ + "title": "Logstash Doris Output Plugin", + "language": "zh-CN" +} +--- + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Doris output plugin + +该插件用于logstash输出数据到Doris,使用 HTTP 协议与 Doris FE Http接口交互,并通过 Doris 的 stream load 的方式进行数据导入. 
+ +[了解Doris Stream Load ](http://doris.apache.org/master/zh-CN/administrator-guide/load-data/stream-load-manual.html) + +[了解更多关于Doris](http://doris.apache.org/master/zh-CN/) + + +## 安装和编译 +### 1.下载插件源码 + +### 2.编译 ## +在extension/logstash/ 目录下执行 + +`gem build logstash-output-doris.gemspec` + +你将在同目录下得到 logstash-output-doris-{version}.gem 文件 + +### 3.插件安装 +copy logstash-output-doris-{version}.gem 到 logstash 安装目录下 + +执行命令 + +`./bin/logstash-plugin install logstash-output-doris-{version}.gem` + +安装 logstash-output-doris 插件 + +## 配置 +### 示例: + +在config目录下新建一个配置配置文件,命名为 logstash-doris.conf + +具体配置如下: + + output { + doris { + http_hosts => [ "http://fehost:8030" ] + user => user_name + password => password + db => "db_name" + table => "table_name" + label_prefix => "label_prefix" + column_separator => "," + } + } + +配置说明: + +连接相关配置: + +配置 | 说明 +--- | --- +`http_hosts` | FE的HTTP交互地址 eg | ["http://fe1:8030", "http://fe2:8030"] +`user` | 用户名,该用户需要有doris对应库表的导入权限 +`password` | 密码 +`db` | 数据库名 +`table` | 表名 +`label_prefix` | 导入标识前缀,最终生成的标识为 *{label\_prefix}\_{db}\_{table}\_{time_stamp}* + + +导入相关配置:([参考文档](http://doris.apache.org/master/zh-CN/administrator-guide/load-data/stream-load-manual.html)) + +配置 | 说明 +--- | --- +`column_separator` | 列分割符,默认为\t。 +`columns` | 用于指定导入文件中的列和 table 中的列的对应关系。 +`where` | 导入任务指定的过滤条件。 +`max_filter_ratio` | 导入任务的最大容忍率,默认零容忍。 +`partition` | 待导入表的 Partition 信息。 +`timeout` | 超时时间,默认为600s。 +`strict_mode` | 严格模式,默认为false。 +`timezone` | 指定本次导入所使用的时区,默认为东八区。 +`exec_mem_limit` | 导入内存限制,默认为 2GB,单位为字节。 + +其他配置 + +配置 | 说明 +--- | --- +`save_on_failure` | 如果导入失败是否在本地保存,默认为true +`save_dir` | 本地保存目录,默认为 /tmp +`automatic_retries` | 失败时重试最大次数,默认为3 +`batch_size` | 每批次最多处理的event数量,默认为100000 +`idle_flush_time` | 最大间隔时间,默认为20(秒) + + +## 启动 +执行命令启动doris output plugin: + +`{logstash-home}/bin/logstash -f {logstash-home}/config/logstash-doris.conf --config.reload.automatic` + + + + +## 完整使用示例 +### 1.编译doris-output-plugin +1> 
下载ruby压缩包,自行到[ruby官网](https://www.ruby-lang.org/en/downloads/)下载,这里使用的2.7.1版本 + +2> 编译安装,配置ruby的环境变量 + +3> 到doris源码 extension/logstash/ 目录下,执行 + +`gem build logstash-output-doris.gemspec` + +得到文件 logstash-output-doris-0.1.0.gem,至此编译完成 + +### 2.安装配置filebeat(此处使用filebeat作为input) + +1> [es官网](https://www.elastic.co/)下载 filebeat tar压缩包并解压 + +2> 进入filebeat目录下,修改配置文件 filebeat.yml 如下: + + filebeat.inputs: + - type: log + paths: + - /tmp/doris.data + output.logstash: + hosts: ["localhost:5044"] + +/tmp/doris.data 为doris数据路径 + +3> 启动filebeat: + +`./filebeat -e -c filebeat.yml -d "publish"` + + +### 3.安装logstash及doris-out-plugin +1> [es官网](https://www.elastic.co/)下载 logstash tar压缩包并解压 + +2> 将步骤1中得到的 logstash-output-doris-0.1.0.gem copy到logstash安装目录下 + +3> 执行 + +`./bin/logstash-plugin install logstash-output-doris-0.1.0.gem` + +安装插件 + +4> 在config 目录下新建配置文件 logstash-doris.conf 内容如下: + + input { + beats { + port => "5044" + } + } + + output { + doris { + http_hosts => [ "http://127.0.0.1:8030" ] + user => doris + password => doris + db => "logstash_output_test" + table => "output" + label_prefix => "doris" + column_separator => "," + columns => "a,b,c,d,e" + } + } + +这里的配置需按照配置说明自行配置 + +5> 启动logstash: + +./bin/logstash -f ./config/logstash-doris.conf --config.reload.automatic + +### 4.测试功能 + +向/tmp/doris.data追加写入数据 + +`echo a,b,c,d,e >> /tmp/doris.data` + +观察logstash日志,若返回response的Status为 Success,则导入成功,此时可在 logstash_output_test.output 表中查看已导入的数据 + diff --git a/extension/logstash/Gemfile b/extension/logstash/Gemfile new file mode 100644 index 0000000..d2d262b --- /dev/null +++ b/extension/logstash/Gemfile @@ -0,0 +1,20 @@ +=begin +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +=end +source 'https://rubygems.org' +gemspec \ No newline at end of file diff --git a/extension/logstash/LICENSE b/extension/logstash/LICENSE new file mode 100644 index 0000000..90705e0 --- /dev/null +++ b/extension/logstash/LICENSE @@ -0,0 +1,16 @@ +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. \ No newline at end of file diff --git a/extension/logstash/README.md b/extension/logstash/README.md new file mode 100644 index 0000000..57e594d --- /dev/null +++ b/extension/logstash/README.md @@ -0,0 +1,28 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +1. How to build + + `gem build logstash-output-doris.gemspec` + +2. How to use + + `http://doris.incubator.apache.org/master/en/extending-doris/logstash.html` + `http://doris.incubator.apache.org/master/zh-CN/extending-doris/logstash.html` + diff --git a/extension/logstash/Rakefile b/extension/logstash/Rakefile new file mode 100644 index 0000000..27ac367 --- /dev/null +++ b/extension/logstash/Rakefile @@ -0,0 +1,25 @@ +=begin +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+=end +@files=[] + +task :default do + system("rake -T") +end + +require "logstash/devutils/rake" diff --git a/extension/logstash/lib/logstash/outputs/doris.rb b/extension/logstash/lib/logstash/outputs/doris.rb new file mode 100644 index 0000000..15d1b4f --- /dev/null +++ b/extension/logstash/lib/logstash/outputs/doris.rb @@ -0,0 +1,294 @@ +=begin +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +=end + +# encoding: utf-8 +require "logstash/outputs/base" +require "logstash/namespace" +require "logstash/json" +require "logstash/util/shortname_resolver" +require "uri" +require "stud/buffer" +require "logstash/plugin_mixins/http_client" +require "securerandom" +require "json" +require "base64" +require "restclient" + + +class LogStash::Outputs::Doris < LogStash::Outputs::Base + include LogStash::PluginMixins::HttpClient + include Stud::Buffer + + concurrency :single + + config_name "doris" + # hosts array of Doris Frontends. e.g. ["http://fe1:8030", "http://fe2:8030"] + config :http_hosts, :validate => :array, :required => true + # the database to load data into + config :db, :validate => :string, :required => true + # the table to load data into + config :table, :validate => :string, :required => true + # label prefix of a stream load request. 
+ config :label_prefix, :validate => :string, :required => true + # user + config :user, :validate => :string, :required => true + # password + config :password, :validate => :password, :required => true + # column separator + config :column_separator, :validate => :string, :default => "" + # column mappings. eg: "k1, k2, tmpk3, k3 = tmpk3 + 1" + config :columns, :validate => :string, :default => "" + # where predicate to filter data. eg: "k1 > 1 and k3 < 100" + config :where, :validate => :string, :default => "" + # max filter ratio + config :max_filter_ratio, :validate => :number, :default => -1 + # partitions which data is loaded to. eg: ["p1", "p2"] + config :partition, :validate => :array, :default => [] + # timeout of a stream load, in seconds + config :timeout, :validate => :number, :default => -1 + # whether to enable strict mode + config :strict_mode, :validate => :string, :default => "false" + # timezone + config :timezone, :validate => :string, :default => "" + # memory limit of a stream load + config :exec_mem_limit, :validate => :number, :default => -1 + + # Custom headers to use + # format is `headers => { "X-My-Header" => "%{host}" }` + config :headers, :validate => :hash + + config :batch_size, :validate => :number, :default => 100000 + + config :idle_flush_time, :validate => :number, :default => 20 + + config :save_on_failure, :validate => :boolean, :default => true + + config :save_dir, :validate => :string, :default => "/tmp" + + config :save_file, :validate => :string, :default => "failed.data" + + config :host_resolve_ttl_sec, :validate => :number, :default => 120 + + config :automatic_retries, :validate => :number, :default => 3 + + + def print_plugin_info() + @@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-doris/ } + @plugin_name = @@plugins[0].name + @plugin_version = @@plugins[0].version + @logger.debug("Running #{@plugin_name} version #{@plugin_version}") + + @logger.info("Initialized doris output with settings", + :db 
=> @db, + :table => @table, + :label_prefix => @label_prefix, + :batch_size => @batch_size, + :idle_flush_time => @idle_flush_time, + :http_hosts => @http_hosts) + end + + def register + # Handle this deprecated option. TODO: remove the option + #@ssl_certificate_validation = @verify_ssl if @verify_ssl + + # We count outstanding requests with this queue + # This queue tracks the requests to create backpressure + # When this queue is empty no new requests may be sent, + # tokens must be added back by the client on success + #@request_tokens = SizedQueue.new(@pool_max) + #@pool_max.times {|t| @request_tokens << true } + #@requests = Array.new + + @http_query = "/api/#{db}/#{table}/_stream_load" + + @hostnames_pool = + parse_http_hosts(http_hosts, + ShortNameResolver.new(ttl: @host_resolve_ttl_sec, logger: @logger)) + + @request_headers = make_request_headers + @logger.info("request headers: ", @request_headers) + + buffer_initialize( + :max_items => @batch_size, + :max_interval => @idle_flush_time, + :logger => @logger + ) + + print_plugin_info() + end # def register + + private + + def parse_http_hosts(hosts, resolver) + ip_re = /^[\d]+\.[\d]+\.[\d]+\.[\d]+$/ + + lambda { + hosts.flat_map { |h| + scheme = URI(h).scheme + host = URI(h).host + port = URI(h).port + path = URI(h).path + + if ip_re !~ host + resolver.get_addresses(host).map { |ip| + "#{scheme}://#{ip}:#{port}#{path}" + } + else + [h] + end + } + } + end + + private + + def get_host_addresses() + begin + @hostnames_pool.call + rescue Exception => ex + @logger.error('Error while resolving host', :error => ex.to_s) + end + end + + # This module currently does not support parallel requests as that would circumvent the batching + def receive(event) + buffer_receive(event) + end + + public + def flush(events, close=false) + documents = "" + event_num = 0 + events.each do |event| + documents << event.get("[message]") << "\n" + event_num += 1 + end + + @logger.info("get event num: #{event_num}") + 
@logger.debug("get documents: #{documents}") + + hosts = get_host_addresses() + + @request_headers["label"] = label_prefix + "_" + @db + "_" + @table + "_" + Time.now.strftime('%Y%m%d%H%M%S_%L') + make_request(documents, hosts, @http_query, 1, hosts.sample) + end + + private + + def save_to_disk(documents) + begin + file = File.open("#{save_dir}/#{db}_#{table}_#{save_file}", "a") + file.write(documents) + rescue IOError, SystemCallError => e + log_failure("An error occurred while saving file to disk: #{e}", + :file_name => "#{save_dir}/#{db}_#{table}_#{save_file}") + ensure + file.close unless file.nil? + end + end + + + private + + def make_request(documents, hosts, query, req_count = 1,host = "", uuid = SecureRandom.hex) + + if host == "" + host = hosts.pop + end + + url = host+query + @logger.debug("req count: #{req_count}. get url: #{url}") + @logger.debug("request headers: ", @request_headers) + + + result = RestClient.put(url, documents,@request_headers) { |response, request, result| + case response.code + when 301, 302, 307 + @logger.debug("redirect to: #{response.headers[:location]}") + response.follow_redirection + else + response.return! + end + } + + @logger.info("response : \n #{result}" ) + result_body = JSON.parse(result.body) + if result_body['Status'] != "Success" + if req_count < @automatic_retries + @logger.warn("Response Status : #{result_body['Status']} . Retrying...... #{req_count}") + make_request(documents,hosts,query,req_count + 1,host,uuid) + return + end + @logger.warn("Load failed ! 
Try #{req_count} times.") + if @save_on_failure + @logger.warn("Retries exhausted after #{req_count} attempts. Saving to disk. Disk file path : #{save_dir}/#{db}_#{table}_#{save_file}") + save_to_disk(documents) + end + end + + end # def make_request + + # This is split into a separate method mostly to help testing + def log_failure(message, opts) + @logger.warn("[HTTP Output Failure] #{message}", opts) + end + + def make_request_headers() + headers = @headers || {} + headers["Expect"] ||= "100-continue" + headers["Content-Type"] ||= "text/plain;charset=utf-8" + headers["strict_mode"] ||= @strict_mode + headers["Authorization"] = "Basic " + Base64.strict_encode64("#{user}:#{password.value}") + # column_separator + if @column_separator != "" + headers["column_separator"] = @column_separator + end + # timezone + if @timezone != "" + headers["timezone"] = @timezone + end + # partition + if @partition.size > 0 + headers["partition"] ||= @partition + end + # where + if @where != "" + headers["where"] ||= @where + end + # timeout + if @timeout != -1 + headers["timeout"] ||= @timeout + end + # max_filter_ratio + if @max_filter_ratio != -1 + headers["max_filter_ratio"] ||= @max_filter_ratio + end + # exec_mem_limit + if @exec_mem_limit != -1 + headers["exec_mem_limit"] ||= @exec_mem_limit + end + # columns + if @columns != "" + headers["columns"] ||= @columns + end + headers + end +end # end of class LogStash::Outputs::Doris + + diff --git a/extension/logstash/lib/logstash/util/shortname_resolver.rb b/extension/logstash/lib/logstash/util/shortname_resolver.rb new file mode 100644 index 0000000..1437ccb --- /dev/null +++ b/extension/logstash/lib/logstash/util/shortname_resolver.rb @@ -0,0 +1,58 @@ +=begin +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +=end +require 'resolv' +require 'mini_cache' + +class ShortNameResolver + def initialize(ttl:, logger:) + @ttl = ttl + @store = MiniCache::Store.new + @logger = logger + end + + private + def resolve_cached(shortname) + @store.get_or_set(shortname) do + addresses = resolve(shortname) + raise "Bad shortname '#{shortname}'" if addresses.empty? + MiniCache::Data.new(addresses, expires_in: @ttl) + end + end + + private + def resolve(shortname) + addresses = Resolv::DNS.open do |dns| + dns.getaddresses(shortname).map { |r| r.to_s } + end + + @logger.info("Resolved shortname '#{shortname}' to addresses #{addresses}") + + return addresses + end + + public + def get_address(shortname) + return resolve_cached(shortname).sample + end + + public + def get_addresses(shortname) + return resolve_cached(shortname) + end +end diff --git a/extension/logstash/logstash-output-doris.gemspec b/extension/logstash/logstash-output-doris.gemspec new file mode 100644 index 0000000..91d28f0 --- /dev/null +++ b/extension/logstash/logstash-output-doris.gemspec @@ -0,0 +1,47 @@ +=begin +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +=end +Gem::Specification.new do |s| + s.name = 'logstash-output-doris' + s.version = '0.1.0' + s.author = 'wfjcmcb' + s.email = 'd...@doris.apache.org' + s.homepage = 'http://doris.apache.org' + s.licenses = ['Apache-2.0'] + s.summary = "This output lets you `PUT` messages in a batched fashion to Doris HTTP endpoint" + s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program" + s.require_paths = ["lib"] + + # Files + s.files = Dir['lib/**/*','spec/**/*','*.gemspec','*.md','Gemfile','LICENSE' ] + + # Tests + s.test_files = s.files.grep(%r{^(test|spec|features)/}) + + # Special flag to let us know this is actually a logstash plugin + s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" } + + # Gem dependencies + s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" + s.add_runtime_dependency 'mini_cache', ">= 1.0.0", "< 2.0.0" + s.add_runtime_dependency "rest-client", '~> 2.1' + + s.add_development_dependency 'logstash-devutils', '~> 2.0', '>= 2.0.3' + s.add_development_dependency 'sinatra', '~> 2.0', '>= 2.0.8.1' + s.add_development_dependency 'webrick', '~> 1.6'
+end
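
For readers skimming the diff, the pieces of a stream load request that `doris.rb` assembles (the URL path, the label, and the headers) can be sketched in isolation. This is a minimal sketch, not the plugin's code: the helper names `stream_load_path`, `build_label` and `build_headers` are hypothetical, the password is passed as a plain string (the plugin reads it through `password.value`), only a subset of the headers is shown, and nothing is sent over the network.

```ruby
require "base64"

# Hypothetical helper: the FE endpoint path the plugin stores in @http_query.
def stream_load_path(db, table)
  "/api/#{db}/#{table}/_stream_load"
end

# Hypothetical helper: the label format used in flush(),
# {label_prefix}_{db}_{table}_{YYYYmmddHHMMSS_mmm}.
def build_label(label_prefix, db, table, now = Time.now)
  [label_prefix, db, table, now.strftime("%Y%m%d%H%M%S_%L")].join("_")
end

# Hypothetical helper: a subset of the headers built by make_request_headers.
# Optional load properties are only sent when they are set, so the Doris
# defaults apply otherwise.
def build_headers(user:, password:, column_separator: "", columns: "")
  headers = {
    "Expect" => "100-continue",
    "Content-Type" => "text/plain;charset=utf-8",
    "Authorization" => "Basic " + Base64.strict_encode64("#{user}:#{password}"),
  }
  headers["column_separator"] = column_separator unless column_separator.empty?
  headers["columns"] = columns unless columns.empty?
  headers
end

# Values taken from the complete usage example in the docs.
headers = build_headers(user: "doris", password: "doris",
                        column_separator: ",", columns: "a,b,c,d,e")
headers["label"] = build_label("doris", "logstash_output_test", "output")

puts stream_load_path("logstash_output_test", "output")
headers.each { |name, value| puts "#{name}: #{value}" }
```

Because the label carries a millisecond timestamp, two batches flushed for the same table get different labels unless they land in the same millisecond.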