This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-website.git
The following commit(s) were added to refs/heads/master by this push: new 624a648c57f [doc](routine load) optimize routine load doc (#1331) 624a648c57f is described below commit 624a648c57f70af755307eaaeddacfd7c96c00c5 Author: hui lai <1353307...@qq.com> AuthorDate: Wed Nov 13 22:05:24 2024 +0800 [doc](routine load) optimize routine load doc (#1331) --- .../import/import-way/routine-load-manual.md | 83 ++++++++------------ .../import/import-way/routine-load-manual.md | 57 +++++++------- .../import/import-way/routine-load-manual.md | 63 +++++++-------- .../import/import-way/routine-load-manual.md | 57 +++++++------- .../import/import-way/routine-load-manual.md | 87 +++++++++------------ .../import/import-way/routine-load-manual.md | 91 +++++++++------------- 6 files changed, 198 insertions(+), 240 deletions(-) diff --git a/docs/data-operate/import/import-way/routine-load-manual.md b/docs/data-operate/import/import-way/routine-load-manual.md index 9af12037e31..a51fdc9d6aa 100644 --- a/docs/data-operate/import/import-way/routine-load-manual.md +++ b/docs/data-operate/import/import-way/routine-load-manual.md @@ -30,9 +30,13 @@ Routine Load is a streaming load job that supports Exactly-Once semantics, ensur ## Usage Scenarios +### Supported Data Sources + +Routine Load supports consuming data from Kafka clusters. + ### Supported Data File Formats -Routine Load supports consuming data in CSV and JSON formats from Kafka. +Routine Load supports consuming data in CSV and JSON formats. When loading CSV format, it is necessary to clearly distinguish between null values and empty strings: @@ -44,35 +48,33 @@ When loading CSV format, it is necessary to clearly distinguish between null val When using Routine Load to consume data from Kafka, there are the following limitations: -- It supports unauthenticated Kafka access as well as Kafka clusters authenticated through SSL. - - The supported message formats are CSV and JSON text formats. Each message in CSV should be on a separate line, and the line should not end with a newline character. - By default, it supports Kafka versions 0.10.0.0 and above. If you need to use a Kafka version below 0.10.0.0 (such as 0.9.0, 0.8.2, 0.8.1, 0.8.0), you need to modify the BE configuration by setting the value of `kafka_broker_version_fallback` to the compatible older version, or directly set the value of `property.broker.version.fallback` when creating the Routine Load. However, using an older version may mean that some new features of Routine Load, such as setting the offset of Kafka p [...] ## Basic Principles -Routine Load continuously consumes data from Kafka Topic and writes it into Doris. +Routine Load continuously consumes data from Kafka Topics and writes it into Doris. -When a Routine Load job is created in Doris, it generates a persistent load job and several load tasks: +When a Routine Load job is created in Doris, it generates a resident import job that consists of several import tasks: -- Load Job: Each routine load corresponds to a load job. The load job is a persistent task that continuously consumes data from the Kafka Topic. +- Load Job: A Routine Load Job is a resident import job that continuously consumes data from the data source. -- Load Task: A load job is divided into several load tasks, which are loaded as independent basic units using the Stream Load method into BE. +- Load Task: An import job is broken down into several import tasks for actual consumption, with each task being an independent transaction. 
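
As a rough illustration of the job/task model described in the bullets above (an editorial sketch, not part of this commit), creating such a resident job could look like the following; the database, table, topic, and broker names are hypothetical placeholders:

```sql
-- Minimal sketch: create a resident Routine Load job that keeps consuming
-- CSV messages from a Kafka topic. All object names below are hypothetical.
CREATE ROUTINE LOAD demo_db.example_routine_load ON demo_tbl
COLUMNS TERMINATED BY ","
PROPERTIES
(
    "desired_concurrent_number" = "1",  -- desired number of load tasks
    "format" = "csv",
    "max_error_number" = "0"            -- default: pause the job on any error row
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092",
    "kafka_topic" = "demo_topic",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
```

Once created, the job stays resident on the FE and keeps generating load tasks against the topic partitions until it is paused or stopped.
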
-The specific process of Routine Load is illustrated in the following diagram: +The specific import process of Routine Load is shown in the following diagram:  -1. The Client submits a Routine Load job to the FE to establish a persistent Routine Load Job. +1. The Client submits a request to create a Routine Load job to the FE, and the FE generates a resident import job (Routine Load Job) through the Routine Load Manager. -2. The FE splits the Routine Load Job into multiple Routine Load Tasks through the Job Scheduler. +2. The FE splits the Routine Load Job into several Routine Load Tasks through the Job Scheduler, which are then scheduled by the Task Scheduler and distributed to BE nodes. -3. On the BE, each Routine Load Task is treated as a Stream Load task for importation and reports back to the FE upon completion. +3. On the BE, after a Routine Load Task is completed, it submits the transaction to the FE and updates the Job's metadata. -4. The Job Scheduler in the FE generates new Tasks based on the report results or retries failed Tasks. +4. After a Routine Load Task is submitted, it continues to generate new Tasks or retries timed-out Tasks. -5. The Routine Load Job continuously generates new Tasks to complete uninterrupted data importation. +5. The newly generated Routine Load Tasks continue to be scheduled by the Task Scheduler in a continuous cycle. ## Quick Start @@ -433,33 +435,16 @@ Here are the available parameters for the job_properties clause: | max_error_number | The maximum number of error rows allowed within a sampling window. Must be greater than or equal to 0. The default value is 0, which means no error rows are allowed. The sampling window is `max_batch_rows * 10`. If the number of error rows within the sampling window exceeds `max_error_number`, the regular job will be paused and manual intervention is required to check for data quality issues using the [SHOW ROUTINE LOAD](../../../sql-manual/sql-statements/ [...] | strict_mode | Whether to enable strict mode. The default value is disabled. Strict mode applies strict filtering to type conversions during the load process. If enabled, non-null original data that results in a NULL after type conversion will be filtered out. The filtering rules in strict mode are as follows:<ul><li>Derived columns (generated by functions) are not affected by strict mode.</li><li>If a column's type needs to be converted, any data with an incorrect data [...] | timezone | Specifies the time zone used by the load job. The default is to use the session's timezone parameter. This parameter affects the results of all timezone-related functions involved in the load. | -| format | Specifies the data format for the load. The default is csv, and JSON format is supported. | +| format | Specifies the data format for the load. The default is CSV, and JSON format is supported. | | jsonpaths | When the data format is JSON, jsonpaths can be used to specify the JSON paths to extract data from nested structures. It is a JSON array of strings, where each string represents a JSON path. | -| delimiter | Specifies the delimiter used in CSV files. The default delimiter is a comma (,). | -| escape | Specifies the escape character used in CSV files. The default escape character is a backslash (\). | -| quote | Specifies the quote character used in CSV files. The default quote character is a double quotation mark ("). | -| null_format | Specifies the string representation of NULL values in the load data. The default is an empty string. 
| -| skip_header_lines | Specifies the number of lines to skip at the beginning of the load data file. The default is 0, which means no lines are skipped. | -| skip_footer_lines | Specifies the number of lines to skip at the end of the load data file. The default is 0, which means no lines are skipped. | -| query_parallelism | Specifies the number of parallel threads used by each subtask to execute SQL statements. The default is 1. | -| query_timeout | Specifies the timeout for SQL statement execution. The default is 3600 seconds (1 hour). | -| query_band | Specifies the query band string to be set for each subtask. | -| memory_quota_per_query | Specifies the memory quota for each subtask, in bytes. The default is -1, which means to use the system default. | -| error_table_name | Specifies the name of the error table where error rows are stored. The default is null, which means no error table is generated. | -| error_table_database | Specifies the database where the error table is located. The default is null, which means the error table is located in the current database. | -| error_table_schema | Specifies the schema where the error table is located. The default is null, which means the error table is located in the public schema. | -| error_table_logging_policy | Specifies the logging policy for the error table. The default is null, which means to use the system default. | -| error_table_reuse_policy | Specifies the reuse policy for the error table. The default is null, which means to use the system default. | -| error_table_creation_time | Specifies the creation time for the error table. The default is null, which means to use the current time. | -| error_table_cleanup_time | Specifies the cleanup time for the error table. The default is null, which means not set a cleanup time. | -| error_table_log | Specifies whether to enable logging for the error table. The default is null, which means to use the system default. | -| error_table_backup_time | Specifies the backup time for the error table. The default is null, which means not set a backup time. | -| error_table_backup_path | Specifies the backup path for the error table. The default is null, which means not set a backup path. | -| error_table_lifetime | Specifies the lifetime of the error table. The default is null, which means to use the system default. | -| error_table_backup_lifetime | Specifies the backup lifetime for the error table. The default is null, which means to use the system default. | -| error_table_label | Specifies the label for the error table. The default is null, which means not set a label. | -| error_table_priority | Specifies the priority for the error table. The default is null, which means to use the system default. | -| error_table_comment | Specifies the comment for the error table. The default is null, which means to not set a comment. | +| json_root | When importing JSON format data, you can specify the root node of the JSON data through json_root. Doris will extract and parse elements from the root node. Default is empty. For example, specify the JSON root node with: `"json_root" = "$.RECORDS"` | +| strip_outer_array | When importing JSON format data, if strip_outer_array is true, it indicates that the JSON data is presented as an array, and each element in the data will be treated as a row. Default value is false. Typically, JSON data in Kafka might be represented as an array with square brackets `[]` in the outermost layer. 
In this case, you can specify `"strip_outer_array" = "true"` to consume Topic data in array mode. For example, the following data will be parsed into [...] +| send_batch_parallelism | Used to set the parallelism of sending batch data. If the parallelism value exceeds the `max_send_batch_parallelism_per_job` in BE configuration, the coordinating BE will use the value of `max_send_batch_parallelism_per_job`. | +| load_to_single_tablet | Supports importing data to only one tablet in the corresponding partition per task. Default value is false. This parameter can only be set when importing data to OLAP tables with random bucketing. | +| partial_columns | Specifies whether to enable partial column update feature. Default value is false. This parameter can only be set when the table model is Unique and uses Merge on Write. Multi-table streaming does not support this parameter. For details, refer to [Partial Column Update](../../../data-operate/update/update-of-unique-model) | +| max_filter_ratio | The maximum allowed filter ratio within the sampling window. Must be between 0 and 1 inclusive. Default value is 1.0, indicating any error rows can be tolerated. The sampling window is `max_batch_rows * 10`. If the ratio of error rows to total rows within the sampling window exceeds `max_filter_ratio`, the routine job will be suspended and require manual intervention to check data quality issues. Rows filtered by WHERE conditions are not counted as error rows. | +| enclose | Specifies the enclosing character. When CSV data fields contain line or column separators, a single-byte character can be specified as an enclosing character for protection to prevent accidental truncation. For example, if the column separator is "," and the enclosing character is "'", the data "a,'b,c'" will have "b,c" parsed as one field. | +| escape | Specifies the escape character. Used to escape characters in fields that are identical to the enclosing character. For example, if the data is "a,'b,'c'", the enclosing character is "'", and you want "b,'c" to be parsed as one field, you need to specify a single-byte escape character, such as "\", and modify the data to "a,'b,\'c'". | These parameters can be used to customize the behavior of a Routine Load job according to your specific requirements. @@ -548,9 +533,7 @@ The columns in the result set provide the following information: ## Load example -### Loading CSV Format - -**Setting the Maximum Error Tolerance** +### Setting the Maximum Error Tolerance 1. Load sample data: @@ -604,7 +587,7 @@ The columns in the result set provide the following information: 2 rows in set (0.01 sec) ``` -**Consuming Data from a Specified Offset** +### Consuming Data from a Specified Offset 1. Load sample data: @@ -657,7 +640,7 @@ The columns in the result set provide the following information: 3 rows in set (0.01 sec) ``` -**Specifying the Consumer Group's group.id and client.id** +### Specifying the Consumer Group's group.id and client.id 1. Load sample data: @@ -708,7 +691,7 @@ The columns in the result set provide the following information: 3 rows in set (0.01 sec) ``` -**Setting load filtering conditions** +### Setting load filtering conditions 1. Load sample data: @@ -761,7 +744,7 @@ The columns in the result set provide the following information: 3 rows in set (0.01 sec) ``` -**Loading specified partition data** +### Loading specified partition data 1. 
Load sample data: @@ -814,7 +797,7 @@ The columns in the result set provide the following information: 1 rows in set (0.01 sec) ``` -**Setting Time Zone for load** +### Setting Time Zone for load 1. Load sample data: @@ -868,6 +851,8 @@ The columns in the result set provide the following information: 3 rows in set (0.00 sec) ``` +### Setting merge_type + **Specify merge_type for delete operation** 1. Load sample data: @@ -1089,7 +1074,7 @@ The columns in the result set provide the following information: 5 rows in set (0.00 sec) ``` -**Load with column mapping and derived column calculation** +### Load with column mapping and derived column calculation 1. Load sample data: @@ -1140,7 +1125,7 @@ The columns in the result set provide the following information: 3 rows in set (0.01 sec) ``` -**Load with enclosed data** +### Load with enclosed data 1. Load sample data: diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/import-way/routine-load-manual.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/import-way/routine-load-manual.md index c87a89d4eab..d998e9b995a 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/import-way/routine-load-manual.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/import-way/routine-load-manual.md @@ -30,9 +30,13 @@ Routine Load 是一个流式导入作业,支持 Exactly-Once 语义,保证 ## 使用场景 +### 支持数据源 + +Routine Load 支持从 Kafka 集群中消费数据。 + ### 支持数据文件格式 -Routine Load 支持从 Kafka 中消费 CSV 及 JSON 格式的数据。 +Routine Load 支持 CSV 及 JSON 格式的数据。 在导入 CSV 格式时,需要明确区分空值(null)与空字符串(''): @@ -44,8 +48,6 @@ Routine Load 支持从 Kafka 中消费 CSV 及 JSON 格式的数据。 在使用 Routine Load 消费 Kafka 中数据时,有以下限制: -- 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群; - - 支持的消息格式为 CSV 及 JSON 文本格式。CSV 每一个 message 为一行,且行尾**不包含**换行符; - 默认支持 Kafka 0.10.0.0(含)以上版本。如果要使用 Kafka 0.10.0.0 以下版本(0.9.0, 0.8.2, 0.8.1, 0.8.0),需要修改 BE 的配置,将 `kafka_broker_version_fallback` 的值设置为要兼容的旧版本,或者在创建 Routine Load 的时候直接设置 `property.broker.version.fallback` 的值为要兼容的旧版本,使用旧版本的代价是 Routine Load 的部分新特性可能无法使用,如根据时间设置 Kafka 分区的 offset。 @@ -54,25 +56,25 @@ Routine Load 支持从 Kafka 中消费 CSV 及 JSON 格式的数据。 Routine Load 会持续消费 Kafka Topic 中的数据,写入 Doris 中。 -在 Doris 中,创建 Routine Load 作业后会生成一个常驻的导入作业和若干个导入任务: +在 Doris 中,创建 Routine Load 作业后会生成一个常驻的导入作业,包括若干个导入任务: -- 导入作业(load job):一个 Routine Load 对应一个导入作业,导入作业是一个常驻的任务,会持续不断地消费 Kafka Topic 中的数据; +- 导入作业(load job):一个 Routine Load Job 是一个常驻的导入作业,会持续不断地消费数据源中的数据。 -- 导入任务(load task):一个导入作业会被拆解成若干个导入作业,作为一个独立的导入基本单位,以 Stream Load 的方式写入到 BE 中。 +- 导入任务(load task):一个导入作业会被拆解成若干个导入任务进行实际消费,每个任务都是一个独立的事务。 Routine Load 的导入具体流程如下图展示:  -1. Client 向 FE 提交 Routine Load 常驻 Routine Load Job +1. Client 向 FE 提交创建 Routine Load 作业请求,FE 通过 Routine Load Manager 生成一个常驻的导入作业(Routine Load Job)。 -2. FE 通过 Job Scheduler 将 Routine Load Job 拆分成若干个 Routine Load Task +2. FE 通过 Job Scheduler 将 Routine Load Job 拆分成若干个 Routine Load Task,由 Task Scheduler 进行调度,下发到 BE 节点。 -3. 在 BE 上,一个 Routine Load Task 会被视为 Stream Load 任务进行导入,导入完成后向 FE 汇报 +3. 在 BE 上,一个 Routine Load Task 导入完成后向 FE 提交事务,并更新 Job 的元数据。 -4. FE 中的 Job Scheduler 根据汇报结果,继续生成新的 Task,或对失败的 Task 进行重试 +4. 一个 Routine Load Task 提交后,会继续生成新的 Task,或对超时的 Task 进行重试。 -5. Routine Load Job 会不断产生新的 Task,来完成数据的不间断导入 +5. 
新生成的 Routine Load Task 由 Task Scheduler 继续调度,不断循环。 ## 快速上手 @@ -389,7 +391,7 @@ FROM KAFKA [data_source_properties] 指定需要导入的表的名称,可选参数。 -如果不指定,则采用动态表的方式,这个时候需要 Kafka 中的数据包含表名的信息。目前仅支持从 Kafka 的 Value 中获取动态表名,且需要符合这种格式:以 json 为例:`table_name|{"col1": "val1", "col2": "val2"}`, 其中 `tbl_name` 为表名,以 `|` 作为表名和表数据的分隔符。csv 格式的数据也是类似的,如:`table_name|val1,val2,val3`。注意,这里的 `table_name` 必须和 Doris 中的表名一致,否则会导致导入失败。注意,动态表不支持后面介绍的 column_mapping 配置。 +如果不指定,则采用动态表的方式,这个时候需要 Kafka 中的数据包含表名的信息。目前仅支持从 Kafka 的 Value 中获取动态表名,且需要符合这种格式:以 json 为例:`table_name|{"col1": "val1", "col2": "val2"}`, 其中 `tbl_name` 为表名,以 `|` 作为表名和表数据的分隔符。CSV 格式的数据也是类似的,如:`table_name|val1,val2,val3`。注意,这里的 `table_name` 必须和 Doris 中的表名一致,否则会导致导入失败。注意,动态表不支持后面介绍的 column_mapping 配置。 **merge_type 子句** @@ -445,15 +447,15 @@ job_properties 子句具体参数选项如下: | max_error_number | 采样窗口内,允许的最大错误行数。必须大于等于 0。默认是 0,即不允许有错误行。采样窗口为 `max_batch_rows * 10`。即如果在采样窗口内,错误行数大于 `max_error_number`,则会导致例行作业被暂停,需要人工介入检查数据质量问题,通过 [SHOW ROUTINE LOAD](../../../sql-manual/sql-statements/Show-Statements/SHOW-ROUTINE-LOAD) 命令中 `ErrorLogUrls` 检查数据的质量问题。被 where 条件过滤掉的行不算错误行。 | | strict_mode | 是否开启严格模式,默认为关闭。严格模式表示对于导入过程中的列类型转换进行严格过滤。如果开启后,非空原始数据的列类型变换如果结果为 NULL,则会被过滤。<p>严格模式过滤策略如下:</p> <p>- 某衍生列(由函数转换生成而来),Strict Mode 对其不产生影响</p> <p>- 当列类型需要转换,错误的数据类型将被过滤掉,在 [SHOW ROUTINE LOAD](../../../sql-manual/sql-statements/Show-Statements/SHOW-ROUTINE-LOAD) 的 `ErrorLogUrls` 中查看因为数据类型错误而被过滤掉的列</p> <p>- 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict mode 对其也不产生影响。例如:如果类型是 decimal(1,0), 原始数据为 10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict 对其不产生影响。详细内容参考[严格模 式](../../ [...] | timezone | 指定导入作业所使用的时区。默认为使用 Session 的 timezone 参数。该参数会影响所有导入涉及的和时区有关的函数结果。 | -| format | 指定导入数据格式,默认是 csv,支持 json 格式。 | -| jsonpaths | 当导入数据格式为 JSON 时,可以通过 jsonpaths 指定抽取 Json 数据中的字段。例如通过以下命令指定导入 jsonpaths:`"jsonpaths" = "[\"$.userid\",\"$.username\",\"$.age\",\"$.city\"]"` | -| json_root | 当导入数据格式为 json 时,可以通过 json_root 指定 Json 数据的根节点。Doris 将通过 json_root 抽取根节点的元素进行解析。默认为空。例如通过一下命令指定导入 Json 根节点:`"json_root" = "$.RECORDS"` | -| strip_outer_array | 当导入数据格式为 json 时,strip_outer_array 为 true 表示 Json 数据以数组的形式展现,数据中的每一个元素将被视为一行数据。默认值是 false。通常情况下,Kafka 中的 Json 数据可能以数组形式表示,即在最外层中包含中括号`[]`,此时,可以指定 `"strip_outer_array" = "true"`,以数组模式消费 Topic 中的数据。如以下数据会被解析成两行:`[{"user_id":1,"name":"Emily","age":25},{"user_id":2,"name":"Benjamin","age":35}]` | +| format | 指定导入数据格式,默认是 CSV,支持 JSON 格式。 | +| jsonpaths | 当导入数据格式为 JSON 时,可以通过 jsonpaths 指定抽取 JSON 数据中的字段。例如通过以下命令指定导入 jsonpaths:`"jsonpaths" = "[\"$.userid\",\"$.username\",\"$.age\",\"$.city\"]"` | +| json_root | 当导入数据格式为 JSON 时,可以通过 json_root 指定 JSON 数据的根节点。Doris 将通过 json_root 抽取根节点的元素进行解析。默认为空。例如通过一下命令指定导入 JSON 根节点:`"json_root" = "$.RECORDS"` | +| strip_outer_array | 当导入数据格式为 json 时,strip_outer_array 为 true 表示 JSON 数据以数组的形式展现,数据中的每一个元素将被视为一行数据。默认值是 false。通常情况下,Kafka 中的 JSON 数据可能以数组形式表示,即在最外层中包含中括号`[]`,此时,可以指定 `"strip_outer_array" = "true"`,以数组模式消费 Topic 中的数据。如以下数据会被解析成两行:`[{"user_id":1,"name":"Emily","age":25},{"user_id":2,"name":"Benjamin","age":35}]` | | send_batch_parallelism | 用于设置发送批量数据的并行度。如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism_per_job`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism_per_job` 的值。 | | load_to_single_tablet | 支持一个任务只导入数据到对应分区的一个 tablet,默认值为 false,该参数只允许在对带有 random 分桶的 olap 表导数的时候设置。 | | partial_columns | 指定是否开启部分列更新功能。默认值为 false。该参数只允许在表模型为 Unique 且采用 Merge on Write 时设置。一流多表不支持此参数。具体参考文档[部分列更新](../../../data-operate/update/update-of-unique-model) | | max_filter_ratio | 采样窗口内,允许的最大过滤率。必须在大于等于 0 到小于等于 1 之间。默认值是 1.0,表示可以容忍任何错误行。采样窗口为 `max_batch_rows * 10`。即如果在采样窗口内,错误行数/总行数大于 
`max_filter_ratio`,则会导致例行作业被暂停,需要人工介入检查数据质量问题。被 where 条件过滤掉的行不算错误行。 | -| enclose | 指定包围符。当 csv 数据字段中含有行分隔符或列分隔符时,为防止意外截断,可指定单字节字符作为包围符起到保护作用。例如列分隔符为 ",",包围符为 "'",数据为 "a,'b,c'",则 "b,c" 会被解析为一个字段。 | +| enclose | 指定包围符。当 CSV 数据字段中含有行分隔符或列分隔符时,为防止意外截断,可指定单字节字符作为包围符起到保护作用。例如列分隔符为 ",",包围符为 "'",数据为 "a,'b,c'",则 "b,c" 会被解析为一个字段。 | | escape | 指定转义符。用于转义在字段中出现的与包围符相同的字符。例如数据为 "a,'b,'c'",包围符为 "'",希望 "b,'c 被作为一个字段解析,则需要指定单字节转义符,例如"\",将数据修改为 "a,'b,\'c'"。 | **04 data_source_properties 子句** @@ -541,9 +543,7 @@ ReasonOfStateChanged: ## 导入示例 -### CSV 格式导入 - -**设置导入最大容错率** +### 设置导入最大容错率 1. 导入数据样例 @@ -597,7 +597,7 @@ ReasonOfStateChanged: 2 rows in set (0.01 sec) ``` -**从指定消费点消费数据** +### 从指定消费点消费数据 1. 导入数据样例 @@ -650,7 +650,7 @@ ReasonOfStateChanged: 3 rows in set (0.01 sec) ``` -**指定 Consumer Group 的 group.id 与 client.id** +### 指定 Consumer Group 的 group.id 与 client.id 1. 导入数据样例 @@ -701,7 +701,7 @@ ReasonOfStateChanged: 3 rows in set (0.01 sec) ``` -**设置导入过滤条件** +### 设置导入过滤条件 1. 导入数据样例 @@ -754,7 +754,7 @@ ReasonOfStateChanged: 3 rows in set (0.01 sec) ``` -**导入指定分区数据** +### 导入指定分区数据 1. 导入数据样例 @@ -807,7 +807,7 @@ ReasonOfStateChanged: 1 rows in set (0.01 sec) ``` -**设置导入时区** +### 设置导入时区 1. 导入数据样例 @@ -860,6 +860,7 @@ ReasonOfStateChanged: +------+-------------+------+---------------------+ 3 rows in set (0.00 sec) ``` +### 设置 merge_type **指定 merge_type 进行 delete 操作** @@ -1082,7 +1083,7 @@ mysql> SELECT * FROM routine_test08; 5 rows in set (0.00 sec) ``` -**导入完成列影射与衍生列计算** +### 导入完成列影射与衍生列计算 1. 导入数据样例 @@ -1133,7 +1134,7 @@ mysql> SELECT * FROM routine_test08; 3 rows in set (0.01 sec) ``` -**导入包含包围符的数据** +### 导入包含包围符的数据 1. 导入数据样例 @@ -1828,7 +1829,7 @@ FROM KAFKA ); ``` -这个时候需要 Kafka 中的数据包含表名的信息。目前仅支持从 Kafka 的 Value 中获取动态表名,且需要符合这种格式:以 json 为例:`table_name|{"col1": "val1", "col2": "val2"}`, 其中 `tbl_name` 为表名,以 `|` 作为表名和表数据的分隔符。csv 格式的数据也是类似的,如:`table_name|val1,val2,val3`。注意,这里的 `table_name` 必须和 Doris 中的表名一致,否则会导致导入失败。注意,动态表不支持后面介绍的 column_mapping 配置。 +这个时候需要 Kafka 中的数据包含表名的信息。目前仅支持从 Kafka 的 Value 中获取动态表名,且需要符合这种格式:以 JSON 为例:`table_name|{"col1": "val1", "col2": "val2"}`, 其中 `tbl_name` 为表名,以 `|` 作为表名和表数据的分隔符。CSV 格式的数据也是类似的,如:`table_name|val1,val2,val3`。注意,这里的 `table_name` 必须和 Doris 中的表名一致,否则会导致导入失败。注意,动态表不支持后面介绍的 column_mapping 配置。 ### 严格模式导入 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/data-operate/import/import-way/routine-load-manual.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/data-operate/import/import-way/routine-load-manual.md index dd8df4303c4..d998e9b995a 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/data-operate/import/import-way/routine-load-manual.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/data-operate/import/import-way/routine-load-manual.md @@ -30,9 +30,13 @@ Routine Load 是一个流式导入作业,支持 Exactly-Once 语义,保证 ## 使用场景 +### 支持数据源 + +Routine Load 支持从 Kafka 集群中消费数据。 + ### 支持数据文件格式 -Routine Load 支持从 Kafka 中消费 CSV 及 JSON 格式的数据。 +Routine Load 支持 CSV 及 JSON 格式的数据。 在导入 CSV 格式时,需要明确区分空值(null)与空字符串(''): @@ -44,8 +48,6 @@ Routine Load 支持从 Kafka 中消费 CSV 及 JSON 格式的数据。 在使用 Routine Load 消费 Kafka 中数据时,有以下限制: -- 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群; - - 支持的消息格式为 CSV 及 JSON 文本格式。CSV 每一个 message 为一行,且行尾**不包含**换行符; - 默认支持 Kafka 0.10.0.0(含)以上版本。如果要使用 Kafka 0.10.0.0 以下版本(0.9.0, 0.8.2, 0.8.1, 0.8.0),需要修改 BE 的配置,将 `kafka_broker_version_fallback` 的值设置为要兼容的旧版本,或者在创建 Routine Load 的时候直接设置 `property.broker.version.fallback` 的值为要兼容的旧版本,使用旧版本的代价是 Routine Load 的部分新特性可能无法使用,如根据时间设置 Kafka 分区的 offset。 @@ -54,25 +56,25 @@ Routine Load 支持从 Kafka 中消费 CSV 及 JSON 格式的数据。 
Routine Load 会持续消费 Kafka Topic 中的数据,写入 Doris 中。 -在 Doris 中,创建 Routine Load 作业后会生成一个常驻的导入作业和若干个导入任务: +在 Doris 中,创建 Routine Load 作业后会生成一个常驻的导入作业,包括若干个导入任务: -- 导入作业(load job):一个 Routine Load 对应一个导入作业,导入作业是一个常驻的任务,会持续不断地消费 Kafka Topic 中的数据; +- 导入作业(load job):一个 Routine Load Job 是一个常驻的导入作业,会持续不断地消费数据源中的数据。 -- 导入任务(load task):一个导入作业会被拆解成若干个导入作业,作为一个独立的导入基本单位,以 Stream Load 的方式写入到 BE 中。 +- 导入任务(load task):一个导入作业会被拆解成若干个导入任务进行实际消费,每个任务都是一个独立的事务。 Routine Load 的导入具体流程如下图展示:  -1. Client 向 FE 提交 Routine Load 常驻 Routine Load Job +1. Client 向 FE 提交创建 Routine Load 作业请求,FE 通过 Routine Load Manager 生成一个常驻的导入作业(Routine Load Job)。 -2. FE 通过 Job Scheduler 将 Routine Load Job 拆分成若干个 Routine Load Task +2. FE 通过 Job Scheduler 将 Routine Load Job 拆分成若干个 Routine Load Task,由 Task Scheduler 进行调度,下发到 BE 节点。 -3. 在 BE 上,一个 Routine Load Task 会被视为 Stream Load 任务进行导入,导入完成后向 FE 汇报 +3. 在 BE 上,一个 Routine Load Task 导入完成后向 FE 提交事务,并更新 Job 的元数据。 -4. FE 中的 Job Scheduler 根据汇报结果,继续生成新的 Task,或对失败的 Task 进行重试 +4. 一个 Routine Load Task 提交后,会继续生成新的 Task,或对超时的 Task 进行重试。 -5. Routine Load Job 会不断产生新的 Task,来完成数据的不间断导入 +5. 新生成的 Routine Load Task 由 Task Scheduler 继续调度,不断循环。 ## 快速上手 @@ -389,7 +391,7 @@ FROM KAFKA [data_source_properties] 指定需要导入的表的名称,可选参数。 -如果不指定,则采用动态表的方式,这个时候需要 Kafka 中的数据包含表名的信息。目前仅支持从 Kafka 的 Value 中获取动态表名,且需要符合这种格式:以 json 为例:`table_name|{"col1": "val1", "col2": "val2"}`, 其中 `tbl_name` 为表名,以 `|` 作为表名和表数据的分隔符。csv 格式的数据也是类似的,如:`table_name|val1,val2,val3`。注意,这里的 `table_name` 必须和 Doris 中的表名一致,否则会导致导入失败。注意,动态表不支持后面介绍的 column_mapping 配置。 +如果不指定,则采用动态表的方式,这个时候需要 Kafka 中的数据包含表名的信息。目前仅支持从 Kafka 的 Value 中获取动态表名,且需要符合这种格式:以 json 为例:`table_name|{"col1": "val1", "col2": "val2"}`, 其中 `tbl_name` 为表名,以 `|` 作为表名和表数据的分隔符。CSV 格式的数据也是类似的,如:`table_name|val1,val2,val3`。注意,这里的 `table_name` 必须和 Doris 中的表名一致,否则会导致导入失败。注意,动态表不支持后面介绍的 column_mapping 配置。 **merge_type 子句** @@ -440,20 +442,20 @@ job_properties 子句具体参数选项如下: | ------------------------- | ------------------------------------------------------------ | | desired_concurrent_number | <p>默认值:256 </p> <p>参数描述:单个导入子任务(load task)期望的并发度,修改 Routine Load 导入作业切分的期望导入子任务数量。在导入过程中,期望的子任务并发度可能不等于实际并发度。实际的并发度会根据集群的节点数、负载情况,以及数据源的情况综合考虑,使用公式以下可以计算出实际的导入子任务数:</p> <p>` min(topic_partition_num, desired_concurrent_number, max_routine_load_task_concurrent_num)`,其中:</p> <p>- topic_partition_num 表示 Kafka Topic 的 parititon 数量</p> <p>- desired_concurrent_number 表示设置的参数大小</p> <p>- max_routine_load_task_concurrent_num 为 FE 中设置 Routine Load 最大任务并行度的参数</p> | | max_batch_interval | 每个子任务的最大运行时间,单位是秒,必须大于0,默认值为 60(s)。max_batch_interval/max_batch_rows/max_batch_size 共同形成子任务执行阈值。任一参数达到阈值,导入子任务结束,并生成新的导入子任务。 | -| max_batch_rows | 每个子任务最多读取的行数。必须大于等于 200000。默认是 200000(2.1.5 及更高版本为 20000000)。max_batch_interval/max_batch_rows/max_batch_size 共同形成子任务执行阈值。任一参数达到阈值,导入子任务结束,并生成新的导入子任务。 | -| max_batch_size | 每个子任务最多读取的字节数。单位是字节,范围是 100MB 到 1GB。默认是 100MB(2.1.5 及更高版本为 1G)。max_batch_interval/max_batch_rows/max_batch_size 共同形成子任务执行阈值。任一参数达到阈值,导入子任务结束,并生成新的导入子任务。 | +| max_batch_rows | 每个子任务最多读取的行数。必须大于等于 200000。默认是 20000000。max_batch_interval/max_batch_rows/max_batch_size 共同形成子任务执行阈值。任一参数达到阈值,导入子任务结束,并生成新的导入子任务。 | +| max_batch_size | 每个子任务最多读取的字节数。单位是字节,范围是 100MB 到 1GB。默认是 1G。max_batch_interval/max_batch_rows/max_batch_size 共同形成子任务执行阈值。任一参数达到阈值,导入子任务结束,并生成新的导入子任务。 | | max_error_number | 采样窗口内,允许的最大错误行数。必须大于等于 0。默认是 0,即不允许有错误行。采样窗口为 `max_batch_rows * 10`。即如果在采样窗口内,错误行数大于 `max_error_number`,则会导致例行作业被暂停,需要人工介入检查数据质量问题,通过 [SHOW ROUTINE LOAD](../../../sql-manual/sql-statements/Show-Statements/SHOW-ROUTINE-LOAD) 命令中 `ErrorLogUrls` 检查数据的质量问题。被 where 
条件过滤掉的行不算错误行。 | -| strict_mode | 是否开启严格模式,默认为关闭。严格模式表示对于导入过程中的列类型转换进行严格过滤。如果开启后,非空原始数据的列类型变换如果结果为 NULL,则会被过滤。<p>严格模式过滤策略如下:</p> <p>- 某衍生列(由函数转换生成而来),Strict Mode 对其不产生影响</p> <p>- 当列类型需要转换,错误的数据类型将被过滤掉,在 [SHOW ROUTINE LOAD](../../../sql-manual/sql-statements/Show-Statements/SHOW-ROUTINE-LOAD) 的 `ErrorLogUrls` 中查看因为数据类型错误而被过滤掉的列</p> <p>- 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict mode 对其也不产生影响。例如:如果类型是 decimal(1,0), 原始数据为 10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict 对其不产生影响。详细内容参考[严格模 式](../../ [...] +| strict_mode | 是否开启严格模式,默认为关闭。严格模式表示对于导入过程中的列类型转换进行严格过滤。如果开启后,非空原始数据的列类型变换如果结果为 NULL,则会被过滤。<p>严格模式过滤策略如下:</p> <p>- 某衍生列(由函数转换生成而来),Strict Mode 对其不产生影响</p> <p>- 当列类型需要转换,错误的数据类型将被过滤掉,在 [SHOW ROUTINE LOAD](../../../sql-manual/sql-statements/Show-Statements/SHOW-ROUTINE-LOAD) 的 `ErrorLogUrls` 中查看因为数据类型错误而被过滤掉的列</p> <p>- 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict mode 对其也不产生影响。例如:如果类型是 decimal(1,0), 原始数据为 10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict 对其不产生影响。详细内容参考[严格模 式](../../ [...] | timezone | 指定导入作业所使用的时区。默认为使用 Session 的 timezone 参数。该参数会影响所有导入涉及的和时区有关的函数结果。 | -| format | 指定导入数据格式,默认是 csv,支持 json 格式。 | -| jsonpaths | 当导入数据格式为 JSON 时,可以通过 jsonpaths 指定抽取 Json 数据中的字段。例如通过以下命令指定导入 jsonpaths:`"jsonpaths" = "[\"$.userid\",\"$.username\",\"$.age\",\"$.city\"]"` | -| json_root | 当导入数据格式为 json 时,可以通过 json_root 指定 Json 数据的根节点。Doris 将通过 json_root 抽取根节点的元素进行解析。默认为空。例如通过一下命令指定导入 Json 根节点:`"json_root" = "$.RECORDS"` | -| strip_outer_array | 当导入数据格式为 json 时,strip_outer_array 为 true 表示 Json 数据以数组的形式展现,数据中的每一个元素将被视为一行数据。默认值是 false。通常情况下,Kafka 中的 Json 数据可能以数组形式表示,即在最外层中包含中括号`[]`,此时,可以指定 `"strip_outer_array" = "true"`,以数组模式消费 Topic 中的数据。如以下数据会被解析成两行:`[{"user_id":1,"name":"Emily","age":25},{"user_id":2,"name":"Benjamin","age":35}]` | +| format | 指定导入数据格式,默认是 CSV,支持 JSON 格式。 | +| jsonpaths | 当导入数据格式为 JSON 时,可以通过 jsonpaths 指定抽取 JSON 数据中的字段。例如通过以下命令指定导入 jsonpaths:`"jsonpaths" = "[\"$.userid\",\"$.username\",\"$.age\",\"$.city\"]"` | +| json_root | 当导入数据格式为 JSON 时,可以通过 json_root 指定 JSON 数据的根节点。Doris 将通过 json_root 抽取根节点的元素进行解析。默认为空。例如通过一下命令指定导入 JSON 根节点:`"json_root" = "$.RECORDS"` | +| strip_outer_array | 当导入数据格式为 json 时,strip_outer_array 为 true 表示 JSON 数据以数组的形式展现,数据中的每一个元素将被视为一行数据。默认值是 false。通常情况下,Kafka 中的 JSON 数据可能以数组形式表示,即在最外层中包含中括号`[]`,此时,可以指定 `"strip_outer_array" = "true"`,以数组模式消费 Topic 中的数据。如以下数据会被解析成两行:`[{"user_id":1,"name":"Emily","age":25},{"user_id":2,"name":"Benjamin","age":35}]` | | send_batch_parallelism | 用于设置发送批量数据的并行度。如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism_per_job`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism_per_job` 的值。 | | load_to_single_tablet | 支持一个任务只导入数据到对应分区的一个 tablet,默认值为 false,该参数只允许在对带有 random 分桶的 olap 表导数的时候设置。 | | partial_columns | 指定是否开启部分列更新功能。默认值为 false。该参数只允许在表模型为 Unique 且采用 Merge on Write 时设置。一流多表不支持此参数。具体参考文档[部分列更新](../../../data-operate/update/update-of-unique-model) | | max_filter_ratio | 采样窗口内,允许的最大过滤率。必须在大于等于 0 到小于等于 1 之间。默认值是 1.0,表示可以容忍任何错误行。采样窗口为 `max_batch_rows * 10`。即如果在采样窗口内,错误行数/总行数大于 `max_filter_ratio`,则会导致例行作业被暂停,需要人工介入检查数据质量问题。被 where 条件过滤掉的行不算错误行。 | -| enclose | 指定包围符。当 csv 数据字段中含有行分隔符或列分隔符时,为防止意外截断,可指定单字节字符作为包围符起到保护作用。例如列分隔符为 ",",包围符为 "'",数据为 "a,'b,c'",则 "b,c" 会被解析为一个字段。 | +| enclose | 指定包围符。当 CSV 数据字段中含有行分隔符或列分隔符时,为防止意外截断,可指定单字节字符作为包围符起到保护作用。例如列分隔符为 ",",包围符为 "'",数据为 "a,'b,c'",则 "b,c" 会被解析为一个字段。 | | escape | 指定转义符。用于转义在字段中出现的与包围符相同的字符。例如数据为 "a,'b,'c'",包围符为 "'",希望 "b,'c 被作为一个字段解析,则需要指定单字节转义符,例如"\",将数据修改为 "a,'b,\'c'"。 | **04 data_source_properties 子句** @@ -541,9 +543,7 @@ ReasonOfStateChanged: ## 导入示例 -### CSV 格式导入 - -**设置导入最大容错率** +### 设置导入最大容错率 1. 
导入数据样例 @@ -597,7 +597,7 @@ ReasonOfStateChanged: 2 rows in set (0.01 sec) ``` -**从指定消费点消费数据** +### 从指定消费点消费数据 1. 导入数据样例 @@ -650,7 +650,7 @@ ReasonOfStateChanged: 3 rows in set (0.01 sec) ``` -**指定 Consumer Group 的 group.id 与 client.id** +### 指定 Consumer Group 的 group.id 与 client.id 1. 导入数据样例 @@ -701,7 +701,7 @@ ReasonOfStateChanged: 3 rows in set (0.01 sec) ``` -**设置导入过滤条件** +### 设置导入过滤条件 1. 导入数据样例 @@ -754,7 +754,7 @@ ReasonOfStateChanged: 3 rows in set (0.01 sec) ``` -**导入指定分区数据** +### 导入指定分区数据 1. 导入数据样例 @@ -807,7 +807,7 @@ ReasonOfStateChanged: 1 rows in set (0.01 sec) ``` -**设置导入时区** +### 设置导入时区 1. 导入数据样例 @@ -860,6 +860,7 @@ ReasonOfStateChanged: +------+-------------+------+---------------------+ 3 rows in set (0.00 sec) ``` +### 设置 merge_type **指定 merge_type 进行 delete 操作** @@ -1082,7 +1083,7 @@ mysql> SELECT * FROM routine_test08; 5 rows in set (0.00 sec) ``` -**导入完成列影射与衍生列计算** +### 导入完成列影射与衍生列计算 1. 导入数据样例 @@ -1133,7 +1134,7 @@ mysql> SELECT * FROM routine_test08; 3 rows in set (0.01 sec) ``` -**导入包含包围符的数据** +### 导入包含包围符的数据 1. 导入数据样例 @@ -1828,7 +1829,7 @@ FROM KAFKA ); ``` -这个时候需要 Kafka 中的数据包含表名的信息。目前仅支持从 Kafka 的 Value 中获取动态表名,且需要符合这种格式:以 json 为例:`table_name|{"col1": "val1", "col2": "val2"}`, 其中 `tbl_name` 为表名,以 `|` 作为表名和表数据的分隔符。csv 格式的数据也是类似的,如:`table_name|val1,val2,val3`。注意,这里的 `table_name` 必须和 Doris 中的表名一致,否则会导致导入失败。注意,动态表不支持后面介绍的 column_mapping 配置。 +这个时候需要 Kafka 中的数据包含表名的信息。目前仅支持从 Kafka 的 Value 中获取动态表名,且需要符合这种格式:以 JSON 为例:`table_name|{"col1": "val1", "col2": "val2"}`, 其中 `tbl_name` 为表名,以 `|` 作为表名和表数据的分隔符。CSV 格式的数据也是类似的,如:`table_name|val1,val2,val3`。注意,这里的 `table_name` 必须和 Doris 中的表名一致,否则会导致导入失败。注意,动态表不支持后面介绍的 column_mapping 配置。 ### 严格模式导入 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/data-operate/import/import-way/routine-load-manual.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/data-operate/import/import-way/routine-load-manual.md index 406404548ad..7626106b7eb 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/data-operate/import/import-way/routine-load-manual.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/data-operate/import/import-way/routine-load-manual.md @@ -30,9 +30,13 @@ Routine Load 是一个流式导入作业,支持 Exactly-Once 语义,保证 ## 使用场景 +### 支持数据源 + +Routine Load 支持从 Kafka 集群中消费数据。 + ### 支持数据文件格式 -Routine Load 支持从 Kafka 中消费 CSV 及 JSON 格式的数据。 +Routine Load 支持 CSV 及 JSON 格式的数据。 在导入 CSV 格式时,需要明确区分空值(null)与空字符串(''): @@ -44,8 +48,6 @@ Routine Load 支持从 Kafka 中消费 CSV 及 JSON 格式的数据。 在使用 Routine Load 消费 Kafka 中数据时,有以下限制: -- 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群; - - 支持的消息格式为 CSV 及 JSON 文本格式。CSV 每一个 message 为一行,且行尾**不包含**换行符; - 默认支持 Kafka 0.10.0.0(含)以上版本。如果要使用 Kafka 0.10.0.0 以下版本(0.9.0, 0.8.2, 0.8.1, 0.8.0),需要修改 BE 的配置,将 `kafka_broker_version_fallback` 的值设置为要兼容的旧版本,或者在创建 Routine Load 的时候直接设置 `property.broker.version.fallback` 的值为要兼容的旧版本,使用旧版本的代价是 Routine Load 的部分新特性可能无法使用,如根据时间设置 Kafka 分区的 offset。 @@ -54,25 +56,25 @@ Routine Load 支持从 Kafka 中消费 CSV 及 JSON 格式的数据。 Routine Load 会持续消费 Kafka Topic 中的数据,写入 Doris 中。 -在 Doris 中,创建 Routine Load 作业后会生成一个常驻的导入作业和若干个导入任务: +在 Doris 中,创建 Routine Load 作业后会生成一个常驻的导入作业,包括若干个导入任务: -- 导入作业(load job):一个 Routine Load 对应一个导入作业,导入作业是一个常驻的任务,会持续不断地消费 Kafka Topic 中的数据; +- 导入作业(load job):一个 Routine Load Job 是一个常驻的导入作业,会持续不断地消费数据源中的数据。 -- 导入任务(load task):一个导入作业会被拆解成若干个导入作业,作为一个独立的导入基本单位,以 Stream Load 的方式写入到 BE 中。 +- 导入任务(load task):一个导入作业会被拆解成若干个导入任务进行实际消费,每个任务都是一个独立的事务。 Routine Load 的导入具体流程如下图展示:  -1. Client 向 FE 提交 Routine Load 常驻 Routine Load Job +1. 
Client 向 FE 提交创建 Routine Load 作业请求,FE 通过 Routine Load Manager 生成一个常驻的导入作业(Routine Load Job)。 -2. FE 通过 Job Scheduler 将 Routine Load Job 拆分成若干个 Routine Load Task +2. FE 通过 Job Scheduler 将 Routine Load Job 拆分成若干个 Routine Load Task,由 Task Scheduler 进行调度,下发到 BE 节点。 -3. 在 BE 上,一个 Routine Load Task 会被视为 Stream Load 任务进行导入,导入完成后向 FE 汇报 +3. 在 BE 上,一个 Routine Load Task 导入完成后向 FE 提交事务,并更新 Job 的元数据。 -4. FE 中的 Job Scheduler 根据汇报结果,继续生成新的 Task,或对失败的 Task 进行重试 +4. 一个 Routine Load Task 提交后,会继续生成新的 Task,或对超时的 Task 进行重试。 -5. Routine Load Job 会不断产生新的 Task,来完成数据的不间断导入 +5. 新生成的 Routine Load Task 由 Task Scheduler 继续调度,不断循环。 ## 快速上手 @@ -389,7 +391,7 @@ FROM KAFKA [data_source_properties] 指定需要导入的表的名称,可选参数。 -如果不指定,则采用动态表的方式,这个时候需要 Kafka 中的数据包含表名的信息。目前仅支持从 Kafka 的 Value 中获取动态表名,且需要符合这种格式:以 json 为例:`table_name|{"col1": "val1", "col2": "val2"}`, 其中 `tbl_name` 为表名,以 `|` 作为表名和表数据的分隔符。csv 格式的数据也是类似的,如:`table_name|val1,val2,val3`。注意,这里的 `table_name` 必须和 Doris 中的表名一致,否则会导致导入失败。注意,动态表不支持后面介绍的 column_mapping 配置。 +如果不指定,则采用动态表的方式,这个时候需要 Kafka 中的数据包含表名的信息。目前仅支持从 Kafka 的 Value 中获取动态表名,且需要符合这种格式:以 json 为例:`table_name|{"col1": "val1", "col2": "val2"}`, 其中 `tbl_name` 为表名,以 `|` 作为表名和表数据的分隔符。CSV 格式的数据也是类似的,如:`table_name|val1,val2,val3`。注意,这里的 `table_name` 必须和 Doris 中的表名一致,否则会导致导入失败。注意,动态表不支持后面介绍的 column_mapping 配置。 **merge_type 子句** @@ -445,15 +447,15 @@ job_properties 子句具体参数选项如下: | max_error_number | 采样窗口内,允许的最大错误行数。必须大于等于 0。默认是 0,即不允许有错误行。采样窗口为 `max_batch_rows * 10`。即如果在采样窗口内,错误行数大于 `max_error_number`,则会导致例行作业被暂停,需要人工介入检查数据质量问题,通过 [SHOW ROUTINE LOAD](../../../sql-manual/sql-statements/Show-Statements/SHOW-ROUTINE-LOAD) 命令中 `ErrorLogUrls` 检查数据的质量问题。被 where 条件过滤掉的行不算错误行。 | | strict_mode | 是否开启严格模式,默认为关闭。严格模式表示对于导入过程中的列类型转换进行严格过滤。如果开启后,非空原始数据的列类型变换如果结果为 NULL,则会被过滤。<p>严格模式过滤策略如下:</p> <p>- 某衍生列(由函数转换生成而来),Strict Mode 对其不产生影响</p> <p>- 当列类型需要转换,错误的数据类型将被过滤掉,在 [SHOW ROUTINE LOAD](../../../sql-manual/sql-statements/Show-Statements/SHOW-ROUTINE-LOAD) 的 `ErrorLogUrls` 中查看因为数据类型错误而被过滤掉的列</p> <p>- 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict mode 对其也不产生影响。例如:如果类型是 decimal(1,0), 原始数据为 10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict 对其不产生影响。详细内容参考[严格模 式](../../ [...] 
| timezone | 指定导入作业所使用的时区。默认为使用 Session 的 timezone 参数。该参数会影响所有导入涉及的和时区有关的函数结果。 | -| format | 指定导入数据格式,默认是 csv,支持 json 格式。 | -| jsonpaths | 当导入数据格式为 JSON 时,可以通过 jsonpaths 指定抽取 Json 数据中的字段。例如通过以下命令指定导入 jsonpaths:`"jsonpaths" = "[\"$.userid\",\"$.username\",\"$.age\",\"$.city\"]"` | -| json_root | 当导入数据格式为 json 时,可以通过 json_root 指定 Json 数据的根节点。Doris 将通过 json_root 抽取根节点的元素进行解析。默认为空。例如通过一下命令指定导入 Json 根节点:`"json_root" = "$.RECORDS"` | -| strip_outer_array | 当导入数据格式为 json 时,strip_outer_array 为 true 表示 Json 数据以数组的形式展现,数据中的每一个元素将被视为一行数据。默认值是 false。通常情况下,Kafka 中的 Json 数据可能以数组形式表示,即在最外层中包含中括号`[]`,此时,可以指定 `"strip_outer_array" = "true"`,以数组模式消费 Topic 中的数据。如以下数据会被解析成两行:`[{"user_id":1,"name":"Emily","age":25},{"user_id":2,"name":"Benjamin","age":35}]` | +| format | 指定导入数据格式,默认是 CSV,支持 JSON 格式。 | +| jsonpaths | 当导入数据格式为 JSON 时,可以通过 jsonpaths 指定抽取 JSON 数据中的字段。例如通过以下命令指定导入 jsonpaths:`"jsonpaths" = "[\"$.userid\",\"$.username\",\"$.age\",\"$.city\"]"` | +| json_root | 当导入数据格式为 JSON 时,可以通过 json_root 指定 JSON 数据的根节点。Doris 将通过 json_root 抽取根节点的元素进行解析。默认为空。例如通过一下命令指定导入 JSON 根节点:`"json_root" = "$.RECORDS"` | +| strip_outer_array | 当导入数据格式为 json 时,strip_outer_array 为 true 表示 JSON 数据以数组的形式展现,数据中的每一个元素将被视为一行数据。默认值是 false。通常情况下,Kafka 中的 JSON 数据可能以数组形式表示,即在最外层中包含中括号`[]`,此时,可以指定 `"strip_outer_array" = "true"`,以数组模式消费 Topic 中的数据。如以下数据会被解析成两行:`[{"user_id":1,"name":"Emily","age":25},{"user_id":2,"name":"Benjamin","age":35}]` | | send_batch_parallelism | 用于设置发送批量数据的并行度。如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism_per_job`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism_per_job` 的值。 | | load_to_single_tablet | 支持一个任务只导入数据到对应分区的一个 tablet,默认值为 false,该参数只允许在对带有 random 分桶的 olap 表导数的时候设置。 | | partial_columns | 指定是否开启部分列更新功能。默认值为 false。该参数只允许在表模型为 Unique 且采用 Merge on Write 时设置。一流多表不支持此参数。具体参考文档[部分列更新](../../../data-operate/update/update-of-unique-model) | | max_filter_ratio | 采样窗口内,允许的最大过滤率。必须在大于等于 0 到小于等于 1 之间。默认值是 1.0,表示可以容忍任何错误行。采样窗口为 `max_batch_rows * 10`。即如果在采样窗口内,错误行数/总行数大于 `max_filter_ratio`,则会导致例行作业被暂停,需要人工介入检查数据质量问题。被 where 条件过滤掉的行不算错误行。 | -| enclose | 指定包围符。当 csv 数据字段中含有行分隔符或列分隔符时,为防止意外截断,可指定单字节字符作为包围符起到保护作用。例如列分隔符为 ",",包围符为 "'",数据为 "a,'b,c'",则 "b,c" 会被解析为一个字段。 | +| enclose | 指定包围符。当 CSV 数据字段中含有行分隔符或列分隔符时,为防止意外截断,可指定单字节字符作为包围符起到保护作用。例如列分隔符为 ",",包围符为 "'",数据为 "a,'b,c'",则 "b,c" 会被解析为一个字段。 | | escape | 指定转义符。用于转义在字段中出现的与包围符相同的字符。例如数据为 "a,'b,'c'",包围符为 "'",希望 "b,'c 被作为一个字段解析,则需要指定单字节转义符,例如"\",将数据修改为 "a,'b,\'c'"。 | **04 data_source_properties 子句** @@ -541,9 +543,7 @@ ReasonOfStateChanged: ## 导入示例 -### CSV 格式导入 - -**设置导入最大容错率** +### 设置导入最大容错率 1. 导入数据样例 @@ -597,7 +597,7 @@ ReasonOfStateChanged: 2 rows in set (0.01 sec) ``` -**从指定消费点消费数据** +### 从指定消费点消费数据 1. 导入数据样例 @@ -650,7 +650,7 @@ ReasonOfStateChanged: 3 rows in set (0.01 sec) ``` -**指定 Consumer Group 的 group.id 与 client.id** +### 指定 Consumer Group 的 group.id 与 client.id 1. 导入数据样例 @@ -701,7 +701,7 @@ ReasonOfStateChanged: 3 rows in set (0.01 sec) ``` -**设置导入过滤条件** +### 设置导入过滤条件 1. 导入数据样例 @@ -754,7 +754,7 @@ ReasonOfStateChanged: 3 rows in set (0.01 sec) ``` -**导入指定分区数据** +### 导入指定分区数据 1. 导入数据样例 @@ -807,7 +807,7 @@ ReasonOfStateChanged: 1 rows in set (0.01 sec) ``` -**设置导入时区** +### 设置导入时区 1. 导入数据样例 @@ -860,6 +860,7 @@ ReasonOfStateChanged: +------+-------------+------+---------------------+ 3 rows in set (0.00 sec) ``` +### 设置 merge_type **指定 merge_type 进行 delete 操作** @@ -1082,7 +1083,7 @@ mysql> SELECT * FROM routine_test08; 5 rows in set (0.00 sec) ``` -**导入完成列影射与衍生列计算** +### 导入完成列影射与衍生列计算 1. 
导入数据样例 @@ -1133,7 +1134,7 @@ mysql> SELECT * FROM routine_test08; 3 rows in set (0.01 sec) ``` -**导入包含包围符的数据** +### 导入包含包围符的数据 1. 导入数据样例 @@ -1828,7 +1829,7 @@ FROM KAFKA ); ``` -这个时候需要 Kafka 中的数据包含表名的信息。目前仅支持从 Kafka 的 Value 中获取动态表名,且需要符合这种格式:以 json 为例:`table_name|{"col1": "val1", "col2": "val2"}`, 其中 `tbl_name` 为表名,以 `|` 作为表名和表数据的分隔符。csv 格式的数据也是类似的,如:`table_name|val1,val2,val3`。注意,这里的 `table_name` 必须和 Doris 中的表名一致,否则会导致导入失败。注意,动态表不支持后面介绍的 column_mapping 配置。 +这个时候需要 Kafka 中的数据包含表名的信息。目前仅支持从 Kafka 的 Value 中获取动态表名,且需要符合这种格式:以 JSON 为例:`table_name|{"col1": "val1", "col2": "val2"}`, 其中 `tbl_name` 为表名,以 `|` 作为表名和表数据的分隔符。CSV 格式的数据也是类似的,如:`table_name|val1,val2,val3`。注意,这里的 `table_name` 必须和 Doris 中的表名一致,否则会导致导入失败。注意,动态表不支持后面介绍的 column_mapping 配置。 ### 严格模式导入 diff --git a/versioned_docs/version-2.1/data-operate/import/import-way/routine-load-manual.md b/versioned_docs/version-2.1/data-operate/import/import-way/routine-load-manual.md index ebc848171f9..a51fdc9d6aa 100644 --- a/versioned_docs/version-2.1/data-operate/import/import-way/routine-load-manual.md +++ b/versioned_docs/version-2.1/data-operate/import/import-way/routine-load-manual.md @@ -30,9 +30,13 @@ Routine Load is a streaming load job that supports Exactly-Once semantics, ensur ## Usage Scenarios +### Supported Data Sources + +Routine Load supports consuming data from Kafka clusters. + ### Supported Data File Formats -Routine Load supports consuming data in CSV and JSON formats from Kafka. +Routine Load supports consuming data in CSV and JSON formats. When loading CSV format, it is necessary to clearly distinguish between null values and empty strings: @@ -44,35 +48,33 @@ When loading CSV format, it is necessary to clearly distinguish between null val When using Routine Load to consume data from Kafka, there are the following limitations: -- It supports unauthenticated Kafka access as well as Kafka clusters authenticated through SSL. - - The supported message formats are CSV and JSON text formats. Each message in CSV should be on a separate line, and the line should not end with a newline character. - By default, it supports Kafka versions 0.10.0.0 and above. If you need to use a Kafka version below 0.10.0.0 (such as 0.9.0, 0.8.2, 0.8.1, 0.8.0), you need to modify the BE configuration by setting the value of `kafka_broker_version_fallback` to the compatible older version, or directly set the value of `property.broker.version.fallback` when creating the Routine Load. However, using an older version may mean that some new features of Routine Load, such as setting the offset of Kafka p [...] ## Basic Principles -Routine Load continuously consumes data from Kafka Topic and writes it into Doris. +Routine Load continuously consumes data from Kafka Topics and writes it into Doris. -When a Routine Load job is created in Doris, it generates a persistent load job and several load tasks: +When a Routine Load job is created in Doris, it generates a resident import job that consists of several import tasks: -- Load Job: Each routine load corresponds to a load job. The load job is a persistent task that continuously consumes data from the Kafka Topic. +- Load Job: A Routine Load Job is a resident import job that continuously consumes data from the data source. -- Load Task: A load job is divided into several load tasks, which are loaded as independent basic units using the Stream Load method into BE. +- Load Task: An import job is broken down into several import tasks for actual consumption, with each task being an independent transaction. 
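
Because each load task commits as an independent transaction, the resident job can be inspected and operated on while it runs. The following is a hedged sketch of the relevant statements (an editorial illustration, not part of this commit; the job name is a hypothetical placeholder):

```sql
-- Inspect the resident job and its currently running tasks
-- (job name below is a hypothetical placeholder).
SHOW ROUTINE LOAD FOR demo_db.example_routine_load;
SHOW ROUTINE LOAD TASK WHERE JobName = "example_routine_load";

-- Pause the job (for example, before checking ErrorLogUrls for data quality
-- issues), then resume it so the scheduler generates new tasks again.
PAUSE ROUTINE LOAD FOR demo_db.example_routine_load;
RESUME ROUTINE LOAD FOR demo_db.example_routine_load;
```
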
-The specific process of Routine Load is illustrated in the following diagram: +The specific import process of Routine Load is shown in the following diagram:  -1. The Client submits a Routine Load job to the FE to establish a persistent Routine Load Job. +1. The Client submits a request to create a Routine Load job to the FE, and the FE generates a resident import job (Routine Load Job) through the Routine Load Manager. -2. The FE splits the Routine Load Job into multiple Routine Load Tasks through the Job Scheduler. +2. The FE splits the Routine Load Job into several Routine Load Tasks through the Job Scheduler, which are then scheduled by the Task Scheduler and distributed to BE nodes. -3. On the BE, each Routine Load Task is treated as a Stream Load task for importation and reports back to the FE upon completion. +3. On the BE, after a Routine Load Task is completed, it submits the transaction to the FE and updates the Job's metadata. -4. The Job Scheduler in the FE generates new Tasks based on the report results or retries failed Tasks. +4. After a Routine Load Task is submitted, it continues to generate new Tasks or retries timed-out Tasks. -5. The Routine Load Job continuously generates new Tasks to complete uninterrupted data importation. +5. The newly generated Routine Load Tasks continue to be scheduled by the Task Scheduler in a continuous cycle. ## Quick Start @@ -428,38 +430,21 @@ Here are the available parameters for the job_properties clause: | --------------------------- | ------------------------------------------------------------ | | desired_concurrent_number | <ul><li>Default value: 256</li><li>Description: Specifies the desired concurrency for a single load subtask (load task). It modifies the expected number of load subtasks for a Routine Load job. The actual concurrency during the load process may not be equal to the desired concurrency. The actual concurrency is determined based on factors such as the number of nodes in the cluster, the load on the cluster, and the characteristics of the data source. The act [...] | max_batch_interval | The maximum running time for each subtask, in seconds. Must be greater than 0, with a default value of 60s. max_batch_interval/max_batch_rows/max_batch_size together form the execution threshold for subtasks. If any of these parameters reaches the threshold, the load subtask ends and a new one is generated. | -| max_batch_rows | The maximum number of rows read by each subtask. Must be greater than or equal to 200,000. The default value is 200,000(The default value for versions 2.1.5 and higher is 20,000,000.). max_batch_interval/max_batch_rows/max_batch_size together form the execution threshold for subtasks. If any of these parameters reaches the threshold, the load subtask ends and a new one is generated. | -| max_batch_size | The maximum number of bytes read by each subtask. The unit is bytes, and and the range is from 100MB to 10GB. The default value is 100MB(The default value for versions 2.1.5 and higher is 1G). max_batch_interval/max_batch_rows/max_batch_size together form the execution threshold for subtasks. If any of these parameters reaches the threshold, the load subtask ends and a new one is generated. | +| max_batch_rows | The maximum number of rows read by each subtask. Must be greater than or equal to 200,000. The default value is 20,000,000. max_batch_interval/max_batch_rows/max_batch_size together form the execution threshold for subtasks. 
If any of these parameters reaches the threshold, the load subtask ends and a new one is generated. | +| max_batch_size | The maximum number of bytes read by each subtask. The unit is bytes, and the range is from 100MB to 10GB. The default value is 1G. max_batch_interval/max_batch_rows/max_batch_size together form the execution threshold for subtasks. If any of these parameters reaches the threshold, the load subtask ends and a new one is generated. | | max_error_number | The maximum number of error rows allowed within a sampling window. Must be greater than or equal to 0. The default value is 0, which means no error rows are allowed. The sampling window is `max_batch_rows * 10`. If the number of error rows within the sampling window exceeds `max_error_number`, the regular job will be paused and manual intervention is required to check for data quality issues using the [SHOW ROUTINE LOAD](../../../sql-manual/sql-statements/ [...] | strict_mode | Whether to enable strict mode. The default value is disabled. Strict mode applies strict filtering to type conversions during the load process. If enabled, non-null original data that results in a NULL after type conversion will be filtered out. The filtering rules in strict mode are as follows:<ul><li>Derived columns (generated by functions) are not affected by strict mode.</li><li>If a column's type needs to be converted, any data with an incorrect data [...] | timezone | Specifies the time zone used by the load job. The default is to use the session's timezone parameter. This parameter affects the results of all timezone-related functions involved in the load. | -| format | Specifies the data format for the load. The default is csv, and JSON format is supported. | +| format | Specifies the data format for the load. The default is CSV, and JSON format is supported. | | jsonpaths | When the data format is JSON, jsonpaths can be used to specify the JSON paths to extract data from nested structures. It is a JSON array of strings, where each string represents a JSON path. | -| delimiter | Specifies the delimiter used in CSV files. The default delimiter is a comma (,). | -| escape | Specifies the escape character used in CSV files. The default escape character is a backslash (\). | -| quote | Specifies the quote character used in CSV files. The default quote character is a double quotation mark ("). | -| null_format | Specifies the string representation of NULL values in the load data. The default is an empty string. | -| skip_header_lines | Specifies the number of lines to skip at the beginning of the load data file. The default is 0, which means no lines are skipped. | -| skip_footer_lines | Specifies the number of lines to skip at the end of the load data file. The default is 0, which means no lines are skipped. | -| query_parallelism | Specifies the number of parallel threads used by each subtask to execute SQL statements. The default is 1. | -| query_timeout | Specifies the timeout for SQL statement execution. The default is 3600 seconds (1 hour). | -| query_band | Specifies the query band string to be set for each subtask. | -| memory_quota_per_query | Specifies the memory quota for each subtask, in bytes. The default is -1, which means to use the system default. | -| error_table_name | Specifies the name of the error table where error rows are stored. The default is null, which means no error table is generated. | -| error_table_database | Specifies the database where the error table is located. 
The default is null, which means the error table is located in the current database. | -| error_table_schema | Specifies the schema where the error table is located. The default is null, which means the error table is located in the public schema. | -| error_table_logging_policy | Specifies the logging policy for the error table. The default is null, which means to use the system default. | -| error_table_reuse_policy | Specifies the reuse policy for the error table. The default is null, which means to use the system default. | -| error_table_creation_time | Specifies the creation time for the error table. The default is null, which means to use the current time. | -| error_table_cleanup_time | Specifies the cleanup time for the error table. The default is null, which means not set a cleanup time. | -| error_table_log | Specifies whether to enable logging for the error table. The default is null, which means to use the system default. | -| error_table_backup_time | Specifies the backup time for the error table. The default is null, which means not set a backup time. | -| error_table_backup_path | Specifies the backup path for the error table. The default is null, which means not set a backup path. | -| error_table_lifetime | Specifies the lifetime of the error table. The default is null, which means to use the system default. | -| error_table_backup_lifetime | Specifies the backup lifetime for the error table. The default is null, which means to use the system default. | -| error_table_label | Specifies the label for the error table. The default is null, which means not set a label. | -| error_table_priority | Specifies the priority for the error table. The default is null, which means to use the system default. | -| error_table_comment | Specifies the comment for the error table. The default is null, which means to not set a comment. | +| json_root | When importing JSON format data, you can specify the root node of the JSON data through json_root. Doris will extract and parse elements from the root node. Default is empty. For example, specify the JSON root node with: `"json_root" = "$.RECORDS"` | +| strip_outer_array | When importing JSON format data, if strip_outer_array is true, it indicates that the JSON data is presented as an array, and each element in the data will be treated as a row. Default value is false. Typically, JSON data in Kafka might be represented as an array with square brackets `[]` in the outermost layer. In this case, you can specify `"strip_outer_array" = "true"` to consume Topic data in array mode. For example, the following data will be parsed into [...] +| send_batch_parallelism | Used to set the parallelism of sending batch data. If the parallelism value exceeds the `max_send_batch_parallelism_per_job` in BE configuration, the coordinating BE will use the value of `max_send_batch_parallelism_per_job`. | +| load_to_single_tablet | Supports importing data to only one tablet in the corresponding partition per task. Default value is false. This parameter can only be set when importing data to OLAP tables with random bucketing. | +| partial_columns | Specifies whether to enable partial column update feature. Default value is false. This parameter can only be set when the table model is Unique and uses Merge on Write. Multi-table streaming does not support this parameter. For details, refer to [Partial Column Update](../../../data-operate/update/update-of-unique-model) | +| max_filter_ratio | The maximum allowed filter ratio within the sampling window. 
Must be between 0 and 1 inclusive. Default value is 1.0, indicating any error rows can be tolerated. The sampling window is `max_batch_rows * 10`. If the ratio of error rows to total rows within the sampling window exceeds `max_filter_ratio`, the routine job will be suspended and require manual intervention to check data quality issues. Rows filtered by WHERE conditions are not counted as error rows. | +| enclose | Specifies the enclosing character. When CSV data fields contain line or column separators, a single-byte character can be specified as an enclosing character for protection to prevent accidental truncation. For example, if the column separator is "," and the enclosing character is "'", the data "a,'b,c'" will have "b,c" parsed as one field. | +| escape | Specifies the escape character. Used to escape characters in fields that are identical to the enclosing character. For example, if the data is "a,'b,'c'", the enclosing character is "'", and you want "b,'c" to be parsed as one field, you need to specify a single-byte escape character, such as "\", and modify the data to "a,'b,\'c'". | These parameters can be used to customize the behavior of a Routine Load job according to your specific requirements. @@ -548,9 +533,7 @@ The columns in the result set provide the following information: ## Load example -### Loading CSV Format - -**Setting the Maximum Error Tolerance** +### Setting the Maximum Error Tolerance 1. Load sample data: @@ -604,7 +587,7 @@ The columns in the result set provide the following information: 2 rows in set (0.01 sec) ``` -**Consuming Data from a Specified Offset** +### Consuming Data from a Specified Offset 1. Load sample data: @@ -657,7 +640,7 @@ The columns in the result set provide the following information: 3 rows in set (0.01 sec) ``` -**Specifying the Consumer Group's group.id and client.id** +### Specifying the Consumer Group's group.id and client.id 1. Load sample data: @@ -708,7 +691,7 @@ The columns in the result set provide the following information: 3 rows in set (0.01 sec) ``` -**Setting load filtering conditions** +### Setting load filtering conditions 1. Load sample data: @@ -761,7 +744,7 @@ The columns in the result set provide the following information: 3 rows in set (0.01 sec) ``` -**Loading specified partition data** +### Loading specified partition data 1. Load sample data: @@ -814,7 +797,7 @@ The columns in the result set provide the following information: 1 rows in set (0.01 sec) ``` -**Setting Time Zone for load** +### Setting Time Zone for load 1. Load sample data: @@ -868,6 +851,8 @@ The columns in the result set provide the following information: 3 rows in set (0.00 sec) ``` +### Setting merge_type + **Specify merge_type for delete operation** 1. Load sample data: @@ -1089,7 +1074,7 @@ The columns in the result set provide the following information: 5 rows in set (0.00 sec) ``` -**Load with column mapping and derived column calculation** +### Load with column mapping and derived column calculation 1. Load sample data: @@ -1140,7 +1125,7 @@ The columns in the result set provide the following information: 3 rows in set (0.01 sec) ``` -**Load with enclosed data** +### Load with enclosed data 1. 
Load sample data: diff --git a/versioned_docs/version-3.0/data-operate/import/import-way/routine-load-manual.md b/versioned_docs/version-3.0/data-operate/import/import-way/routine-load-manual.md index 2ce98b6d04a..a51fdc9d6aa 100644 --- a/versioned_docs/version-3.0/data-operate/import/import-way/routine-load-manual.md +++ b/versioned_docs/version-3.0/data-operate/import/import-way/routine-load-manual.md @@ -30,9 +30,13 @@ Routine Load is a streaming load job that supports Exactly-Once semantics, ensur ## Usage Scenarios +### Supported Data Sources + +Routine Load supports consuming data from Kafka clusters. + ### Supported Data File Formats -Routine Load supports consuming data in CSV and JSON formats from Kafka. +Routine Load supports consuming data in CSV and JSON formats. When loading CSV format, it is necessary to clearly distinguish between null values and empty strings: @@ -44,35 +48,33 @@ When loading CSV format, it is necessary to clearly distinguish between null val When using Routine Load to consume data from Kafka, there are the following limitations: -- It supports unauthenticated Kafka access as well as Kafka clusters authenticated through SSL. - - The supported message formats are CSV and JSON text formats. Each message in CSV should be on a separate line, and the line should not end with a newline character. - By default, it supports Kafka versions 0.10.0.0 and above. If you need to use a Kafka version below 0.10.0.0 (such as 0.9.0, 0.8.2, 0.8.1, 0.8.0), you need to modify the BE configuration by setting the value of `kafka_broker_version_fallback` to the compatible older version, or directly set the value of `property.broker.version.fallback` when creating the Routine Load. However, using an older version may mean that some new features of Routine Load, such as setting the offset of Kafka p [...] ## Basic Principles -Routine Load continuously consumes data from Kafka Topic and writes it into Doris. +Routine Load continuously consumes data from Kafka Topics and writes it into Doris. -When a Routine Load job is created in Doris, it generates a persistent load job and several load tasks: +When a Routine Load job is created in Doris, it generates a resident import job that consists of several import tasks: -- Load Job: Each routine load corresponds to a load job. The load job is a persistent task that continuously consumes data from the Kafka Topic. +- Load Job: A Routine Load Job is a resident import job that continuously consumes data from the data source. -- Load Task: A load job is divided into several load tasks, which are loaded as independent basic units using the Stream Load method into BE. +- Load Task: An import job is broken down into several import tasks for actual consumption, with each task being an independent transaction. -The specific process of Routine Load is illustrated in the following diagram: +The specific import process of Routine Load is shown in the following diagram:  -1. The Client submits a Routine Load job to the FE to establish a persistent Routine Load Job. +1. The Client submits a request to create a Routine Load job to the FE, and the FE generates a resident import job (Routine Load Job) through the Routine Load Manager. -2. The FE splits the Routine Load Job into multiple Routine Load Tasks through the Job Scheduler. +2. The FE splits the Routine Load Job into several Routine Load Tasks through the Job Scheduler, which are then scheduled by the Task Scheduler and distributed to BE nodes. -3. 
On the BE, each Routine Load Task is treated as a Stream Load task for importation and reports back to the FE upon completion. +3. On the BE, after a Routine Load Task is completed, it submits the transaction to the FE and updates the Job's metadata. -4. The Job Scheduler in the FE generates new Tasks based on the report results or retries failed Tasks. +4. After a Routine Load Task is submitted, it continues to generate new Tasks or retries timed-out Tasks. -5. The Routine Load Job continuously generates new Tasks to complete uninterrupted data importation. +5. The newly generated Routine Load Tasks continue to be scheduled by the Task Scheduler in a continuous cycle. ## Quick Start @@ -429,37 +431,20 @@ Here are the available parameters for the job_properties clause: | desired_concurrent_number | <ul><li>Default value: 256</li><li>Description: Specifies the desired concurrency for a single load subtask (load task). It modifies the expected number of load subtasks for a Routine Load job. The actual concurrency during the load process may not be equal to the desired concurrency. The actual concurrency is determined based on factors such as the number of nodes in the cluster, the load on the cluster, and the characteristics of the data source. The act [...] | max_batch_interval | The maximum running time for each subtask, in seconds. Must be greater than 0, with a default value of 60s. max_batch_interval/max_batch_rows/max_batch_size together form the execution threshold for subtasks. If any of these parameters reaches the threshold, the load subtask ends and a new one is generated. | | max_batch_rows | The maximum number of rows read by each subtask. Must be greater than or equal to 200,000. The default value is 20,000,000. max_batch_interval/max_batch_rows/max_batch_size together form the execution threshold for subtasks. If any of these parameters reaches the threshold, the load subtask ends and a new one is generated. | -| max_batch_size | The maximum number of bytes read by each subtask. The unit is bytes, and and the range is from 100MB to 10GB. The default value is 1G. max_batch_interval/max_batch_rows/max_batch_size together form the execution threshold for subtasks. If any of these parameters reaches the threshold, the load subtask ends and a new one is generated. | +| max_batch_size | The maximum number of bytes read by each subtask. The unit is bytes, and the range is from 100MB to 10GB. The default value is 1G. max_batch_interval/max_batch_rows/max_batch_size together form the execution threshold for subtasks. If any of these parameters reaches the threshold, the load subtask ends and a new one is generated. | | max_error_number | The maximum number of error rows allowed within a sampling window. Must be greater than or equal to 0. The default value is 0, which means no error rows are allowed. The sampling window is `max_batch_rows * 10`. If the number of error rows within the sampling window exceeds `max_error_number`, the regular job will be paused and manual intervention is required to check for data quality issues using the [SHOW ROUTINE LOAD](../../../sql-manual/sql-statements/ [...] | strict_mode | Whether to enable strict mode. The default value is disabled. Strict mode applies strict filtering to type conversions during the load process. If enabled, non-null original data that results in a NULL after type conversion will be filtered out. 
The filtering rules in strict mode are as follows:<ul><li>Derived columns (generated by functions) are not affected by strict mode.</li><li>If a column's type needs to be converted, any data with an incorrect data [...] | timezone | Specifies the time zone used by the load job. The default is to use the session's timezone parameter. This parameter affects the results of all timezone-related functions involved in the load. | -| format | Specifies the data format for the load. The default is csv, and JSON format is supported. | +| format | Specifies the data format for the load. The default is CSV, and JSON format is supported. | | jsonpaths | When the data format is JSON, jsonpaths can be used to specify the JSON paths to extract data from nested structures. It is a JSON array of strings, where each string represents a JSON path. | -| delimiter | Specifies the delimiter used in CSV files. The default delimiter is a comma (,). | -| escape | Specifies the escape character used in CSV files. The default escape character is a backslash (\). | -| quote | Specifies the quote character used in CSV files. The default quote character is a double quotation mark ("). | -| null_format | Specifies the string representation of NULL values in the load data. The default is an empty string. | -| skip_header_lines | Specifies the number of lines to skip at the beginning of the load data file. The default is 0, which means no lines are skipped. | -| skip_footer_lines | Specifies the number of lines to skip at the end of the load data file. The default is 0, which means no lines are skipped. | -| query_parallelism | Specifies the number of parallel threads used by each subtask to execute SQL statements. The default is 1. | -| query_timeout | Specifies the timeout for SQL statement execution. The default is 3600 seconds (1 hour). | -| query_band | Specifies the query band string to be set for each subtask. | -| memory_quota_per_query | Specifies the memory quota for each subtask, in bytes. The default is -1, which means to use the system default. | -| error_table_name | Specifies the name of the error table where error rows are stored. The default is null, which means no error table is generated. | -| error_table_database | Specifies the database where the error table is located. The default is null, which means the error table is located in the current database. | -| error_table_schema | Specifies the schema where the error table is located. The default is null, which means the error table is located in the public schema. | -| error_table_logging_policy | Specifies the logging policy for the error table. The default is null, which means to use the system default. | -| error_table_reuse_policy | Specifies the reuse policy for the error table. The default is null, which means to use the system default. | -| error_table_creation_time | Specifies the creation time for the error table. The default is null, which means to use the current time. | -| error_table_cleanup_time | Specifies the cleanup time for the error table. The default is null, which means not set a cleanup time. | -| error_table_log | Specifies whether to enable logging for the error table. The default is null, which means to use the system default. | -| error_table_backup_time | Specifies the backup time for the error table. The default is null, which means not set a backup time. | -| error_table_backup_path | Specifies the backup path for the error table. The default is null, which means not set a backup path. 
| -| error_table_lifetime | Specifies the lifetime of the error table. The default is null, which means to use the system default. | -| error_table_backup_lifetime | Specifies the backup lifetime for the error table. The default is null, which means to use the system default. | -| error_table_label | Specifies the label for the error table. The default is null, which means not set a label. | -| error_table_priority | Specifies the priority for the error table. The default is null, which means to use the system default. | -| error_table_comment | Specifies the comment for the error table. The default is null, which means to not set a comment. | +| json_root | When importing JSON format data, you can specify the root node of the JSON data through json_root. Doris will extract and parse elements from the root node. Default is empty. For example, specify the JSON root node with: `"json_root" = "$.RECORDS"` | +| strip_outer_array | When importing JSON format data, if strip_outer_array is true, it indicates that the JSON data is presented as an array, and each element in the data will be treated as a row. Default value is false. Typically, JSON data in Kafka might be represented as an array with square brackets `[]` in the outermost layer. In this case, you can specify `"strip_outer_array" = "true"` to consume Topic data in array mode. For example, the following data will be parsed into [...] +| send_batch_parallelism | Used to set the parallelism of sending batch data. If the parallelism value exceeds the `max_send_batch_parallelism_per_job` in BE configuration, the coordinating BE will use the value of `max_send_batch_parallelism_per_job`. | +| load_to_single_tablet | Supports importing data to only one tablet in the corresponding partition per task. Default value is false. This parameter can only be set when importing data to OLAP tables with random bucketing. | +| partial_columns | Specifies whether to enable partial column update feature. Default value is false. This parameter can only be set when the table model is Unique and uses Merge on Write. Multi-table streaming does not support this parameter. For details, refer to [Partial Column Update](../../../data-operate/update/update-of-unique-model) | +| max_filter_ratio | The maximum allowed filter ratio within the sampling window. Must be between 0 and 1 inclusive. Default value is 1.0, indicating any error rows can be tolerated. The sampling window is `max_batch_rows * 10`. If the ratio of error rows to total rows within the sampling window exceeds `max_filter_ratio`, the routine job will be suspended and require manual intervention to check data quality issues. Rows filtered by WHERE conditions are not counted as error rows. | +| enclose | Specifies the enclosing character. When CSV data fields contain line or column separators, a single-byte character can be specified as an enclosing character for protection to prevent accidental truncation. For example, if the column separator is "," and the enclosing character is "'", the data "a,'b,c'" will have "b,c" parsed as one field. | +| escape | Specifies the escape character. Used to escape characters in fields that are identical to the enclosing character. For example, if the data is "a,'b,'c'", the enclosing character is "'", and you want "b,'c" to be parsed as one field, you need to specify a single-byte escape character, such as "\", and modify the data to "a,'b,\'c'". | These parameters can be used to customize the behavior of a Routine Load job according to your specific requirements. 
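As a quick illustration of how the CSV-related properties above (`enclose`, `escape`, `max_filter_ratio`, `strict_mode`) fit into a job definition, here is a minimal sketch. The job name `demo.example_csv_job`, table `example_tbl`, topic `example_topic`, and broker address `broker_host:9092` are placeholders assumed for illustration, not values taken from this patch.

```sql
CREATE ROUTINE LOAD demo.example_csv_job ON example_tbl
COLUMNS TERMINATED BY ","
PROPERTIES
(
    "format" = "csv",
    -- protect fields that contain the column separator
    "enclose" = "'",
    -- escape enclose characters that appear inside a field (a single backslash, written escaped)
    "escape" = "\\",
    -- suspend the job only if error rows exceed 10% of the sampling window
    "max_filter_ratio" = "0.1",
    "strict_mode" = "false"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker_host:9092",
    "kafka_topic" = "example_topic",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
```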
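For the JSON-related properties (`format`, `jsonpaths`, `json_root`, `strip_outer_array`), a comparable sketch follows, under the same assumptions of placeholder job, table, topic, and broker names, and Kafka messages shaped roughly like `{"RECORDS": [{"id": 1, "name": "a", "age": 10}]}`.

```sql
CREATE ROUTINE LOAD demo.example_json_job ON example_tbl
COLUMNS(id, name, age)
PROPERTIES
(
    "format" = "json",
    -- parse records starting from the RECORDS node instead of the document root
    "json_root" = "$.RECORDS",
    -- treat each element of the outer JSON array as one row
    "strip_outer_array" = "true",
    -- map the extracted JSON fields to the columns listed in COLUMNS above
    "jsonpaths" = "[\"$.id\",\"$.name\",\"$.age\"]"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker_host:9092",
    "kafka_topic" = "example_topic"
);
```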
@@ -548,9 +533,7 @@ The columns in the result set provide the following information: ## Load example -### Loading CSV Format - -**Setting the Maximum Error Tolerance** +### Setting the Maximum Error Tolerance 1. Load sample data: @@ -604,7 +587,7 @@ The columns in the result set provide the following information: 2 rows in set (0.01 sec) ``` -**Consuming Data from a Specified Offset** +### Consuming Data from a Specified Offset 1. Load sample data: @@ -657,7 +640,7 @@ The columns in the result set provide the following information: 3 rows in set (0.01 sec) ``` -**Specifying the Consumer Group's group.id and client.id** +### Specifying the Consumer Group's group.id and client.id 1. Load sample data: @@ -708,7 +691,7 @@ The columns in the result set provide the following information: 3 rows in set (0.01 sec) ``` -**Setting load filtering conditions** +### Setting load filtering conditions 1. Load sample data: @@ -761,7 +744,7 @@ The columns in the result set provide the following information: 3 rows in set (0.01 sec) ``` -**Loading specified partition data** +### Loading specified partition data 1. Load sample data: @@ -775,9 +758,9 @@ The columns in the result set provide the following information: ```sql CREATE TABLE demo.routine_test05 ( - id INT NOT NULL COMMENT "User ID", - name VARCHAR(30) NOT NULL COMMENT "Name", - age INT COMMENT "Age", + id INT NOT NULL COMMENT "ID", + name VARCHAR(30) NOT NULL COMMENT "Name", + age INT COMMENT "Age", date DATETIME COMMENT "Date" ) DUPLICATE KEY(`id`) @@ -814,7 +797,7 @@ The columns in the result set provide the following information: 1 rows in set (0.01 sec) ``` -**Setting Time Zone for load** +### Setting Time Zone for load 1. Load sample data: @@ -868,6 +851,8 @@ The columns in the result set provide the following information: 3 rows in set (0.00 sec) ``` +### Setting merge_type + **Specify merge_type for delete operation** 1. Load sample data: @@ -1089,7 +1074,7 @@ The columns in the result set provide the following information: 5 rows in set (0.00 sec) ``` -**Load with column mapping and derived column calculation** +### Load with column mapping and derived column calculation 1. Load sample data: @@ -1140,7 +1125,7 @@ The columns in the result set provide the following information: 3 rows in set (0.01 sec) ``` -**Load with enclosed data** +### Load with enclosed data 1. Load sample data: