This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-website.git
The following commit(s) were added to refs/heads/master by this push: new 4542e581f48 [FIx] Rewrite the export/outfile documents (#739) 4542e581f48 is described below commit 4542e581f4810d466e9aac8c8ebab0fb696cd735 Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com> AuthorDate: Tue Jun 18 13:57:52 2024 +0800 [FIx] Rewrite the export/outfile documents (#739) --- docs/data-operate/export/export-manual.md | 473 +++++++++++++++------ docs/data-operate/export/export-overview.md | 124 ++++++ docs/data-operate/export/outfile.md | 361 +++++++++++----- .../current/data-operate/export/export-manual.md | 466 ++++++++++++++------ .../current/data-operate/export/export-overview.md | 131 ++++++ .../current/data-operate/export/outfile.md | 342 +++++++++++---- sidebars.json | 3 +- 7 files changed, 1423 insertions(+), 477 deletions(-) diff --git a/docs/data-operate/export/export-manual.md b/docs/data-operate/export/export-manual.md index 81a39d0cacd..3e25725e1d7 100644 --- a/docs/data-operate/export/export-manual.md +++ b/docs/data-operate/export/export-manual.md @@ -24,212 +24,405 @@ specific language governing permissions and limitations under the License. --> -# Export Overview +## Export +This document provides an overview of using the `EXPORT` command to export data stored in Doris. - `Export` is a feature provided by Doris that allows for the asynchronous export of data. This feature allows the user to export the data of specified tables or partitions in a specified file format through the Broker process or S3 protocol/ HDFS protocol, to remote storage such as object storage or HDFS. +For detailed information on the `EXPORT` command, please refer to: [EXPORT](../../sql-manual/sql-statements/Data-Manipulation-Statements/Manipulation/EXPORT.md). -Currently, `EXPORT` supports exporting Doris local tables / views / external tables and supports exporting to file formats including parquet, orc, csv, csv_with_names, and csv_with_names_and_types. +## Overview +`Export` is a feature provided by Doris to asynchronously export data. This functionality allows users to export data from specified tables or partitions in a specified file format to a target storage system, including object storage, HDFS, or local file systems. -This document mainly introduces the basic principles, usage, best practices and precautions of Export. +`Export` is an asynchronous command that returns immediately after execution. Users can view detailed information about the export task using the `Show Export` command. -## Principles +For guidance on choosing between `SELECT INTO OUTFILE` and `EXPORT`, please refer to the [Export Overview](./export-overview.md). -After a user submits an `Export Job`, Doris will calculate all the Tablets involved in this job. Then, based on the `parallelism` parameter specified by the user, these tablets will be grouped. Each thread is responsible for a group of tablets, generating multiple `SELECT INTO OUTFILE` query plans. The query plan will read the data from the included tablets and then write the data to the specified path in remote storage through S3 protocol/ HDFS protocol/ Broker. +The `EXPORT` command currently supports exporting the following types of tables or views: -The overall execution process is as follows: +* Doris internal tables +* Doris logical views +* Doris Catalog tables -1. The user submits an Export job to FE. -2. FE calculates all the tablets to be exported and groups them based on the `parallelism` parameter. 
Each group generates multiple `SELECT INTO OUTFILE` query plans based on the `maximum_number_of_export_partitions` parameter. +The `EXPORT` command currently supports the following export formats: -3. Based on the parallelism parameter, an equal number of `ExportTaskExecutor` are generated, and each `ExportTaskExecutor` is responsible for a thread, which is scheduled and executed by FE's `Job scheduler` framework. -4. FE's `Job scheduler` schedules and executes the `ExportTaskExecutor`, and each `ExportTaskExecutor` serially executes the multiple `SELECT INTO OUTFILE` query plans it is responsible for. +* Parquet +* ORC +* CSV +* CSV_with_names +* CSV_with_names_and_types -## Start Export +Compression formats are not supported. -For detailed usage of Export, please refer to [EXPORT](../../sql-manual/sql-statements/Data-Manipulation-Statements/Manipulation/EXPORT.md). +Example: -Export's detailed commands can be passed through `HELP EXPORT;` in mysql client. Examples are as follows: +```sql +mysql> EXPORT TABLE tpch1.lineitem TO "s3://my_bucket/path/to/exp_" + -> PROPERTIES( + -> "format" = "csv", + -> "max_file_size" = "2048MB" + -> ) + -> WITH s3 ( + -> "s3.endpoint" = "${endpoint}", + -> "s3.region" = "${region}", + -> "s3.secret_key"="${sk}", + -> "s3.access_key" = "${ak}" + -> ); +``` +After submitting the job, you can query the export job status using the [SHOW EXPORT](../../sql-manual/sql-statements/Show-Statements/SHOW-EXPORT.md) command. An example result is shown below: -### Export to HDFS +```sql +mysql> show export\G +*************************** 1. row *************************** + JobId: 143265 + Label: export_0aa6c944-5a09-4d0b-80e1-cb09ea223f65 + State: FINISHED + Progress: 100% + TaskInfo: {"partitions":[],"parallelism":5,"data_consistency":"partition","format":"csv","broker":"S3","column_separator":"\t","line_delimiter":"\n","max_file_size":"2048MB","delete_existing_files":"","with_bom":"false","db":"tpch1","tbl":"lineitem"} + Path: s3://ftw-datalake-test-1308700295/test_ycs_activeDefense_v10/test_csv/exp_ + CreateTime: 2024-06-11 18:01:18 + StartTime: 2024-06-11 18:01:18 + FinishTime: 2024-06-11 18:01:31 + Timeout: 7200 + ErrorMsg: NULL +OutfileInfo: [ + [ + { + "fileNumber": "1", + "totalRows": "6001215", + "fileSize": "747503989bytes", + "url": "s3://my_bucket/path/to/exp_6555cd33e7447c1-baa9568b5c4eb0ac_*" + } + ] +] +1 row in set (0.00 sec) +``` +The columns in the `show export` command result have the following meanings: + +* JobId: The unique ID of the job +* Label: The label of the export job. If not specified, the system generates one by default. +* State: Job status: + * PENDING: Job is pending scheduling + * EXPORTING: Data is being exported + * FINISHED: Job completed successfully + * CANCELLED: Job failed +* Progress: Job progress. This is measured by the number of query plans. If there are 10 threads in total and 3 have completed, the progress is 30%. +* TaskInfo: Job information displayed in JSON format: + * db: Database name + * tbl: Table name + * partitions: Specified partitions to export. An empty list indicates all partitions. + * column_separator: Column separator for the export file. + * line_delimiter: Line separator for the export file. + * tablet num: Total number of involved tablets. + * broker: Name of the broker used. + * coord num: Number of query plans. + * max_file_size: Maximum size of an export file. + * delete_existing_files: Whether to delete existing files and directories in the export directory. 
+ * columns: Columns to export; an empty value means all columns are exported. + * format: File format of the export. +* Path: Export path in the remote storage. +* CreateTime/StartTime/FinishTime: Job creation time, scheduling start time, and end time. +* Timeout: Job timeout period in seconds, starting from CreateTime. +* ErrorMsg: If there is an error in the job, the reason is displayed here. +* OutfileInfo: If the job is successful, this displays detailed `SELECT INTO OUTFILE` result information. + +After submitting an export job, you can cancel it before it succeeds or fails using the [CANCEL EXPORT](../../sql-manual/sql-statements/Data-Manipulation-Statements/Manipulation/CANCEL-EXPORT.md) command. An example cancellation command is shown below: + +```sql +CANCEL EXPORT FROM tpch1 WHERE LABEL like "%export_%"; +``` +## Export File Column Type Mapping +`Export` supports exporting data in Parquet and ORC file formats. These formats have their own data types. Doris's export functionality automatically maps Doris's data types to the corresponding data types in Parquet and ORC file formats. For detailed mapping relationships, please refer to the "Export File Column Type Mapping" section in the [Export Overview](./export-overview.md) document. -**WITH HDFS (Recommended)** +## Examples +### Export to HDFS +Export the `col1` and `col2` columns from partitions `p1` and `p2` of the `db1.tbl1` table to HDFS, setting the export job label to `mylabel`. The export file format is CSV (default format), the column separator is `,`, and the maximum file size is 512MB. ```sql EXPORT TABLE db1.tbl1 -PARTITION (p1,p2) -[WHERE [expr]] +PARTITION (p1, p2) TO "hdfs://host/path/to/export/" PROPERTIES ( "label" = "mylabel", - "column_separator"=",", - "columns" = "col1,col2", - "parallelism" = "3" + "column_separator" = ",", + "max_file_size" = "512MB", + "columns" = "col1, col2" ) -with HDFS ( -"fs.defaultFS"="hdfs://hdfs_host:port", -"hadoop.username" = "hadoop" +WITH HDFS ( + "fs.defaultFS" = "hdfs://hdfs_host:port", + "hadoop.username" = "hadoop" ); ``` - -* `label`: The identifier of this export job. You can use this identifier to view the job status later. -* `column_separator`: Column separator. The default is `\t`. Supports invisible characters, such as'\x07'. -* `column`: columns to be exported, separated by commas, if this parameter is not filled in, all columns of the table will be exported by default. -* `line_delimiter`: Line separator. The default is `\n`. Supports invisible characters, such as'\x07'. -* `parallelism`:Exporting with 3 concurrent threads. - -**WITH BROKER** - -This requires starting a broker process first. 
+If HDFS high availability is enabled, provide HA information as follows: ```sql EXPORT TABLE db1.tbl1 -PARTITION (p1,p2) -[WHERE [expr]] -TO "hdfs://host/path/to/export/" +PARTITION (p1, p2) +TO "hdfs://HDFS8000871/path/to/export/" PROPERTIES ( "label" = "mylabel", - "column_separator"=",", - "columns" = "col1,col2", - "parallelism" = "3" + "column_separator" = ",", + "max_file_size" = "512MB", + "columns" = "col1, col2" ) -WITH BROKER "broker_name" +WITH HDFS ( + "fs.defaultFS" = "hdfs://HDFS8000871", + "hadoop.username" = "hadoop", + "dfs.nameservices" = "your-nameservices", + "dfs.ha.namenodes.your-nameservices" = "nn1, nn2", + "dfs.namenode.rpc-address.HDFS8000871.nn1" = "ip:port", + "dfs.namenode.rpc-address.HDFS8000871.nn2" = "ip:port", + "dfs.client.failover.proxy.provider.HDFS8000871" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" +); +``` +If the Hadoop cluster has high availability and Kerberos authentication enabled, refer to the following SQL statement: + +```sql +EXPORT TABLE db1.tbl1 +PARTITION (p1, p2) +TO "hdfs://HDFS8000871/path/to/export/" +PROPERTIES ( - "username"="xxx", - "password"="yyy" + "label" = "mylabel", + "column_separator" = ",", + "max_file_size" = "512MB", + "columns" = "col1, col2" +) +WITH HDFS ( + "fs.defaultFS" = "hdfs://hacluster/", + "hadoop.username" = "hadoop", + "dfs.nameservices" = "hacluster", + "dfs.ha.namenodes.hacluster" = "n1, n2", + "dfs.namenode.rpc-address.hacluster.n1" = "192.168.0.1:8020", + "dfs.namenode.rpc-address.hacluster.n2" = "192.168.0.2:8020", + "dfs.client.failover.proxy.provider.hacluster" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider", + "dfs.namenode.kerberos.principal" = "hadoop/_h...@realm.com", + "hadoop.security.authentication" = "kerberos", + "hadoop.kerberos.principal" = "doris_t...@realm.com", + "hadoop.kerberos.keytab" = "/path/to/doris_test.keytab" +); +``` +### Export to S3 +Export all data from the `s3_test` table to S3 in CSV format, using the invisible character `\\x07` as the line delimiter. + +```sql +EXPORT TABLE s3_test TO "s3://bucket/a/b/c" +PROPERTIES ( + "line_delimiter" = "\\x07" +) WITH s3 ( + "s3.endpoint" = "xxxxx", + "s3.region" = "xxxxx", + "s3.secret_key" = "xxxx", + "s3.access_key" = "xxxxx" +); +``` +### Export to Local File System +> To export data to the local file system, add `enable_outfile_to_local=true` in `fe.conf` and restart FE. + +Export all data from the `test` table to local storage: + +```sql +-- Parquet format +EXPORT TABLE test TO "file:///home/user/tmp/" +PROPERTIES ( + "columns" = "k1, k2", + "format" = "parquet" +); + +-- ORC format +EXPORT TABLE test TO "file:///home/user/tmp/" +PROPERTIES ( + "columns" = "k1, k2", + "format" = "orc" +); + +-- CSV_with_names format, using 'AA' as the column separator and 'zz' as the line delimiter +EXPORT TABLE test TO "file:///home/user/tmp/" +PROPERTIES ( + "format" = "csv_with_names", + "column_separator" = "AA", + "line_delimiter" = "zz" +); + +-- CSV_with_names_and_types format +EXPORT TABLE test TO "file:///home/user/tmp/" +PROPERTIES ( + "format" = "csv_with_names_and_types" ); ``` +> Note: + Exporting to the local file system is not suitable for public cloud users and is only applicable to private deployments. It is assumed that the user has full control over the cluster nodes. Doris does not perform legality checks on the export path. If the Doris process user does not have write permissions for the path, or if the path does not exist, an error will be reported. 
Additionally, for security reasons, if a file with the same name already exists in the path, the export will fail. + Doris does not manage files exported to the local file system, nor does it check disk space. Users must manage these files themselves, such as by cleaning up as needed. -### Export to Object Storage (Supports S3 Protocol) +### Exporting Specific Partitions +Export tasks support exporting only specific partitions of internal tables in Doris, such as exporting only the `p1` and `p2` partitions of the `test` table. ```sql -EXPORT TABLE test TO "s3://bucket/path/to/export/dir/" -WITH S3 ( - "s3.endpoint" = "http://host", - "s3.access_key" = "AK", - "s3.secret_key"="SK", - "s3.region" = "region" +EXPORT TABLE test +PARTITION (p1, p2) +TO "file:///home/user/tmp/" +PROPERTIES ( + "columns" = "k1, k2" ); ``` +### Filtering Data During Export +Export tasks support filtering data based on predicate conditions, exporting only data that meets the conditions, such as exporting only data where `k1 < 50`. -- `s3.access_key`/`s3.secret_key`:Is your key to access the object storage API. -- `s3.endpoint`:Endpoint indicates the access domain name of object storage external services. -- `s3.region`:Region indicates the region where the object storage data center is located. +```sql +EXPORT TABLE test +WHERE k1 < 50 +TO "file:///home/user/tmp/" +PROPERTIES ( + "columns" = "k1, k2", + "column_separator" = "," +); +``` +### Exporting External Table Data +Export tasks support exporting external table data from the Doris Catalog: -### View Export Status +```sql +-- Create a catalog +CREATE CATALOG `tpch` PROPERTIES ( + "type" = "trino-connector", + "trino.connector.name" = "tpch", + "trino.tpch.column-naming" = "STANDARD", + "trino.tpch.splits-per-node" = "32" +); -After submitting a job, the job status can be viewed by querying the [SHOW EXPORT](../../sql-manual/sql-statements/Show-Statements/SHOW-EXPORT.md) command. The results are as follows: +-- Export data from the Catalog external table +EXPORT TABLE tpch.sf1.lineitem TO "file:///path/to/exp_" +PROPERTIES( + "parallelism" = "5", + "format" = "csv", + "max_file_size" = "1024MB" +); +``` + +> Note: Currently, exporting data from Catalog external tables does not support concurrent export. Even if `parallelism` is set to greater than 1, the export is still performed in a single thread. + +## Best Practices +### Consistent Export +The `Export` function supports partition/tablet-level granularity. The `data_consistency` parameter specifies the granularity for splitting the table to be exported: `none` represents the tablet level, and `partition` represents the partition level. ```sql -mysql> show EXPORT\G; -*************************** 1. 
row *************************** - JobId: 14008 - State: FINISHED - Progress: 100% - TaskInfo: {"partitions":[],"max_file_size":"","delete_existing_files":"","columns":"","format":"csv","column_separator":"\t","line_delimiter":"\n","db":"default_cluster:demo","tbl":"student4","tablet_num":30} - Path: hdfs://host/path/to/export/ -CreateTime: 2019-06-25 17:08:24 - StartTime: 2019-06-25 17:08:28 -FinishTime: 2019-06-25 17:08:34 - Timeout: 3600 - ErrorMsg: NULL - OutfileInfo: [ - [ - { - "fileNumber": "1", - "totalRows": "4", - "fileSize": "34bytes", - "url": "file:///127.0.0.1/Users/fangtiewei/tmp_data/export/f1ab7dcc31744152-bbb4cda2f5c88eac_" - } - ] -] -1 row in set (0.01 sec) +EXPORT TABLE test TO "file:///home/user/tmp" +PROPERTIES ( + "format" = "parquet", + "data_consistency" = "partition", + "max_file_size" = "512MB" +); ``` +- Setting `"data_consistency" = "partition"` constructs multiple `SELECT INTO OUTFILE` statements to export different partitions. +- Setting `"data_consistency" = "none"` constructs multiple `SELECT INTO OUTFILE` statements to export different tablets, which may belong to the same partition. +Refer to the appendix for the logic behind constructing `SELECT INTO OUTFILE` statements. -* JobId: The unique ID of the job -* State: Job status: - * PENDING: Jobs to be Scheduled - * EXPORTING: Data Export - * FINISHED: Operation Successful - * CANCELLED: Job Failure -* Progress: Work progress. The schedule is based on the query plan. Assuming there are 10 threads in total and 3 have been completed, the progress will be 30%. -* TaskInfo: Job information in Json format: - * db: database name - * tbl: Table name - * partitions: Specify the exported partition. `empty` Represents all partitions. - * column separator: The column separator for the exported file. - * line delimiter: The line separator for the exported file. - * tablet num: The total number of tablets involved. - * Broker: The name of the broker used. - * max_file_size: The maximum size of an export file. - * delete_existing_files: Whether to delete existing files and directories in the specified export directory. - * columns: Specifies the column names to be exported. Empty values represent exporting all columns. - * format: The file format for export. -* Path: Export path on remote storage. -* CreateTime/StartTime/FinishTime: Creation time, start scheduling time and end time of jobs. -* Timeout: Job timeout. The unit is seconds. This time is calculated from CreateTime. -* Error Msg: If there is an error in the job, the cause of the error is shown here. -* OutfileInfo: If the export job is successful, specific `SELECT INTO OUTFILE` result information will be displayed here. - -### Cancel Export Job - - -After submitting a job, the job can be canceled by using the [CANCEL EXPORT](../../sql-manual/sql-statements/Data-Manipulation-Statements/Manipulation/CANCEL-EXPORT.md) command. For example: +### Export Job Concurrency +Set different levels of concurrency to export data concurrently. Specify a concurrency of 5: ```sql -CANCEL EXPORT -FROM example_db -WHERE LABEL like "%example%"; +EXPORT TABLE test TO "file:///home/user/tmp/" +PROPERTIES ( + "format" = "parquet", + "max_file_size" = "512MB", + "parallelism" = "5" +); ``` +Refer to the appendix for the principles of concurrent export. 
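
The concurrency setting can be combined with the consistency setting described above. The sketch below is illustrative only (the table name `test` and the local path are placeholders): it requests tablet-level splitting together with five export threads, and the `parallelism` and `data_consistency` actually recorded for the job can then be read back from the `TaskInfo` column of `SHOW EXPORT`.

```sql
-- Illustrative only: tablet-level splitting exported by up to 5 concurrent threads.
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
    "format" = "parquet",
    "data_consistency" = "none",
    "parallelism" = "5",
    "max_file_size" = "512MB"
);

-- The TaskInfo column of the job shows the parallelism and data_consistency in effect.
SHOW EXPORT\G
```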
-## Best Practices +### Clear Export Directory Before Exporting +```sql +EXPORT TABLE test TO "file:///home/user/tmp" +PROPERTIES ( + "format" = "parquet", + "max_file_size" = "512MB", + "delete_existing_files" = "true" +); +``` +If `"delete_existing_files" = "true"` is set, the export job will first delete all files and directories under `/home/user/`, then export data to that directory. + +> Note: + To use the `delete_existing_files` parameter, add `enable_delete_existing_files = true` in `fe.conf` and restart FE. The `delete_existing_files` parameter is a dangerous operation and is recommended only for testing environments. + +### Set Export File Size +Export jobs support setting the size of export files. If a single file exceeds the set value, it will be split into multiple files. -### Concurrent Export +```sql +EXPORT TABLE test TO "file:///home/user/tmp/" +PROPERTIES ( + "format" = "parquet", + "max_file_size" = "512MB" +); +``` +Setting `"max_file_size" = "512MB"` limits the maximum size of a single export file to 512MB. -An Export job can be configured with the `parallelism` parameter to concurrently export data. The `parallelism` parameter specifies the number of threads to execute the `EXPORT Job`. Each thread is responsible for exporting a subset of the total tablets. +## Notes +1. **Memory Limits** + - An Export job typically involves only `scan-export` operations and does not require complex memory-consuming computations. The default 2GB memory limit usually suffices. + - In scenarios where the query plan needs to scan too many tablets or versions on the same BE, memory might run out. Adjust the `exec_mem_limit` session variable to increase the memory limit. -The underlying execution logic of an `Export Job `is actually the `SELECT INTO OUTFILE` statement. Each thread specified by the `parallelism` parameter executes independent `SELECT INTO OUTFILE` statements. +2. **Export Data Volume** + - Avoid exporting a large volume of data at once. The recommended maximum data volume for an Export job is several tens of GBs. Larger exports can lead to more garbage files and higher retry costs. If the table size is too large, consider partition-based export. + - Export jobs scan data and occupy IO resources, which may affect system query latency. -The specific logic for splitting an `Export Job` into multiple `SELECT INTO OUTFILE` is, to evenly distribute all the tablets of the table among all parallel threads. For example: +3. **Managing Export Files** + - If an Export job fails, the generated files are not deleted and need to be removed manually. -- If num(tablets) = 40 and parallelism = 3, then the three threads will be responsible for 14, 13, and 13 tablets, respectively. -- If num(tablets) = 2 and parallelism = 3, then Doris automatically sets the parallelism to 2, and each thread is responsible for one tablet. +4. **Data Consistency** + - During export, the system simply checks if the tablet versions are consistent. It is advisable to avoid importing data into the table during the export process. -When the number of tablets responsible for a thread exceeds the `maximum_tablets_of_outfile_in_export` value (default is 10, and can be modified by adding the `maximum_tablets_of_outfile_in_export` parameter in fe.conf), the thread will split the tablets which are responsibled for this thread into multiple `SELECT INTO OUTFILE` statements. For example: +5. **Export Timeout** + - If the data volume is large and exceeds the export timeout, the Export job will fail. 
Use the `timeout` parameter in the Export command to extend the timeout and retry the Export command. -- If a thread is responsible for 14 tablets and `maximum_tablets_of_outfile_in_export = 10`, then the thread will be responsible for two `SELECT INTO OUTFILE` statements. The first `SELECT INTO OUTFILE` statement exports 10 tablets, and the second `SELECT INTO OUTFILE` statement exports 4 tablets. The two `SELECT INTO OUTFILE` statements are executed serially by this thread. +6. **Export Failure** + - If the FE restarts or switches primary during the Export job, the job will fail and need to be resubmitted. Use the `show export` command to check the Export job status. -### exec\_mem\_limit +7. **Number of Partitions Exported** + - The maximum number of partitions an Export Job can export is 2000. Modify this limit by adding the `maximum_number_of_export_partitions` parameter in `fe.conf` and restarting FE. -The query plan for an `Export Job` typically involves only `scanning and exporting`, and does not involve compute logic that requires a lot of memory. Therefore, the default memory limit of 2GB is usually sufficient to meet the requirements. +8. **Concurrent Export** + - When exporting concurrently, configure the thread count and parallelism appropriately to fully utilize system resources and avoid performance bottlenecks. Monitor the progress and performance metrics in real-time to identify and address issues promptly. -However, in certain scenarios, such as a query plan that requires scanning too many tablets on the same BE, or when there are too many data versions of tablets, it may result in insufficient memory. In these cases, you can adjust the session variable `exec_mem_limit` to increase the memory usage limit. +9. **Data Integrity** + - After the export operation is complete, verify that the exported data is complete and correct to ensure data quality and integrity. -## Notes +## Appendix +### Principles of Concurrent Export + +The underlying mechanism of an Export task in Doris is to execute `SELECT INTO OUTFILE` SQL statements. When a user initiates an Export task, Doris constructs one or more `SELECT INTO OUTFILE` execution plans based on the table to be exported. These execution plans are then submitted to Doris's Job Scheduler, which automatically schedules and executes them. + +By default, Export tasks run single-threaded. To improve export efficiency, the Export command can include a `parallelism` parameter to enable concurrent data export. Setting `parallelism` greater than 1 allows the Export task to use multiple threads to concurrently execute `SELECT INTO OUTFILE` query plans. The `parallelism` parameter specifies the number of threads to execute the EXPORT job. + +The logic for constructing one or more `SELECT INTO OUTFILE` execution plans for an Export task is as follows: + +1. **Select Consistency Model** + Choose the consistency model for export based on the `data_consistency` parameter. This is semantic and unrelated to concurrency. Users should select the consistency model according to their needs. + +2. **Determine Concurrency** + The `parallelism` parameter determines the number of threads to run the `SELECT INTO OUTFILE` execution plans. `Parallelism` specifies the maximum possible number of threads. + + > Note: Even if the Export command sets the `parallelism` parameter, the actual number of concurrent threads for the Export task also depends on Job Scheduler resources. 
If the system is busy and Job Scheduler thread resources are tight, the actual number of threads assigned to the Export task might not reach the specified `parallelism`, affecting the concurrent export. To address this, reduce system load or adjust the FE configuration `async_task_consumer_thread_num` to increase the [...] -* It is not recommended to export large amounts of data at one time. The maximum amount of exported data recommended by an Export job is tens of GB. Excessive export results in more junk files and higher retry costs. -* If the amount of table data is too large, it is recommended to export it by partition. -* During the operation of the Export job, if FE restarts or cuts the master, the Export job will fail, requiring the user to resubmit. -* If the Export job fails, the temporary files and directory generated in the remote storage will not be deleted, requiring the user to delete them manually. -* Export jobs scan data and occupy IO resources, which may affect the query latency of the system. -* The Export job can export data from `Doris Base tables`, `View`, and `External tables`, but not from `Rollup Index`. -* When using the EXPORT command, please ensure that the target path exists, otherwise the export may fail. -* When concurrent export is enabled, please configure the thread count and parallelism appropriately to fully utilize system resources and avoid performance bottlenecks. -* When exporting to a local file, pay attention to file permissions and the path, ensure that you have sufficient permissions to write, and follow the appropriate file system path. -* It is possible to monitor progress and performance metrics in real-time during the export process to identify issues promptly and make optimal adjustments. -* It is recommended to verify the integrity and accuracy of the exported data after the export operation is completed to ensure the quality and integrity of the data. +3. **Determine Task Volume for Each Outfile Statement** + Each thread decides the number of `outfile` based on `maximum_tablets_of_outfile_in_export` and the actual number of partitions/buckets in the data. -## Relevant configuration + > `maximum_tablets_of_outfile_in_export` is an FE configuration with a default value of 10. It specifies the maximum number of partitions/buckets allowed in a single `outfile` statement for an Export task. Modifying this configuration requires restarting FE. -### FE +### Example -* `maximum_tablets_of_outfile_in_export`: The maximum number of tablets allowed for an OutFile statement in an ExportExecutorTask. +Consider a table with 20 partitions, each having 5 buckets, totaling 100 buckets. Set `data_consistency = none` and `maximum_tablets_of_outfile_in_export = 10`. -## More Help +1. **`parallelism = 5`** + The Export task splits the 100 buckets into 5 parts, with each thread responsible for 20 buckets. Each thread further splits its 20 buckets into 2 groups of 10 buckets each, with each group handled by one `outfile` query plan. Thus, the Export task has 5 threads running concurrently, each handling 2 `outfile` statements, which are executed serially within each thread. -For more detailed syntax and best practices used by Export, please refer to the [Export](../../sql-manual/sql-statements/Data-Manipulation-Statements/Manipulation/EXPORT.md) command manual, You can also enter `HELP EXPORT` at the command line of the MySql client for more help. +2. 
**`parallelism = 3`** + The Export task splits the 100 buckets into 3 parts, with 3 threads responsible for 34, 33, and 33 buckets respectively. Each thread splits its buckets into 4 groups of 10 buckets (with the last group containing fewer than 10 buckets), each group handled by one `outfile` query plan. Thus, the Export task has 3 threads running concurrently, each handling 4 `outfile` statements, executed serially within each thread. -The underlying implementation of the `EXPORT` command is the `SELECT INTO OUTFILE` statement. For more information about SELECT INTO OUTFILE, please refer to [Export Query Result](./outfile) and [SELECT INTO OUTFILE](../../sql-manual/sql-statements/Data-Manipulation-Statements/OUTFILE.md). +3. **`parallelism = 120`** + Since the table has only 100 buckets, the system forces `parallelism` to 100 and executes it accordingly. The Export task splits the 100 buckets into 100 parts, with each thread responsible for 1 bucket. Each thread's single bucket is split into 1 group (actually just 1 bucket), handled by one `outfile` query plan. Thus, the Export task has 100 threads running concurrently, each handling 1 `outfile` statement, with each `outfile` statement exporting just 1 bucket. +### Optimizing Export Performance +For optimal Export performance in the current version, consider the following settings: +1. Enable the session variable `enable_parallel_outfile`. +2. Set the Export's `parallelism` parameter to a high value, so each thread handles only one `SELECT INTO OUTFILE` query plan. +3. Set the FE configuration `maximum_tablets_of_outfile_in_export` to a low value, so each `SELECT INTO OUTFILE` query plan exports a small amount of data. diff --git a/docs/data-operate/export/export-overview.md b/docs/data-operate/export/export-overview.md new file mode 100644 index 00000000000..2dfd3c3b6fb --- /dev/null +++ b/docs/data-operate/export/export-overview.md @@ -0,0 +1,124 @@ +--- +{ + "title": "Export Overview", + "language": "en" +} +--- + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +## Data Export Overview +The data export function is used to write the query result set or Doris table data into the specified storage system in the specified file format. 
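
As a minimal illustration of the two shapes this takes (the table name, bucket, and credential placeholders below are hypothetical), a query result set is written out synchronously with `SELECT INTO OUTFILE`, while table data is exported asynchronously with `EXPORT`:

```sql
-- Synchronously export an arbitrary query result set to object storage.
SELECT k1, k2 FROM db1.tbl1 WHERE k1 > 10
INTO OUTFILE "s3://my_bucket/result_"
FORMAT AS PARQUET
PROPERTIES (
    "s3.endpoint" = "${endpoint}",
    "s3.region" = "${region}",
    "s3.access_key" = "${ak}",
    "s3.secret_key" = "${sk}"
);

-- Asynchronously export table data; progress is checked later with SHOW EXPORT.
EXPORT TABLE db1.tbl1 TO "s3://my_bucket/exp_"
WITH s3 (
    "s3.endpoint" = "${endpoint}",
    "s3.region" = "${region}",
    "s3.access_key" = "${ak}",
    "s3.secret_key" = "${sk}"
);
```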
+ +The differences between the export function and the data backup function are as follows: + +| |Data Export|Data Backup| +| ----- | ----- | ----- | +|Final Storage Location|HDFS, Object Storage, Local File System|HDFS, Object Storage| +|Data Format|Open file formats such as Parquet, ORC, CSV|Doris internal storage format| +|Execution Speed|Moderate (requires reading data and converting to the target data format)|Fast (no parsing and conversion required, directly upload Doris data files)| +|Flexibility|Can flexibly define the data to be exported through SQL statements|Only supports table-level full backup| +|Use Cases|Result set download, data exchange between different systems|Data backup, data migration between Doris clusters| + +## Choosing Export Methods +Doris provides three different data export methods: + +* **SELECT INTO OUTFILE**: Supports the export of any SQL result set. +* **EXPORT**: Supports the export of partial or full table data. +* **MySQL DUMP**: Compatible with the MySQL dump command for data export. + +The similarities and differences between the three export methods are as follows: + +| |SELECT INTO OUTFILE|EXPORT|MySQL DUMP| +| ----- | ----- | ----- | ----- | +|Synchronous/Asynchronous|Synchronous|Asynchronous (submit EXPORT tasks and check task progress via SHOW EXPORT command)|Synchronous| +|Supports any SQL|Yes|No|No| +|Export specific partitions|Yes|Yes|No| +|Export specific tablets|Yes|No|No| +|Concurrent export|Supported with high concurrency (depends on whether the SQL statement has operators such as ORDER BY that need to be processed on a single node)|Supported with high concurrency (supports tablet-level concurrent export)|Not supported, single-threaded export only| +|Supported export data formats|Parquet, ORC, CSV|Parquet, ORC, CSV|MySQL Dump proprietary format| +|Supports exporting external tables|Yes|Partially supported|No| +|Supports exporting views|Yes|Yes|Yes| +|Supported export locations|S3, HDFS, LOCAL|S3, HDFS, LOCAL|LOCAL| + +### SELECT INTO OUTFILE +Suitable for the following scenarios: + +* Data needs to be exported after complex calculations, such as filtering, aggregation, joins, etc. +* Suitable for scenarios that require synchronous tasks. + +### EXPORT +Suitable for the following scenarios: + +* Large-scale single table export, with simple filtering conditions. +* Scenarios that require asynchronous task submission. + +### MySQL Dump +Suitable for the following scenarios: + +* Compatible with the MySQL ecosystem, requires exporting both table structure and data. +* Only for development testing or scenarios with very small data volumes. + +## Export File Column Type Mapping +Parquet and ORC file formats have their own data types. Doris's export function can automatically map Doris's data types to the corresponding data types in Parquet and ORC file formats. The CSV format does not have types, all data is output as text. + +The following table shows the mapping between Doris data types and Parquet, ORC file format data types: +1. Doris export to ORC file format data type mapping table: + | Doris Type | Orc Type | + | ----- | ----- | + | boolean | boolean | + | tinyint | tinyint | + | smallint | smallint | + | int | int | + | bigint | bigint | + | largeInt | string | + | date | string | + | datev2 | string | + | datetime | string | + | datetimev2 | timestamp | + | float | float | + | double | double | + | char / varchar / string | string | + | decimal | decimal | + | struct | struct | + | map | map | + | array | array | + +2. 
When Doris exports to Parquet file format, it first converts Doris in-memory data to Arrow in-memory data format, then writes out to Parquet file format. The mapping relationship between Doris data types and Arrow data types is: + + | Doris Type | Arrow Type | + | ----- | ----- | + | boolean | boolean | + | tinyint | int8 | + | smallint | int16 | + | int | int32 | + | bigint | int64 | + | largeInt | utf8 | + | date | utf8 | + | datev2 | Date32Type | + | datetime | utf8 | + | datetimev2 | TimestampType | + | float | float32 | + | double | float64 | + | char / varchar / string | utf8 | + | decimal | decimal128 | + | struct | struct | + | map | map | + | array | list | \ No newline at end of file diff --git a/docs/data-operate/export/outfile.md b/docs/data-operate/export/outfile.md index 252a9eee561..70e149764f6 100644 --- a/docs/data-operate/export/outfile.md +++ b/docs/data-operate/export/outfile.md @@ -24,158 +24,301 @@ specific language governing permissions and limitations under the License. --> +## SELECT INTO OUTFILE +This document introduces how to use the `SELECT INTO OUTFILE` command to export query results. +For a detailed introduction to the `SELECT INTO OUTFILE` command, refer to: [SELECT INTO OUTFILE](../../sql-manual/sql-statements/Data-Manipulation-Statements/OUTFILE.md). -This document describes how to use the [SELECT INTO OUTFILE](../../sql-manual/sql-statements/Data-Manipulation-Statements/OUTFILE) command to export query results. +## Overview +The `SELECT INTO OUTFILE` command exports the result data of the `SELECT` statement to a target storage system, such as object storage, HDFS, or the local file system, in a specified file format. -`SELECT INTO OUTFILE` is a synchronous command, which means that the operation is completed when the command returns. It also returns a row of results to show the execution result of the export. +`SELECT INTO OUTFILE` is a synchronous command, meaning it completes when the command returns. If successful, it returns information about the number, size, and paths of the exported files. If it fails, it returns error information. + +For guidance on choosing between `SELECT INTO OUTFILE` and `EXPORT`, see the [Export Overview](./export-overview.md). + +### Supported Export Formats +`SELECT INTO OUTFILE` currently supports the following export formats: +- Parquet +- ORC +- CSV +- CSV with column names (`csv_with_names`) +- CSV with column names and types (`csv_with_names_and_types`) + +Compressed formats are not supported. + +### Example +```sql +mysql> SELECT * FROM tbl1 LIMIT 10 INTO OUTFILE "file:///home/work/path/result_"; ++------------+-----------+----------+--------------------------------------------------------------------+ +| FileNumber | TotalRows | FileSize | URL | ++------------+-----------+----------+--------------------------------------------------------------------+ +| 1 | 2 | 8 | file:///192.168.1.10/home/work/path/result_{fragment_instance_id}_ | ++------------+-----------+----------+--------------------------------------------------------------------+ +``` +Explanation of the returned results: +- **FileNumber**: The number of generated files. +- **TotalRows**: The number of rows in the result set. +- **FileSize**: The total size of the exported files in bytes. +- **URL**: The prefix of the exported file paths. Multiple files will be numbered sequentially with suffixes `_0`, `_1`, etc. + +## Export File Column Type Mapping +`SELECT INTO OUTFILE` supports exporting to Parquet and ORC file formats. 
Parquet and ORC have their own data types, and Doris can automatically map its data types to corresponding Parquet and ORC data types. Refer to the "Export File Column Type Mapping" section in the [Export Overview](./export-overview.md) document for the specific mapping relationships. ## Examples ### Export to HDFS +Export query results to the `hdfs://path/to/` directory, specifying the export format as PARQUET: + +```sql +SELECT c1, c2, c3 FROM tbl +INTO OUTFILE "hdfs://${host}:${fileSystem_port}/path/to/result_" +FORMAT AS PARQUET +PROPERTIES +( + "fs.defaultFS" = "hdfs://ip:port", + "hadoop.username" = "hadoop" +); +``` +If HDFS is configured for high availability, provide HA information, such as: -Export simple query results to the file `hdfs://path/to/result.txt`, specifying the export format as CSV. +```sql +SELECT c1, c2, c3 FROM tbl +INTO OUTFILE "hdfs://HDFS8000871/path/to/result_" +FORMAT AS PARQUET +PROPERTIES +( + "fs.defaultFS" = "hdfs://HDFS8000871", + "hadoop.username" = "hadoop", + "dfs.nameservices" = "your-nameservices", + "dfs.ha.namenodes.your-nameservices" = "nn1,nn2", + "dfs.namenode.rpc-address.HDFS8000871.nn1" = "ip:port", + "dfs.namenode.rpc-address.HDFS8000871.nn2" = "ip:port", + "dfs.client.failover.proxy.provider.HDFS8000871" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" +); +``` +If the Hadoop cluster is configured for high availability and Kerberos authentication is enabled, you can refer to the following SQL statement: ```sql SELECT * FROM tbl INTO OUTFILE "hdfs://path/to/result_" -FORMAT AS CSV +FORMAT AS PARQUET PROPERTIES ( - "broker.name" = "my_broker", - "column_separator" = ",", - "line_delimiter" = "\n" + "fs.defaultFS"="hdfs://hacluster/", + "hadoop.username" = "hadoop", + "dfs.nameservices"="hacluster", + "dfs.ha.namenodes.hacluster"="n1,n2", + "dfs.namenode.rpc-address.hacluster.n1"="192.168.0.1:8020", + "dfs.namenode.rpc-address.hacluster.n2"="192.168.0.2:8020", + "dfs.client.failover.proxy.provider.hacluster"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider", + "dfs.namenode.kerberos.principal"="hadoop/_h...@realm.com", + "hadoop.security.authentication"="kerberos", + "hadoop.kerberos.principal"="doris_t...@realm.com", + "hadoop.kerberos.keytab"="/path/to/doris_test.keytab" ); ``` -### Export to Local Files - -When exporting to a local file, you need to configure `enable_outfile_to_local=true` in fe.conf first +### Export to S3 +Export query results to the S3 storage at `s3://path/to/` directory, specifying the export format as ORC. Provide `sk`, `ak`, and other necessary information: ```sql -select * from tbl1 limit 10 -INTO OUTFILE "file:///home/work/path/result_"; +SELECT * FROM tbl +INTO OUTFILE "s3://path/to/result_" +FORMAT AS ORC +PROPERTIES( + "s3.endpoint" = "https://xxx", + "s3.region" = "ap-beijing", + "s3.access_key"= "your-ak", + "s3.secret_key" = "your-sk" +); ``` -For more usage, see [OUTFILE documentation](../../sql-manual/sql-statements/Data-Manipulation-Statements/OUTFILE). - -## Concurrent Export - -By default, the export of the query result set is non-concurrent, that is, a single point of export. If the user wants the query result set to be exported concurrently, the following conditions need to be met: +### Export to Local File System +> To export to the local file system, add `enable_outfile_to_local=true` in `fe.conf` and restart FE. -1. 
session variable 'enable_parallel_outfile' to enable concurrent export: ```set enable_parallel_outfile = true;``` +Export query results to the BE's `file:///path/to/` directory, specifying the export format as CSV, with a comma as the column separator: -2. The export method is S3, HDFS instead of using a broker - -3. The query can meet the needs of concurrent export, for example, the top level does not contain single point nodes such as sort. (I will give an example later, which is a query that does not export the result set concurrently) - -If the above three conditions are met, the concurrent export query result set can be triggered. Concurrency = ```be_instacne_num * parallel_fragment_exec_instance_num``` - -### How to Verify that the Result Set is Exported Concurrently - -After the user enables concurrent export through the session variable setting, if you want to verify whether the current query can be exported concurrently, you can use the following method. - -``` -explain select xxx from xxx where xxx into outfile "s3://xxx" format as csv properties ("AWS_ENDPOINT" = "xxx", ...); -``` - -After explaining the query, Doris will return the plan of the query. If you find that ```RESULT FILE SINK``` appears in ```PLAN FRAGMENT 1```, it means that the export concurrency has been opened successfully. -If ```RESULT FILE SINK``` appears in ```PLAN FRAGMENT 0```, it means that the current query cannot be exported concurrently (the current query does not satisfy the three conditions of concurrent export at the same time). - -``` -Planning example for concurrent export: -+-----------------------------------------------------------------------------+ -| Explain String | -+-----------------------------------------------------------------------------+ -| PLAN FRAGMENT 0 | -| OUTPUT EXPRS:<slot 2> | <slot 3> | <slot 4> | <slot 5> | -| PARTITION: UNPARTITIONED | -| | -| RESULT SINK | -| | -| 1:EXCHANGE | -| | -| PLAN FRAGMENT 1 | -| OUTPUT EXPRS:`k1` + `k2` | -| PARTITION: HASH_PARTITIONED: `default_cluster:test`.`multi_tablet`.`k1` | -| | -| RESULT FILE SINK | -| FILE PATH: s3://ml-bd-repo/bpit_test/outfile_1951_ | -| STORAGE TYPE: S3 | -| | -| 0:OlapScanNode | -| TABLE: multi_tablet | -+-----------------------------------------------------------------------------+ +```sql +SELECT k1 FROM tbl1 UNION SELECT k2 FROM tbl1 +INTO OUTFILE "file:///path/to/result_" +FORMAT AS CSV +PROPERTIES( + "column_separator" = "," +); ``` -## Usage Examples +> Note: +Exporting to local files is not suitable for public cloud users and is intended for private deployment users only. By default, users have full control over cluster nodes. Doris does not check the validity of the export path provided by the user. If the Doris process user does not have write permissions for the path, or the path does not exist, an error will be reported. Additionally, for security reasons, if a file with the same name already exists at the path, the export will fail. Dori [...] -For details, please refer to [OUTFILE Document](../../sql-manual/sql-statements/Data-Manipulation-Statements/OUTFILE). +## Best Practices -## Return Results +### Generate Export Success Indicator File +The `SELECT INTO OUTFILE` command is synchronous, meaning that the task connection could be interrupted during SQL execution, leaving uncertainty about whether the export completed successfully or whether the data is complete. You can use the `success_file_name` parameter to generate an indicator file upon successful export. 
-The command is a synchronization command. The command returns, which means the operation is over. -At the same time, a row of results will be returned to show the exported execution result. +Similar to Hive, users can determine whether the export completed successfully and whether the files in the export directory are complete by checking for the presence of the file specified by the `success_file_name` parameter. -If it exports and returns normally, the result is as follows: +For example, exporting the results of a `SELECT` statement to Tencent Cloud COS `s3://${bucket_name}/path/my_file_`, specifying the export format as CSV, and setting the success indicator file name to `SUCCESS`: +```sql +SELECT k1, k2, v1 FROM tbl1 LIMIT 100000 +INTO OUTFILE "s3://my_bucket/path/my_file_" +FORMAT AS CSV +PROPERTIES +( + "s3.endpoint" = "${endpoint}", + "s3.region" = "ap-beijing", + "s3.access_key"= "ak", + "s3.secret_key" = "sk", + "column_separator" = ",", + "line_delimiter" = "\n", + "success_file_name" = "SUCCESS" +) ``` -mysql> select * from tbl1 limit 10 into outfile "file:///home/work/path/result_"; -+------------+-----------+----------+--------------------------------------------------------------------+ -| FileNumber | TotalRows | FileSize | URL | -+------------+-----------+----------+--------------------------------------------------------------------+ -| 1 | 2 | 8 | file:///192.168.1.10/home/work/path/result_{fragment_instance_id}_ | -+------------+-----------+----------+--------------------------------------------------------------------+ -1 row in set (0.05 sec) -``` - -* FileNumber: The number of files finally generated. +Upon completion, an additional file named `SUCCESS` will be generated. -* TotalRows: The number of rows in the result set. +### Concurrent Export +By default, the query results in the `SELECT` section are aggregated to a single BE node, which exports data single-threadedly. However, in some cases (e.g., queries without an `ORDER BY` clause), concurrent export can be enabled to have multiple BE nodes export data simultaneously, improving export performance. -* FileSize: The total size of the exported file. Unit byte. +Here’s an example demonstrating how to enable concurrent export: -* URL: If it is exported to a local disk, the Compute Node to which it is exported is displayed here. +1. Enable the concurrent export session variable: -If a concurrent export is performed, multiple rows of data will be returned. - -``` -+------------+-----------+----------+--------------------------------------------------------------------+ -| FileNumber | TotalRows | FileSize | URL | -+------------+-----------+----------+--------------------------------------------------------------------+ -| 1 | 3 | 7 | file:///192.168.1.10/home/work/path/result_{fragment_instance_id}_ | -| 1 | 2 | 4 | file:///192.168.1.11/home/work/path/result_{fragment_instance_id}_ | -+------------+-----------+----------+--------------------------------------------------------------------+ -2 rows in set (2.218 sec) +```sql +mysql> SET enable_parallel_outfile = true; ``` -If the execution is incorrect, an error message will be returned, such as: +2. Execute the export command: ```sql -mysql> SELECT * FROM tbl INTO OUTFILE ... -ERROR 1064 (HY000): errCode = 2, detailMessage = Open broker writer failed ... 
+mysql> SELECT * FROM demo.tbl + -> INTO OUTFILE "file:///path/to/ftw/export/exp_" + -> FORMAT AS PARQUET; ++------------+-----------+----------+-------------------------------------------------------------------------------+ +| FileNumber | TotalRows | FileSize | URL | ++------------+-----------+----------+-------------------------------------------------------------------------------+ +| 1 | 104494 | 7998308 | file:///127.0.0.1/path/to/exp_1f850179e684476b-9bf001a6bf96d7d3_ | +| 1 | 104984 | 8052491 | file:///127.0.0.1/path/to/exp_1f850179e684476b-9bf001a6bf96d7d5_ | +| 1 | 104345 | 7981406 | file:///127.0.0.1/path/to/exp_1f850179e684476b-9bf001a6bf96d7d1_ | +| 1 | 104034 | 7977301 | file:///127.0.0.1/path/to/exp_1f850179e684476b-9bf001a6bf96d7d4_ | +| 1 | 104238 | 7979757 | file:///127.0.0.1/path/to/exp_1f850179e684476b-9bf001a6bf96d7d2_ | +| 1 | 159450 | 11870222 | file:///127.0.0.1/path/to/exp_1f850179e684476b-9bf001a6bf96d7d0_ | +| 1 | 209691 | 16082100 | file:///127.0.0.1/path/to/exp_1f850179e684476b-9bf001a6bf96d7ce_ | +| 1 | 208769 | 16004096 | file:///127.0.0.1/path/to/exp_1f850179e684476b-9bf001a6bf96d7cf_ | ++------------+-----------+----------+-------------------------------------------------------------------------------+ ``` +With concurrent export successfully enabled, the result may consist of multiple rows, indicating that multiple threads exported data concurrently. -## Notice - -* The CSV format does not support exporting binary types, such as BITMAP and HLL types. These types will be output as `\N`, which is null. - -* If you do not enable concurrent export, the query result is exported by a single BE node in a single thread. Therefore, the export time and the export result set size are positively correlated. Turning on concurrent export can reduce the export time. - -* The export command does not check whether the file and file path exist. Whether the path will be automatically created or whether the existing file will be overwritten is entirely determined by the semantics of the remote storage system. +Adding an `ORDER BY` clause to the query prevents concurrent export, as the top-level sorting node necessitates single-threaded export: -* If an error occurs during the export process, the exported file may remain on the remote storage system. Doris will not clean these files. The user needs to manually clean up. - -* The timeout of the export command is the same as the timeout of the query. It can be set by `SET query_timeout = xxx`. - -* For empty result query, there will be an empty file. - -* File spliting will ensure that a row of data is stored in a single file. Therefore, the size of the file is not strictly equal to `max_file_size`. +```sql +mysql> SELECT * FROM demo.tbl ORDER BY id + -> INTO OUTFILE "file:///path/to/ftw/export/exp_" + -> FORMAT AS PARQUET; ++------------+-----------+----------+-------------------------------------------------------------------------------+ +| FileNumber | TotalRows | FileSize | URL | ++------------+-----------+----------+-------------------------------------------------------------------------------+ +| 1 | 1100005 | 80664607 | file:///127.0.0.1/mnt/disk2/ftw/export/exp_20c5461055774128-826256c0cfb3d8fc_ | ++------------+-----------+----------+-------------------------------------------------------------------------------+ +``` +Here, the result is a single row, indicating no concurrent export was triggered. -* For functions whose output is invisible characters, such as BITMAP and HLL types, the output is `\N`, which is NULL. 
+Refer to the appendix for more details on concurrent export principles. -* At present, the output type of some geo functions, such as `ST_Point` is VARCHAR, but the actual output value is an encoded binary character. Currently these functions will output garbled characters. For geo functions, use `ST_AsText` for output. +### Clear Export Directory Before Exporting +```sql +SELECT * FROM tbl1 +INTO OUTFILE "s3://my_bucket/export/my_file_" +FORMAT AS CSV +PROPERTIES +( + "s3.endpoint" = "${endpoint}", + "s3.region" = "region", + "s3.access_key"= "ak", + "s3.secret_key" = "sk", + "column_separator" = ",", + "line_delimiter" = "\n", + "delete_existing_files" = "true" +) +``` +If `"delete_existing_files" = "true"` is set, the export job will first delete all files and directories under `s3://my_bucket/export/`, then export data to that directory. -## More Help +> Note: +To use the `delete_existing_files` parameter, add `enable_delete_existing_files = true` to `fe.conf` and restart FE. This parameter is potentially dangerous and should only be used in a testing environment. -For more detailed syntax and best practices for using OUTFILE, please refer to the [OUTFILE](../../sql-manual/sql-statements/Data-Manipulation-Statements/OUTFILE) command manual, you can also More help information can be obtained by typing `HELP OUTFILE` at the command line of the MySql client. +### Set Export File Size +```sql +SELECT * FROM tbl +INTO OUTFILE "s3://path/to/result_" +FORMAT AS ORC +PROPERTIES( + "s3.endpoint" = "https://xxx", + "s3.region" = "ap-beijing", + "s3.access_key"= "your-ak", + "s3.secret_key" = "your-sk", + "max_file_size" = "2048MB" +); +``` +Specifying `"max_file_size" = "2048MB"` ensures that the final file size does not exceed 2GB. If the total size exceeds 2GB, multiple files will be generated. + +## Considerations +1. **Export Data Volume and Efficiency**: + - The `SELECT INTO OUTFILE` function executes a SQL query. Without concurrent export, a single BE node and thread export the query results. The total export time includes both the query execution time and the result set write-out time. Enabling concurrent export can reduce the export time. +<br/> +2. **Export Timeout**: + - The export command shares the same timeout as the query. If the data volume is large and causes the export to timeout, you can extend the query timeout by setting the session variable `query_timeout`. +<br/> +3. **Export File Management**: + - Doris does not manage exported files, whether successfully exported or remaining from failed exports. Users must handle these files themselves. + - Additionally, `SELECT INTO OUTFILE` does not check for the existence of files or file paths. Whether `SELECT INTO OUTFILE` automatically creates paths or overwrites existing files depends entirely on the semantics of the remote storage system. +<br/> +4. **Empty Result Sets**: + - Exporting an empty result set still generates an empty file. +<br/> +5. **File Splitting**: + - File splitting ensures that a single row of data is stored completely in one file. Thus, the file size may not exactly equal `max_file_size`. +<br/> +6. **Non-visible Character Functions**: + - For functions outputting non-visible characters (e.g., BITMAP, HLL types), CSV output is `\N`, and Parquet/ORC output is NULL. + - Currently, some geographic functions like `ST_Point` output VARCHAR but with encoded binary characters, causing garbled output. Use `ST_AsText` for geographic functions. + +## Appendix +### Concurrent Export Principles +1. 
**Principle Overview**: + - Doris is a high-performance, real-time analytical database based on the MPP (Massively Parallel Processing) architecture. MPP divides large datasets into small chunks and processes them in parallel across multiple nodes. + - Concurrent export in `SELECT INTO OUTFILE` leverages this parallel processing capability, allowing multiple BE nodes to export parts of the result set simultaneously. +<br/> +2. **How to Determine Concurrent Export Eligibility**: + - Ensure Session Variable is Enabled: `set enable_parallel_outfile = true;` + - Check Execution Plan with `EXPLAIN`: + + ```sql + mysql> EXPLAIN SELECT ... INTO OUTFILE "s3://xxx" ...; + +-----------------------------------------------------------------------------+ + | Explain String | + +-----------------------------------------------------------------------------+ + | PLAN FRAGMENT 0 | + | OUTPUT EXPRS:<slot 2> | <slot 3> | <slot 4> | <slot 5> | + | PARTITION: UNPARTITIONED | + | | + | RESULT SINK | + | | + | 1:EXCHANGE | + | | + | PLAN FRAGMENT 1 | + | OUTPUT EXPRS:`k1` + + + `k2` | + | PARTITION: HASH_PARTITIONED: `default_cluster:test`.`multi_tablet`.`k1` | + | | + | RESULT FILE SINK | + | FILE PATH: s3://ml-bd-repo/bpit_test/outfile_1951_ | + | STORAGE TYPE: S3 | + | | + | 0:OlapScanNode | + | TABLE: multi_tablet | + +-----------------------------------------------------------------------------+ + ``` + The `EXPLAIN` command returns the query plan. If `RESULT FILE SINK` appears in `PLAN FRAGMENT 1`, the query can be exported concurrently. If it appears in `PLAN FRAGMENT 0`, concurrent export is not possible. +<br/> +3. **Export Concurrency**: + - When concurrent export conditions are met, the export task's concurrency is determined by: `BE nodes * parallel_fragment_exec_instance_num`. diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/export/export-manual.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/export/export-manual.md index c1dc73d8fa4..f11da5d0c79 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/export/export-manual.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/export/export-manual.md @@ -1,6 +1,6 @@ --- { - "title": "数据导出", + "title": "Export", "language": "zh-CN" } --- @@ -24,130 +24,87 @@ specific language governing permissions and limitations under the License. --> -# 数据导出 +## Export +本文档将介绍如何使用`EXPORT`命令导出 Doris 中存储的数据。 -异步导出(Export)是 Doris 提供的一种将数据异步导出的功能。该功能可以将用户指定的表或分区的数据,以指定的文件格式,通过 Broker 进程或 S3 协议/HDFS 协议 导出到远端存储上,如 对象存储 / HDFS 等。 +有关`EXPORT`命令的详细介绍,请参考:[EXPORT](../../sql-manual/sql-statements/Data-Manipulation-Statements/Manipulation/EXPORT.md) -当前,EXPORT 支持导出 Doris 本地表 / View 视图 / 外表,支持导出到 parquet / orc / csv / csv_with_names / csv_with_names_and_types 文件格式。 +## 概述 +`Export` 是 Doris 提供的一种将数据异步导出的功能。该功能可以将用户指定的表或分区的数据,以指定的文件格式,导出到目标存储系统中,包括对象存储、HDFS 或本地文件系统。 -本文档主要介绍 Export 的基本原理、使用方式、最佳实践以及注意事项。 +`Export` 是一个异步执行的命令,命令执行成功后,立即返回结果,用户可以通过`Show Export` 命令查看该 Export 任务的详细信息。 -## 原理 +关于如何选择 `SELECT INTO OUTFILE` 和 `EXPORT`,请参阅 [导出综述](./export-view.md)。 -用户提交一个 Export 作业后。Doris 会统计这个作业涉及的所有 Tablet。然后根据`parallelism`参数(由用户指定)对这些 Tablet 进行分组。每个线程负责一组 tablets,生成若干个`SELECT INTO OUTFILE`查询计划。该查询计划会读取所包含的 Tablet 上的数据,然后通过 S3 协议 / HDFS 协议 / Broker 将数据写到远端存储指定的路径中。 -总体的执行流程如下: -1. 用户提交一个 Export 作业到 FE。 -2. FE 会统计要导出的所有 Tablets,然后根据`parallelism`参数将所有 Tablets 分组,每一组再根据`maximum_number_of_export_partitions`参数生成若干个`SELECT INTO OUTFILE`查询计划 -3. 
根据`parallelism`参数,生成相同个数的`ExportTaskExecutor`,每一个`ExportTaskExecutor`由一个线程负责,线程由 FE 的 Job 调度框架去调度执行。 -4. FE 的 Job 调度器会去调度`ExportTaskExecutor`并执行,每一个`ExportTaskExecutor`会串行地去执行由它负责的若干个`SELECT INTO OUTFILE`查询计划。 +`EXPORT` 当前支持导出以下类型的表或视图 -## 开始导出 +* Doris 内表 +* Doris 逻辑视图 +* Doris Catalog 表 -Export 的详细用法可参考 [EXPORT](../../sql-manual/sql-statements/Data-Manipulation-Statements/Manipulation/EXPORT) 。 -### 导出到 HDFS - -**WITH HDFS(推荐使用)** -```sql -EXPORT TABLE db1.tbl1 -PARTITION (p1,p2) -[WHERE [expr]] -TO "hdfs://host/path/to/export/" -PROPERTIES -( - "label" = "mylabel", - "column_separator"=",", - "columns" = "col1,col2", - "parallelism" = "3" -) -with HDFS ( -"fs.defaultFS"="hdfs://hdfs_host:port", -"hadoop.username" = "hadoop" -); -``` +`EXPORT` 目前支持以下导出格式 -* `label`:本次导出作业的标识。后续可以使用这个标识查看作业状态。 -* `column_separator`:列分隔符。默认为 `\t`。支持不可见字符,比如 '\x07'。 -* `columns`:要导出的列,使用英文状态逗号隔开,如果不填这个参数默认是导出表的所有列。 -* `line_delimiter`:行分隔符。默认为 `\n`。支持不可见字符,比如 '\x07'。 -* `parallelism`:并发 3 个线程去导出。 +* Parquet +* ORC +* csv +* csv\_with\_names +* csv\_with\_names\_and\_types -**WITH BROKER** +不支持压缩格式的导出。 -需要先启动一个 BROKER 进程。 -```sql -EXPORT TABLE db1.tbl1 -PARTITION (p1,p2) -[WHERE [expr]] -TO "hdfs://host/path/to/export/" -PROPERTIES -( - "label" = "mylabel", - "column_separator"=",", - "columns" = "col1,col2", - "parallelism" = "3" -) -WITH BROKER "broker_name" -( - "username"="xxx", - "password"="yyy" -); -``` - -### 导出到对象存储 -通过 s3 协议直接将数据导出到指定的存储。 +示例: ```sql -EXPORT TABLE test TO "s3://bucket/path/to/export/dir/" -WITH S3 ( - "s3.endpoint" = "http://host", - "s3.access_key" = "AK", - "s3.secret_key"="SK", - "s3.region" = "region" -); +mysql> EXPORT TABLE tpch1.lineitem TO "s3://my_bucket/path/to/exp_" + -> PROPERTIES( + -> "format" = "csv", + -> "max_file_size" = "2048MB" + -> ) + -> WITH s3 ( + -> "s3.endpoint" = "${endpoint}", + -> "s3.region" = "${region}", + -> "s3.secret_key"="${sk}", + -> "s3.access_key" = "${ak}" + -> ); ``` - -- `s3.access_key`/`s3.secret_key`:是您访问对象存储的 ACCESS_KEY/SECRET_KEY -- `s3.endpoint`:Endpoint 表示对象存储对外服务的访问域名。 -- `s3.region`:表示对象存储数据中心所在的地域。 - - -### 查看导出状态 - -提交作业后,可以通过 [SHOW EXPORT](../../sql-manual/sql-statements/Show-Statements/SHOW-EXPORT.md) 命令查询导出作业状态。结果举例如下: +提交作业后,可以通过 [SHOW EXPORT](../../sql-manual/sql-statements/Show-Statements/SHOW-EXPORT.md) 命令查询导出作业状态,结果举例如下: ```sql -mysql> show EXPORT\G; +mysql> show export\G *************************** 1. 
row *************************** - JobId: 14008 - State: FINISHED - Progress: 100% - TaskInfo: {"partitions":[],"max_file_size":"","delete_existing_files":"","columns":"","format":"csv","column_separator":"\t","line_delimiter":"\n","db":"default_cluster:demo","tbl":"student4","tablet_num":30} - Path: hdfs://host/path/to/export/ -CreateTime: 2019-06-25 17:08:24 - StartTime: 2019-06-25 17:08:28 -FinishTime: 2019-06-25 17:08:34 - Timeout: 3600 - ErrorMsg: NULL - OutfileInfo: [ + JobId: 143265 + Label: export_0aa6c944-5a09-4d0b-80e1-cb09ea223f65 + State: FINISHED + Progress: 100% + TaskInfo: {"partitions":[],"parallelism":5,"data_consistency":"partition","format":"csv","broker":"S3","column_separator":"\t","line_delimiter":"\n","max_file_size":"2048MB","delete_existing_files":"","with_bom":"false","db":"tpch1","tbl":"lineitem"} + Path: s3://ftw-datalake-test-1308700295/test_ycs_activeDefense_v10/test_csv/exp_ + CreateTime: 2024-06-11 18:01:18 + StartTime: 2024-06-11 18:01:18 + FinishTime: 2024-06-11 18:01:31 + Timeout: 7200 + ErrorMsg: NULL +OutfileInfo: [ [ { "fileNumber": "1", - "totalRows": "4", - "fileSize": "34bytes", - "url": "file:///127.0.0.1/Users/fangtiewei/tmp_data/export/f1ab7dcc31744152-bbb4cda2f5c88eac_" + "totalRows": "6001215", + "fileSize": "747503989bytes", + "url": "s3://my_bucket/path/to/exp_6555cd33e7447c1-baa9568b5c4eb0ac_*" } ] ] -1 row in set (0.01 sec) +1 row in set (0.00 sec) ``` +`show export` 命令返回的结果各个列的含义如下: * JobId:作业的唯一 ID +* Label:该导出作业的标签,如果Export没有指定,则系统会默认生成一个。 * State:作业状态: * PENDING:作业待调度 * EXPORTING:数据导出中 @@ -158,13 +115,13 @@ FinishTime: 2019-06-25 17:08:34 * db:数据库名 * tbl:表名 * partitions:指定导出的分区。`空`列表 表示所有分区。 - * column_separator:导出文件的列分隔符。 - * line_delimiter:导出文件的行分隔符。 + * column\_separator:导出文件的列分隔符。 + * line\_delimiter:导出文件的行分隔符。 * tablet num:涉及的总 Tablet 数量。 * broker:使用的 broker 的名称。 * coord num:查询计划的个数。 - * max_file_size:一个导出文件的最大大小。 - * delete_existing_files:是否删除导出目录下已存在的文件及目录。 + * max\_file\_size:一个导出文件的最大大小。 + * delete\_existing\_files:是否删除导出目录下已存在的文件及目录。 * columns:指定需要导出的列名,空值代表导出所有列。 * format:导出的文件格式 * Path:远端存储上的导出路径。 @@ -173,68 +130,303 @@ FinishTime: 2019-06-25 17:08:34 * ErrorMsg:如果作业出现错误,这里会显示错误原因。 * OutfileInfo:如果作业导出成功,这里会显示具体的`SELECT INTO OUTFILE`结果信息。 -### 取消导出任务 -:::info 备注 -`CANCEL EXPORT` 命令自 Doris 1.2.2 版本起支持。 +提交Export作业后,在Export任务成功或失败之前可以通过 [CANCEL EXPORT](../../sql-manual/sql-statements/Data-Manipulation-Statements/Manipulation/CANCEL-EXPORT.md) 命令取消导出作业。取消命令举例如下: -::: +```sql +CANCEL EXPORT FROM tpch1 WHERE LABEL like "%export_%"; +``` +## 导出文件列类型映射 +`Export`支持导出数据为Parquet、ORC 文件格式。Parquet、ORC 文件格式拥有自己的数据类型,Doris 的导出功能能够自动将 Doris 的数据类型导出为 Parquet、ORC 文件格式的对应数据类型,具体映射关系请参阅[导出综述](./export-view.md)文档的 "导出文件列类型映射" 部分。 -提交作业后,可以通过 [CANCEL EXPORT](../../sql-manual/sql-statements/Data-Manipulation-Statements/Manipulation/CANCEL-EXPORT) 命令取消导出作业。取消命令举例如下: +## 示例 +### 导出到 HDFS +将 db1.tbl1 表的p1和p2分区中的`col1` 列和`col2` 列数据导出到 HDFS 上,设置导出作业的 label 为 `mylabel`。导出文件格式为csv(默认格式),列分割符为`,`,导出作业单个文件大小限制为512MB。 ```sql -CANCEL EXPORT -FROM example_db -WHERE LABEL like "%example%"; +EXPORT TABLE db1.tbl1 +PARTITION (p1,p2) +TO "hdfs://host/path/to/export/" +PROPERTIES +( + "label" = "mylabel", + "column_separator"=",", + "max_file_size" = "512MB", + "columns" = "col1,col2" +) +with HDFS ( + "fs.defaultFS"="hdfs://hdfs_host:port", + "hadoop.username" = "hadoop" +); ``` +如果HDFS开启了高可用,则需要提供HA信息,如: -## 最佳实践 +```sql +EXPORT TABLE db1.tbl1 +PARTITION (p1,p2) +TO "hdfs://HDFS8000871/path/to/export/" +PROPERTIES +( + "label" = "mylabel", + 
"column_separator"=",", + "max_file_size" = "512MB", + "columns" = "col1,col2" +) +with HDFS ( + "fs.defaultFS" = "hdfs://HDFS8000871", + "hadoop.username" = "hadoop", + "dfs.nameservices" = "your-nameservices", + "dfs.ha.namenodes.your-nameservices" = "nn1,nn2", + "dfs.namenode.rpc-address.HDFS8000871.nn1" = "ip:port", + "dfs.namenode.rpc-address.HDFS8000871.nn2" = "ip:port", + "dfs.client.failover.proxy.provider.HDFS8000871" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" +); +``` +如果Hadoop 集群开启了高可用并且启用了 Kerberos 认证,可以参考如下SQL语句: -### 并发导出 +```sql +EXPORT TABLE db1.tbl1 +PARTITION (p1,p2) +TO "hdfs://HDFS8000871/path/to/export/" +PROPERTIES +( + "label" = "mylabel", + "column_separator"=",", + "max_file_size" = "512MB", + "columns" = "col1,col2" +) +with HDFS ( + "fs.defaultFS"="hdfs://hacluster/", + "hadoop.username" = "hadoop", + "dfs.nameservices"="hacluster", + "dfs.ha.namenodes.hacluster"="n1,n2", + "dfs.namenode.rpc-address.hacluster.n1"="192.168.0.1:8020", + "dfs.namenode.rpc-address.hacluster.n2"="192.168.0.2:8020", + "dfs.client.failover.proxy.provider.hacluster"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider", + "dfs.namenode.kerberos.principal"="hadoop/_h...@realm.com" + "hadoop.security.authentication"="kerberos", + "hadoop.kerberos.principal"="doris_t...@realm.com", + "hadoop.kerberos.keytab"="/path/to/doris_test.keytab" +); +``` +### 导出到 S3 +将 s3\_test 表中的所有数据导出到 s3 上,导出格式为csv,以不可见字符 "\\x07" 作为行分隔符。 -一个 Export 作业可以设置`parallelism`参数来并发导出数据。`parallelism`参数实际就是指定执行 EXPORT 作业的线程数量。每一个线程会负责导出表的部分 Tablets。 +```sql +EXPORT TABLE s3_test TO "s3://bucket/a/b/c" +PROPERTIES ( + "line_delimiter" = "\\x07" +) WITH s3 ( + "s3.endpoint" = "xxxxx", + "s3.region" = "xxxxx", + "s3.secret_key"="xxxx", + "s3.access_key" = "xxxxx" +) +``` +### 导出到本地文件系统 +> export数据导出到本地文件系统,需要在fe.conf中添加`enable_outfile_to_local=true`并且重启FE。 -一个 Export 作业的底层执行逻辑实际上是`SELECT INTO OUTFILE`语句,`parallelism`参数设置的每一个线程都会去执行独立的`SELECT INTO OUTFILE`语句。 +将test表中的所有数据导出到本地存储: -###### Export 作业拆分成多个`SELECT INTO OUTFILE`的具体逻辑是:将该表的所有 tablets 平均的分给所有 parallel 线程,如: -- num(tablets) = 40, parallelism = 3,则这 3 个线程各自负责的 tablets 数量分别为 14,13,13 个。 -- num(tablets) = 2, parallelism = 3,则 Doris 会自动将 parallelism 设置为 2,每一个线程负责一个 tablets。 +```sql +-- parquet格式 +EXPORT TABLE test TO "file:///home/user/tmp/" +PROPERTIES ( + "columns" = "k1,k2", + "format" = "parquet" +); -当一个线程负责的 tablets 超过 `maximum_tablets_of_outfile_in_export` 数值(默认为 10,可在 fe.conf 中添加`maximum_tablets_of_outfile_in_export`参数来修改该值)时,该线程就会拆分为多个`SELECT INTO OUTFILE`语句,如: -- 一个线程负责的 tablets 数量分别为 14,`maximum_tablets_of_outfile_in_export = 10`,则该线程负责两个`SELECT INTO OUTFILE`语句,第一个`SELECT INTO OUTFILE`语句导出 10 个 tablets,第二个`SELECT INTO OUTFILE`语句导出 4 个 tablets,两个`SELECT INTO OUTFILE`语句由该线程串行执行。 +-- orc格式 +EXPORT TABLE test TO "file:///home/user/tmp/" +PROPERTIES ( + "columns" = "k1,k2", + "format" = "orc" +); +-- csv_with_names格式, 以‘AA’为列分割符,‘zz’为行分割符 +EXPORT TABLE test TO "file:///home/user/tmp/" +PROPERTIES ( + "format" = "csv_with_names", + "column_separator"="AA", + "line_delimiter" = "zz" +); -当所要导出的数据量很大时,可以考虑适当调大`parallelism`参数来增加并发导出。若机器核数紧张,无法再增加`parallelism` 而导出表的 Tablets 又较多 时,可以考虑调大`maximum_tablets_of_outfile_in_export`来增加一个`SELECT INTO OUTFILE`语句负责的 tablets 数量,也可以加快导出速度。 +-- csv_with_names_and_types格式 +EXPORT TABLE test TO "file:///home/user/tmp/" +PROPERTIES ( + "format" = "csv_with_names_and_types" +); +``` +> 注意: -### exec\_mem\_limit +> 导出到本地文件系统的功能不适用于公有云用户,仅适用于私有化部署的用户。并且默认用户对集群节点有完全的控制权限。Doris 
对于用户填写的导出路径不会做合法性检查。如果 Doris 的进程用户对该路径无写权限,或路径不存在,则会报错。同时处于安全性考虑,如果该路径已存在同名的文件,则也会导出失败。 -通常一个 Export 作业的查询计划只有 `扫描-导出` 两部分,不涉及需要太多内存的计算逻辑。所以通常 2GB 的默认内存限制可以满足需求。 +> Doris 不会管理导出到本地的文件,也不会检查磁盘空间等。这些文件需要用户自行管理,如清理等。 -但在某些场景下,比如一个查询计划,在同一个 BE 上需要扫描的 Tablet 过多,或者 Tablet 的数据版本过多时,可能会导致内存不足。可以调整 session 变量`exec_mem_limit`来调大内存使用限制。 +### 指定分区导出 +导出作业支持仅导出 Doris 内表的部分分区,如仅导出 test 表的 p1 和 p2 分区 -## 注意事项 +```sql +EXPORT TABLE test +PARTITION (p1,p2) +TO "file:///home/user/tmp/" +PROPERTIES ( + "columns" = "k1,k2" +); +``` +### 导出时过滤数据 +导出作业支持导出时根据谓词条件过滤数据,仅导出符合条件的数据,如仅导出满足 `k1 < 50` 条件的数据 -* 不建议一次性导出大量数据。一个 Export 作业建议的导出数据量最大在几十 GB。过大的导出会导致更多的垃圾文件和更高的重试成本。 -* 如果表数据量过大,建议按照分区导出。 -* 在 Export 作业运行过程中,如果 FE 发生重启或切主,则 Export 作业会失败,需要用户重新提交。 -* 如果 Export 作业运行失败,已经生成的文件不会被删除,需要用户手动删除。 -* Export 作业可以导出 Base 表 / View 视图表 / 外表 的数据,不会导出 Rollup Index 的数据。 -* Export 作业会扫描数据,占用 IO 资源,可能会影响系统的查询延迟。 -* 在使用 EXPORT 命令时,请确保目标路径是已存在的目录,否则导出可能会失败。 -* 在并发导出时,请注意合理地配置线程数量和并行度,以充分利用系统资源并避免性能瓶颈。 -* 导出到本地文件时,要注意文件权限和路径,确保有足够的权限进行写操作,并遵循适当的文件系统路径。 -* 在导出过程中,可以实时监控进度和性能指标,以便及时发现问题并进行优化调整。 -* 导出操作完成后,建议验证导出的数据是否完整和正确,以确保数据的质量和完整性。 +```sql +EXPORT TABLE test +WHERE k1 < 50 +TO "file:///home/user/tmp/" +PROPERTIES ( + "columns" = "k1,k2", + "column_separator"="," +); +``` +### 导出外表数据 +导出作业支持Doris Catalog外表数据: + +```sql +-- 创建一个catalog +CREATE CATALOG `tpch` PROPERTIES ( + "type" = "trino-connector", + "trino.connector.name" = "tpch", + "trino.tpch.column-naming" = "STANDARD", + "trino.tpch.splits-per-node" = "32" +); + +-- 导出 Catalog 外表数据 +EXPORT TABLE tpch.sf1.lineitem TO "file:///path/to/exp_" +PROPERTIES( + "parallelism" = "5", + "format" = "csv", + "max_file_size" = "1024MB" +); +``` -## 相关配置 +> 注意:当前Export导出 Catalog 外表数据不支持并发导出,即使指定 parallelism 大于 1,仍然是单线程导出。 -### FE +## 最佳实践 +### 导出一致性 +`Export`导出支持 partition / tablets 两种粒度。`data_consistency`参数用来指定以何种粒度切分希望导出的表,`none` 代表 Tablets 级别,`partition`代表 Partition 级别。 -* `maximum_tablets_of_outfile_in_export`:ExportExecutorTask 任务中一个 OutFile 语句允许的最大 tablets 数量。 +```sql +EXPORT TABLE test TO "file:///home/user/tmp" +PROPERTIES ( + "format" = "parquet", + "data_consistency" = "partition", + "max_file_size" = "512MB" +); +``` +若设置`"data_consistency" = "partition"` ,Export任务底层构造的多个`SELECT INTO OUTFILE` 语句都会导出不同的partition。 -## 更多帮助 +若设置`"data_consistency" = "none"` ,Export任务底层构造的多个`SELECT INTO OUTFILE` 语句都会导出不同的tablets,但是这些不同的tablets有可能属于相同的partition。 -关于 EXPORT 使用的更多详细语法及最佳实践,请参阅 [Export](../../sql-manual/sql-statements/Data-Manipulation-Statements/Manipulation/EXPORT) 命令手册,你也可以在 MySql 客户端命令行下输入 `HELP EXPORT` 获取更多帮助信息。 +关于Export底层构造 `SELECT INTO OUTFILE` 的逻辑,可参阅附录部分。 -EXPORT 命令底层实现是`SELECT INTO OUTFILE`语句,有关`SELECT INTO OUTFILE`可以参阅[同步导出](./outfile.md) 和 [SELECT INTO OUTFILE](../../sql-manual/sql-statements/Data-Manipulation-Statements/OUTFILE.md)命令手册。 +### 导出作业并发度 +Export可以设置不同的并发度来并发导出数据。指定并发度为5: + +```sql +EXPORT TABLE test TO "file:///home/user/tmp/" +PROPERTIES ( + "format" = "parquet", + "max_file_size" = "512MB", + "parallelism" = "5" +); +``` +关于Export并发导出的原理,可参阅附录部分。 + +### 导出前清空导出目录 +```sql +EXPORT TABLE test TO "file:///home/user/tmp" +PROPERTIES ( + "format" = "parquet", + "max_file_size" = "512MB", + "delete_existing_files" = "true" +); +``` +如果设置了 `"delete_existing_files" = "true"`,导出作业会先将`/home/user/`目录下所有文件及目录删除,然后导出数据到该目录下。 + +> 注意: + 若要使用delete\_existing\_files参数,还需要在fe.conf中添加配置`enable_delete_existing_files = true`并重启fe,此时delete\_existing\_files才会生效。delete\_existing\_files = true 是一个危险的操作,建议只在测试环境中使用。 + +### 设置导出文件的大小 +导出作业支持设置导出文件的大小,如果单个文件大小超过设定值,则会按照指定大小分成多个文件导出。 
+ +```sql +EXPORT TABLE test TO "file:///home/user/tmp/" +PROPERTIES ( + "format" = "parquet", + "max_file_size" = "512MB" +); +``` +通过设置 `"max_file_size" = "512MB"`,则单个导出文件的最大大小为 512MB。 + +## 注意事项 +1. 内存限制 + 通常一个 Export 作业的查询计划只有 `扫描-导出` 两部分,不涉及需要太多内存的计算逻辑。所以通常 2GB 的默认内存限制可以满足需求。 + 但在某些场景下,比如一个查询计划,在同一个 BE 上需要扫描的 Tablet 过多,或者 Tablet 的数据版本过多时,可能会导致内存不足。可以调整session变量`exec_mem_limit`来调大内存使用限制。 +<br/> +2. 导出数据量 + 不建议一次性导出大量数据。一个 Export 作业建议的导出数据量最大在几十 GB。过大的导出会导致更多的垃圾文件和更高的重试成本。如果表数据量过大,建议按照分区导出。 + 另外,Export 作业会扫描数据,占用 IO 资源,可能会影响系统的查询延迟。 +<br/> +3. 导出文件的管理 + 如果 Export 作业运行失败,已经生成的文件不会被删除,需要用户手动删除。 +<br/> +4. 数据一致性 + 目前在export时只是简单检查tablets版本是否一致,建议在执行export过程中不要对该表进行导入数据操作。 +<br/> +5. 导出超时 + 若导出的数据量很大,超过导出的超时时间,则Export任务会失败。此时可以在Export命令中指定`timeout` 参数来增加超时时间并重试Export命令。 +<br/> +6. 导出失败 + 在 Export 作业运行过程中,如果 FE 发生重启或切主,则 Export 作业会失败,需要用户重新提交。可以通过`show export` 命令查看Export任务状态。 +<br/> +7. 导出分区数量 + 一个Export Job允许导出的分区数量最大为2000,可以在fe.conf中添加参数`maximum_number_of_export_partitions`并重启FE来修改该设置。 +<br/> +8. 并发导出 + 在并发导出时,请注意合理地配置线程数量和并行度,以充分利用系统资源并避免性能瓶颈。在导出过程中,可以实时监控进度和性能指标,以便及时发现问题并进行优化调整。 +<br/> +9. 数据完整性 + 导出操作完成后,建议验证导出的数据是否完整和正确,以确保数据的质量和完整性。 + +## 附录 +### 并发导出原理 + +Export 任务的底层是执行`SELECT INTO OUTFILE` SQL语句。用户发起一个 Export 任务后,Doris会根据 Export 要导出的表构造出一个或多个 `SELECT INTO OUTFILE` 执行计划,随后将这些`SELECT INTO OUTFILE` 执行计划提交给 Doris的 Job Schedule 任务调度器,Job Schedule 任务调度器会自动调度这些任务并执行。 + +默认情况下,Export 任务是单线程执行的。为了提高导出的效率,Export 命令可以设置一个 `parallelism` 参数来并发导出数据。设置`parallelism` 大于1后,Export 任务会使用多个线程并发的去执行 `SELECT INTO OUTFILE` 查询计划。`parallelism`参数实际就是指定执行 EXPORT 作业的线程数量。 + +一个 Export 任务构造一个或多个 `SELECT INTO OUTFILE` 执行计划的具体逻辑是: + +1. 选择导出的数据的一致性模型 + 根据 `data_consistency` 参数来决定导出的一致性,这个只和语义有关,和并发度无关,用户要先根据自己的需求,选择一致性模型。 +<br/> +2. 确定并发度 + 根据 `parallelism` 参数确定由多少个线程来运行这些 `SELECT INTO OUTFILE` 执行计划。parallelism 决定了最大可能的线程数。 + + > 注意:即使 Export 命令设置了 `parallelism` 参数,该 Export 任务的实际并发线程数量还与Job Schedule有关。Export 任务设置多并发后,每一个并发线程都是 Job Schedule 提供的,所以如果此时 Doris 系统任务较繁忙,Job Schedule 的线程资源较紧张,那么有可能分给 Export 任务的实际线程数量达不到 `parallelism` 个数,影响 Export 的并发导出。此时可以通过减轻系统负载或调整 FE 配置 `async_task_consumer_thread_num` 增加 Job Schedule 的总线程数量来缓解这个问题。 +<br/> +3. 确定每一个 outfile 语句的任务量 + 每一个线程会根据 `maximum_tablets_of_outfile_in_export` 以及数据实际的分区数 / buckets 数来决定要拆分成多少个 outfile。 + > `maximum_tablets_of_outfile_in_export` 是 FE 的配置,默认值为10。该参数用于指定Export 任务切分出来的单个 OutFile 语句中允许的最大 partitions / buckets 数量。修改该配置需要重启FE。 + +举例:假设一张表共有20个 partition,每个 partition 都有5个buckets,那么该表一共有100个buckets。设置`data_consistency = none` 以及 `maximum_tablets_of_outfile_in_export = 10`。 + +1. `parallelism = 5` 情况下:Export 任务将把该表的 100 个buckets分成5份,每个线程负责 20 个buckets。每个线程负责的 20 个buckets又将以 10 个为单位分成 2 组,每组 buckets 各由一个 outfile 查询计划负责。所以最终该 Export 任务有 5 个线程并发执行,每个线程负责 2 个 outfile 语句,每个线程负责的 outfile 语句串行的被执行。 + +2. `parallelism = 3` 情况下:Export 任务将把该表的 100 个buckets分成 3 份,3 个线程分别负责 34、33、33 个buckets。每个线程负责的 buckets 又将以 10 个为单位分成 4 组(最后一组不足10个buckets),每组 buckets 各由一个 outfile 查询计划负责。所以该 Export 任务最终有 3 个线程并发执行,每个线程负责 4 个 outfile 语句,每个线程负责的 outfile 语句串行的被执行。 + +3. `parallelism = 120` 情况下:由于该表 buckets 只有100个,所以系统会将 `parallelism` 强制设为 100 ,并以 `parallelism = 100` 去执行。Export 任务将把该表的 100 个 buckets 分成 100 份,每个线程负责 1 个buckets。每个线程负责的 1 个buckets 又将以 10 个为单位分成 1 组(该组实际就只有 1 个 buckets),每组buckets 由一个 outfile 查询计划负责。所以最终该 Export 任务有 100 个线程并发执行,每个线程负责 1 个 outfile 语句,每个 outfile 语句实际只导出 1 个 buckets。 + +当前版本若希望 Export 有一个较好的性能,建议设置以下参数: +1. 打开 session 变量 `enable_parallel_outfile`。 +2. 设置 Export 的 `parallelism` 参数为较大值,使得每一个线程只负责一个 `SELECT INTO OUTFILE` 查询计划。 +3. 
设置 FE 配置 `maximum_tablets_of_outfile_in_export` 为较小值,使得每一个 `SELECT INTO OUTFILE` 查询计划导出的数据量较小。 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/export/export-overview.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/export/export-overview.md new file mode 100644 index 00000000000..9af28a2e04e --- /dev/null +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/export/export-overview.md @@ -0,0 +1,131 @@ +--- +{ + "title": "数据导出概述", + "language": "zh-CN" +} +--- + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +## 数据导出概述 +数据导出功能,用于将查询结果集或者Doris的表数据,使用指定的文件格式,写入指定的存储系统中的。 + +导出功能和数据备份功能有以下区别: + +| |数据导出|数据备份| +| ----- | ----- | ----- | +|数据最终存储位置|HDFS、对象存储、本地文件系统|HDFS、对象存储| +|数据格式|Parquet、ORC、CSV 等开放格式|Doris 内部存储格式| +|执行速度|中等(需要读取数据并转换成目标数据格式)|快速(无需解析和转换,直接上传Doris数据文件)| +|灵活度|可以通过 SQL 语句灵活定义要导出的数据|仅支持表级别全量备份| +|使用场景|结果集下载、不同系统之间的数据交换|数据备份、Doris集群间的数据迁移| + +## 选择导出方式 +Doris 提供以下三种不同的数据导出方式: + +* SELECT INTO OUTFILE:支持任意 SQL 结果集的导出。 +* EXPORT:支持表级别的部分或全部数据导出。 +* MySQL DUMP:兼容 mysql dump 指令的数据导出。 + + + +三种导出方式的异同点如下: + +| |SELECT INTO OUTFILE|EXPORT|MySQL DUMP| +| ----- | ----- | ----- | ----- | +|同步/异步|同步|异步(提交EXPORT任务后通过 SHOW EXPORT 命令查看任务进度)|同步| +|支持任意 SQL|支持|不支持|不支持| +|导出指定分区|支持|支持|不支持| +|导出指定tablets|支持|不支持|不支持| +|并发导出|支持且并发高(但取决于 SQL 语句是否有 ORDER BY 等需要单机处理的算子)|支持且并发高(支持 Tablet 粒度的并发导出)|不支持,只能单线程导出| +|支持导出的数据格式|Parquet、ORC、CSV|Parquet、ORC、CSV|MySQL Dump 专有格式| +|是否支持导出外表|支持|部分支持|不支持| +|是否支持导出view|支持|支持|支持| +|支持的导出位置|S3、HDFS、LOCAL|S3、HDFS、LOCAL|LOCAL| + +### SELECT INTO OUTFILE +适用于以下场景: + +* 导出数据需要经过复杂计算逻辑的,如过滤、聚合、关联等。 +* 适合执行同步任务的场景。 + +### EXPORT +适用于以下场景: + +* 大数据量的单表导出、仅需简单的过滤条件。 +* 需要异步提交任务的场景。 + +### MySQl Dump +适用于以下场景: + +* 兼容MySQL 生态,需要同时导出表结构和数据。 +* 仅用于开发测试或者数据量很小的情况。 + + + +## 导出文件列类型映射 +Parquet、ORC 文件格式拥有自己的数据类型。Doris 的导出功能能够自动将 Doris 的数据类型导出为 Parquet、ORC 文件格式的对应数据类型。CSV 格式没有类型,所有数据都以文本形式输出。 + + + +以下是Doris数据类型和 Parquet、ORC 文件格式的数据类型映射关系表: +1. Doris导出到Orc文件格式的数据类型映射表: + |Doris Type|Orc Type| + | ----- | ----- | + |boolean|boolean| + |tinyint|tinyint| + |smallint|smallint| + |int|int| + |bigint|bigint| + |largeInt|string| + |date|string| + |datev2|string| + |datetime|string| + |datetimev2|timestamp| + |float|float| + |double|double| + |char / varchar / string|string| + |decimal|decimal| + |struct|struct| + |map|map| + |array|array| + + <br/> +2. 
Doris导出到Parquet文件格式时,会先将Doris内存数据转换为arrow内存数据格式,然后由arrow写出到parquet文件格式。Doris数据类型到arrow数据类的映射关系为: + + |Doris Type|Arrow Type| + | ----- | ----- | + |boolean|boolean| + |tinyint|int8| + |smallint|int16| + |int|int32| + |bigint|int64| + |largeInt|utf8| + |date|utf8| + |datev2|Date32Type| + |datetime|utf8| + |datetimev2|TimestampType| + |float|float32| + |double|float64| + |char / varchar / string|utf8| + |decimal|decimal128| + |struct|struct| + |map|map| + |array|list| diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/export/outfile.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/export/outfile.md index 1d5aba59c58..5e205e1b0a3 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/export/outfile.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/export/outfile.md @@ -24,151 +24,313 @@ specific language governing permissions and limitations under the License. --> +## SELECT INTO OUTFILE +本文档将介绍如何使用 `SELECT INTO OUTFILE` 命令进行查询结果的导出操作。 -本文档介绍如何使用 [SELECT INTO OUTFILE](../../sql-manual/sql-statements/Data-Manipulation-Statements/OUTFILE) 命令进行查询结果的导出操作。 +有关`SELECT INTO OUTFILE`命令的详细介绍,请参考:[SELECT INTO OUTFILE](../../sql-manual/sql-statements/Data-Manipulation-Statements/OUTFILE.md) -`SELECT INTO OUTFILE` 是一个同步命令,命令返回即表示操作结束,同时会返回一行结果来展示导出的执行结果。 +## 概述 +`SELECT INTO OUTFILE` 命令将 `SELECT` 部分的结果数据,以指定的文件格式导出到目标存储系统中,包括对象存储、HDFS 或本地文件系统。 -## 示例 +`SELECT INTO OUTFILE` 是一个同步命令,命令返回即表示导出结束。若导出成功,会返回导出的文件数量、大小、路径等信息。若导出失败,会返回错误信息。 + +关于如何选择 `SELECT INTO OUTFILE` 和 `EXPORT`,请参阅 [导出综述](./export-overview.md)。 + + + +`SELECT INTO OUTFILE` 目前支持以下导出格式 + +* Parquet +* ORC +* csv +* csv\_with\_names +* csv\_with\_names\_and\_types +不支持压缩格式的导出。 + + + +示例: + +```sql +mysql> SELECT * FROM tbl1 LIMIT 10 INTO OUTFILE "file:///home/work/path/result_"; ++------------+-----------+----------+--------------------------------------------------------------------+ +| FileNumber | TotalRows | FileSize | URL | ++------------+-----------+----------+--------------------------------------------------------------------+ +| 1 | 2 | 8 | file:///192.168.1.10/home/work/path/result_{fragment_instance_id}_ | ++------------+-----------+----------+--------------------------------------------------------------------+ +``` +返回结果说明: + +* FileNumber:最终生成的文件个数。 +* TotalRows:结果集行数。 +* FileSize:导出文件总大小。单位字节。 +* URL:导出的文件路径的前缀,多个文件会以后缀 `_0`,`_1` 依次编号。 + +## 导出文件列类型映射 +`SELECT INTO OUTFILE` 支持导出为Parquet、ORC 文件格式。Parquet、ORC 文件格式拥有自己的数据类型,Doris 的导出功能能够自动将 Doris 的数据类型导出为 Parquet、ORC 文件格式的对应数据类型,具体映射关系请参阅[导出综述](./export-view.md)文档的 "导出文件列类型映射" 部分。 + +## 示例 ### 导出到 HDFS +将查询结果导出到文件 `hdfs://path/to/` 目录下,指定导出格式为 PARQUET: + +```sql +SELECT c1, c2, c3 FROM tbl +INTO OUTFILE "hdfs://${host}:${fileSystem_port}/path/to/result_" +FORMAT AS PARQUET +PROPERTIES +( + "fs.defaultFS" = "hdfs://ip:port", + "hadoop.username" = "hadoop" +); +``` +如果HDFS开启了高可用,则需要提供HA信息,如: -将简单查询结果导出到文件 `hdfs://path/to/result.txt`,指定导出格式为 CSV。 +```sql +SELECT c1, c2, c3 FROM tbl +INTO OUTFILE "hdfs://HDFS8000871/path/to/result_" +FORMAT AS PARQUET +PROPERTIES +( + "fs.defaultFS" = "hdfs://HDFS8000871", + "hadoop.username" = "hadoop", + "dfs.nameservices" = "your-nameservices", + "dfs.ha.namenodes.your-nameservices" = "nn1,nn2", + "dfs.namenode.rpc-address.HDFS8000871.nn1" = "ip:port", + "dfs.namenode.rpc-address.HDFS8000871.nn2" = "ip:port", + "dfs.client.failover.proxy.provider.HDFS8000871" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" +); +``` +如果Hadoop 
集群开启了高可用并且启用了 Kerberos 认证,可以参考如下SQL语句: ```sql SELECT * FROM tbl INTO OUTFILE "hdfs://path/to/result_" -FORMAT AS CSV +FORMAT AS PARQUET PROPERTIES ( - "broker.name" = "my_broker", - "column_separator" = ",", - "line_delimiter" = "\n" + "fs.defaultFS"="hdfs://hacluster/", + "hadoop.username" = "hadoop", + "dfs.nameservices"="hacluster", + "dfs.ha.namenodes.hacluster"="n1,n2", + "dfs.namenode.rpc-address.hacluster.n1"="192.168.0.1:8020", + "dfs.namenode.rpc-address.hacluster.n2"="192.168.0.2:8020", + "dfs.client.failover.proxy.provider.hacluster"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider", + "dfs.namenode.kerberos.principal"="hadoop/_h...@realm.com" + "hadoop.security.authentication"="kerberos", + "hadoop.kerberos.principal"="doris_t...@realm.com", + "hadoop.kerberos.keytab"="/path/to/doris_test.keytab" ); ``` +### 导出到 S3 +将查询结果导出到s3存储的 `s3://path/to/` 目录下,指定导出格式为 ORC,需要提供`sk` `ak`等信息 -### 导出到本地文件 +```sql +SELECT * FROM tbl +INTO OUTFILE "s3://path/to/result_" +FORMAT AS ORC +PROPERTIES( + "s3.endpoint" = "https://xxx", + "s3.region" = "ap-beijing", + "s3.access_key"= "your-ak", + "s3.secret_key" = "your-sk" +); +``` +### 导出到本地 +> 如需导出到本地文件,需在 `fe.conf` 中添加 `enable_outfile_to_local=true`并重启 FE。 -导出到本地文件时需要先在 fe.conf 中配置`enable_outfile_to_local=true` +将查询结果导出到BE的`file:///path/to/` 目录下,指定导出格式为 CSV,指定列分割符为`,`。 ```sql -select * from tbl1 limit 10 -INTO OUTFILE "file:///home/work/path/result_"; +SELECT k1 FROM tbl1 UNION SELECT k2 FROM tbl1 +INTO OUTFILE "file:///path/to/result_" +FORMAT AS CSV +PROPERTIES( + "column_separator" = "," +); ``` +> 注意: + 导出到本地文件的功能不适用于公有云用户,仅适用于私有化部署的用户。并且默认用户对集群节点有完全的控制权限。Doris 对于用户填写的导出路径不会做合法性检查。如果 Doris 的进程用户对该路径无写权限,或路径不存在,则会报错。同时处于安全性考虑,如果该路径已存在同名的文件,则也会导出失败。 + Doris 不会管理导出到本地的文件,也不会检查磁盘空间等。这些文件需要用户自行管理,如清理等。 -更多用法可查看[OUTFILE 文档](../../sql-manual/sql-statements/Data-Manipulation-Statements/OUTFILE)。 +## 最佳实践 +### 生成导出成功标识文件 +`SELECT INTO OUTFILE`命令是一个同步命令,因此有可能在SQL执行过程中任务连接断开了,从而无法获悉导出的数据是否正常结束或是否完整。此时可以使用 `success_file_name` 参数要求导出成功后,在目录下生成一个文件标识。 -## 并发导出 +类似 Hive,用户可以通过判断导出目录中是否有`success_file_name` 参数指定的文件,来判断导出是否正常结束以及导出目录中的文件是否完整。 -默认情况下,查询结果集的导出是非并发的,也就是单点导出。如果用户希望查询结果集可以并发导出,需要满足以下条件: -1. Session variable 'enable_parallel_outfile' 开启并发导出: ```set enable_parallel_outfile = true;``` -2. 导出方式为 S3 , 或者 HDFS,而不是使用 broker +例如:将 select 语句的查询结果导出到腾讯云 COS:`s3://${bucket_name}/path/my_file_`。指定导出格式为 csv。 指定导出成功标识文件名为`SUCCESS`。导出完成后,生成一个标识文件。 -3. 查询可以满足并发导出的需求,比如顶层不包含 sort 等单点节点。(后面会举例说明,哪种属于不可并发导出结果集的查询) +```sql +SELECT k1,k2,v1 FROM tbl1 LIMIT 100000 +INTO OUTFILE "s3://my_bucket/path/my_file_" +FORMAT AS CSV +PROPERTIES +( + "s3.endpoint" = "${endpoint}", + "s3.region" = "ap-beijing", + "s3.access_key"= "ak", + "s3.secret_key" = "sk", + "column_separator" = ",", + "line_delimiter" = "\n", + "success_file_name" = "SUCCESS" +) +``` +在导出完成后,会多写出一个文件,该文件的文件名为 `SUCCESS`。 -满足以上三个条件,就能触发并发导出查询结果集了。并发度 = ```be_instacne_num * parallel_fragment_exec_instance_num``` +### 并发导出 +默认情况下,`SELECT` 部分的查询结果会先汇聚到某一个 BE 节点,由该节点单线程导出数据。然而,在某些情况下,如没有 `ORDER BY` 子句的查询语句,则可以开启并发导出,多个 BE 节点同时导出数据,以提升导出性能。 -### 如何验证结果集被并发导出 +下面我们通过一个示例演示如何正确开启并发导出功能: -用户通过 Session 变量设置开启并发导出后,如果想验证当前查询是否能进行并发导出,则可以通过下面这个方法。 +1. 打开并发导出会话变量 ```sql -explain select xxx from xxx where xxx into outfile "s3://xxx" format as csv properties ("AWS_ENDPOINT" = "xxx", ...); +mysql> SET enable_parallel_outfile = true; ``` +2. 
执行导出命令 -对查询进行 explain 后,Doris 会返回该查询的规划,如果你发现 ```RESULT FILE SINK``` 出现在 ```PLAN FRAGMENT 1``` 中,就说明导出并发开启成功了。 - -如果 ```RESULT FILE SINK``` 出现在 ```PLAN FRAGMENT 0``` 中,则说明当前查询不能进行并发导出 (当前查询不同时满足并发导出的三个条件)。 - -```Plain -并发导出的规划示例: -+-----------------------------------------------------------------------------+ -| Explain String | -+-----------------------------------------------------------------------------+ -| PLAN FRAGMENT 0 | -| OUTPUT EXPRS:<slot 2> | <slot 3> | <slot 4> | <slot 5> | -| PARTITION: UNPARTITIONED | -| | -| RESULT SINK | -| | -| 1:EXCHANGE | -| | -| PLAN FRAGMENT 1 | -| OUTPUT EXPRS:`k1` + `k2` | -| PARTITION: HASH_PARTITIONED: `default_cluster:test`.`multi_tablet`.`k1` | -| | -| RESULT FILE SINK | -| FILE PATH: s3://ml-bd-repo/bpit_test/outfile_1951_ | -| STORAGE TYPE: S3 | -| | -| 0:OlapScanNode | -| TABLE: multi_tablet | -+-----------------------------------------------------------------------------+ +```sql +mysql> SELECT * FROM demo.tbl + -> INTO OUTFILE "file:///path/to/ftw/export/exp_" + -> FORMAT AS PARQUET; ++------------+-----------+----------+-------------------------------------------------------------------------------+ +| FileNumber | TotalRows | FileSize | URL | ++------------+-----------+----------+-------------------------------------------------------------------------------+ +| 1 | 104494 | 7998308 | file:///127.0.0.1/path/to/exp_1f850179e684476b-9bf001a6bf96d7d3_ | +| 1 | 104984 | 8052491 | file:///127.0.0.1/path/to/exp_1f850179e684476b-9bf001a6bf96d7d5_ | +| 1 | 104345 | 7981406 | file:///127.0.0.1/path/to/exp_1f850179e684476b-9bf001a6bf96d7d1_ | +| 1 | 104034 | 7977301 | file:///127.0.0.1/path/to/exp_1f850179e684476b-9bf001a6bf96d7d4_ | +| 1 | 104238 | 7979757 | file:///127.0.0.1/path/to/exp_1f850179e684476b-9bf001a6bf96d7d2_ | +| 1 | 159450 | 11870222 | file:///127.0.0.1/path/to/exp_1f850179e684476b-9bf001a6bf96d7d0_ | +| 1 | 209691 | 16082100 | file:///127.0.0.1/path/to/exp_1f850179e684476b-9bf001a6bf96d7ce_ | +| 1 | 208769 | 16004096 | file:///127.0.0.1/path/to/exp_1f850179e684476b-9bf001a6bf96d7cf_ | ++------------+-----------+----------+-------------------------------------------------------------------------------+ ``` +可以看到,开启并成功触发并发导出功能后,返回的结果可能是多行,表示有多个线程并发导出。 -## 返回结果 +如果我们修改上述语句,即在查询语句中加入 `ORDER BY` 子句。由于查询语句带了一个顶层的排序节点,所以这个查询即使开启并发导出功能,也是无法并发导出的: -导出命令为同步命令。命令返回,即表示操作结束。同时会返回一行结果来展示导出的执行结果。 +```sql +mysql> SELECT * FROM demo.tbl ORDER BY id + -> INTO OUTFILE "file:///path/to/ftw/export/exp_" + -> FORMAT AS PARQUET; ++------------+-----------+----------+-------------------------------------------------------------------------------+ +| FileNumber | TotalRows | FileSize | URL | ++------------+-----------+----------+-------------------------------------------------------------------------------+ +| 1 | 1100005 | 80664607 | file:///127.0.0.1/mnt/disk2/ftw/export/exp_20c5461055774128-826256c0cfb3d8fc_ | ++------------+-----------+----------+-------------------------------------------------------------------------------+ +``` +可以看到,最终结果只有一行,并没有触发并发导出。 -如果正常导出并返回,则结果如下: +关于更多并发导出的原理说明,可参阅附录部分。 +### 导出前清空导出目录 ```sql -mysql> select * from tbl1 limit 10 into outfile "file:///home/work/path/result_"; -+------------+-----------+----------+--------------------------------------------------------------------+ -| FileNumber | TotalRows | FileSize | URL | -+------------+-----------+----------+--------------------------------------------------------------------+ -| 1 | 2 | 8 | file:///192.168.1.10/home/work/path/result_{fragment_instance_id}_ | 
-+------------+-----------+----------+--------------------------------------------------------------------+ -1 row in set (0.05 sec) +SELECT * FROM tbl1 +INTO OUTFILE "s3://my_bucket/export/my_file_" +FORMAT AS CSV +PROPERTIES +( + "s3.endpoint" = "${endpoint}", + "s3.region" = "region", + "s3.access_key"= "ak", + "s3.secret_key" = "sk", + "column_separator" = ",", + "line_delimiter" = "\n", + "delete_existing_files" = "true" +) ``` +如果设置了 `"delete_existing_files" = "true"`,导出作业会先将 `s3://my_bucket/export/`目录下所有文件及目录删除,然后导出数据到该目录下。 -* FileNumber:最终生成的文件个数。 +> 注意: -* TotalRows:结果集行数。 +> 若要使用delete\_existing\_files参数,还需要在fe.conf中添加配置`enable_delete_existing_files = true`并重启fe,此时delete\_existing\_files才会生效。delete\_existing\_files = true 是一个危险的操作,建议只在测试环境中使用。 -* FileSize:导出文件总大小。单位字节。 +### 设置导出文件的大小 +```sql +SELECT * FROM tbl +INTO OUTFILE "s3://path/to/result_" +FORMAT AS ORC +PROPERTIES( + "s3.endpoint" = "https://xxx", + "s3.region" = "ap-beijing", + "s3.access_key"= "your-ak", + "s3.secret_key" = "your-sk", + "max_file_size" = "2048MB" +); +``` +由于指定了 `"max_file_size" = "2048MB"` 最终生成文件如如果不大于 2GB,则只有一个文件。 如果大于 2GB,则有多个文件。 -* URL:如果是导出到本地磁盘,则这里显示具体导出到哪个 Compute Node。 +## 注意事项 +1. 导出数据量和导出效率 -如果进行了并发导出,则会返回多行数据。 + `SELECT INTO OUTFILE`功能本质上是执行一个 SQL 查询命令。如果不开启并发导出,查询结果是由单个 BE 节点,单线程导出的,因此整个导出的耗时包括查询本身的耗时和最终结果集写出的耗时。开启并发导出可以降低导出的时间。 -```sql -+------------+-----------+----------+--------------------------------------------------------------------+ -| FileNumber | TotalRows | FileSize | URL | -+------------+-----------+----------+--------------------------------------------------------------------+ -| 1 | 3 | 7 | file:///192.168.1.10/home/work/path/result_{fragment_instance_id}_ | -| 1 | 2 | 4 | file:///192.168.1.11/home/work/path/result_{fragment_instance_id}_ | -+------------+-----------+----------+--------------------------------------------------------------------+ -2 rows in set (2.218 sec) -``` +2. 导出超时 -如果执行错误,则会返回错误信息,如: + 导出命令的超时时间与查询的超时时间相同,如果数据量较大导致导出数据超时,可以设置会话变量 `query_timeout` 适当的延长查询超时时间。 -```sql -mysql> SELECT * FROM tbl INTO OUTFILE ... -ERROR 1064 (HY000): errCode = 2, detailMessage = Open broker writer failed ... -``` +3. 导出文件的管理 -## 注意事项 + Doris 不会管理导出的文件,无论是导出成功的还是导出失败后残留的文件,都需要用户自行处理。 + + 另外,`SELECT INTO OUTFILE` 命令不会检查文件及文件路径是否存在。`SELECT INTO OUTFILE` 是否会自动创建路径、或是否会覆盖已存在文件,完全由远端存储系统的语义决定。 + +4. 如果查询的结果集为空 + + 对于结果集为空的导出,依然会产生一个空文件。 + +5. 文件切分 -* 如果不开启并发导出,查询结果是由单个 BE 节点,单线程导出的。因此导出时间和导出结果集大小正相关。开启并发导出可以降低导出的时间。 + 文件切分会保证一行数据完整的存储在单一文件中。因此文件的大小并不严格等于 `max_file_size`。 -* 导出命令不会检查文件及文件路径是否存在。是否会自动创建路径、或是否会覆盖已存在文件,完全由远端存储系统的语义决定。 +6. 非可见字符的函数 -* 如果在导出过程中出现错误,可能会有导出文件残留在远端存储系统上。Doris 不会清理这些文件。需要用户手动清理。 + 对于部分输出为非可见字符的函数,如 BITMAP、HLL 类型,CSV 输出为 `\N`,Parquet、ORC 输出为 NULL。 -* 导出命令的超时时间同查询的超时时间。可以通过 `SET query_timeout=xxx` 进行设置。 + 目前部分地理信息函数,如 `ST_Point` 的输出类型为 VARCHAR,但实际输出值为经过编码的二进制字符。当前这些函数会输出乱码。对于地理函数,请使用 `ST_AsText` 进行输出。 -* 对于结果集为空的查询,依然会产生一个文件。 +## 附录 +### 并发导出原理 +1. 原理介绍 -* 文件切分会保证一行数据完整的存储在单一文件中。因此文件的大小并不严格等于 `max_file_size`。 + Doris是典型的基于 MPP 架构的高性能、实时的分析型数据库。MPP架构的一大特征是使用分布式架构,将大规模数据集划分为小块,并在多个节点上并行处理。 -* 对于部分输出为非可见字符的函数,如 BITMAP、HLL 类型,输出为 `\N`,即 NULL。 + `SELECT INTO OUTFILE`的并发导出就是基于上述MPP架构的并行处理能力,在可以并发导出的场景下(后面会详细说明哪些场景可以并发导出),并行的在多个BE节点上导出,每个BE处理结果集的一部分。 -* 目前部分地理信息函数,如 `ST_Point` 的输出类型为 VARCHAR,但实际输出值为经过编码的二进制字符。当前这些函数会输出乱码。对于地理函数,请使用 `ST_AsText` 进行输出。 +2. 如何判断可以执行并发导出 + - 确定会话变量已开启:`set enable_parallel_outfile = true;` + - 通过 `EXPLAIN` 查看执行计划 -## 更多帮助 + ```sql + mysql> EXPLAIN SELECT ... 
INTO OUTFILE "s3://xxx" ...; + +-----------------------------------------------------------------------------+ + | Explain String | + +-----------------------------------------------------------------------------+ + | PLAN FRAGMENT 0 | + | OUTPUT EXPRS:<slot 2> | <slot 3> | <slot 4> | <slot 5> | + | PARTITION: UNPARTITIONED | + | | + | RESULT SINK | + | | + | 1:EXCHANGE | + | | + | PLAN FRAGMENT 1 | + | OUTPUT EXPRS:`k1` + `k2` | + | PARTITION: HASH_PARTITIONED: `default_cluster:test`.`multi_tablet`.`k1` | + | | + | RESULT FILE SINK | + | FILE PATH: s3://ml-bd-repo/bpit_test/outfile_1951_ | + | STORAGE TYPE: S3 | + | | + | 0:OlapScanNode | + | TABLE: multi_tablet | + +-----------------------------------------------------------------------------+ + ``` + `EXPLAIN` 命令会返回该语句的查询计划.观察该查询计划,如果发现 `RESULT FILE SINK` 出现在 `PLAN FRAGMENT 1` 中,就说明该查询语句可以并发导出。如果 `RESULT FILE SINK` 出现在 `PLAN FRAGMENT 0` 中,则说明当前查询不能进行并发导出。 -关于 OUTFILE 使用的更多详细语法及最佳实践,请参阅 [OUTFILE](../../sql-manual/sql-statements/Data-Manipulation-Statements/OUTFILE) 命令手册,你也可以在 MySQL 客户端命令行下输入 `HELP OUTFILE` 获取更多帮助信息。 +3. 导出并发度 + 当满足并发导出的条件后,导出任务的并发度为:`BE 节点数 * parallel_fragment_exec_instance_num`。 \ No newline at end of file diff --git a/sidebars.json b/sidebars.json index e45e60f514a..434154e4684 100644 --- a/sidebars.json +++ b/sidebars.json @@ -142,6 +142,7 @@ "type": "category", "label": "Exporting Data", "items": [ + "data-operate/export/export-overview", "data-operate/export/export-manual", "data-operate/export/outfile", "data-operate/export/export-with-mysql-dump" @@ -1538,4 +1539,4 @@ ] } ] -} +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org