This is an automated email from the ASF dual-hosted git repository. liaoxin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-website.git
The following commit(s) were added to refs/heads/master by this push: new dbd1ccceb5 [doc](load) optimize group commit (#1778) dbd1ccceb5 is described below commit dbd1ccceb5668198570030d8623ed6fa0900f3ea Author: Xin Liao <liao...@selectdb.com> AuthorDate: Tue Jan 14 09:41:16 2025 +0800 [doc](load) optimize group commit (#1778) --- docs/data-operate/import/group-commit-manual.md | 400 ++++++++++---------- .../data-operate/import/group-commit-manual.md | 401 ++++++++++---------- .../data-operate/import/group-commit-manual.md | 346 +++++++++--------- .../data-operate/import/group-commit-manual.md | 348 +++++++++--------- .../data-operate/import/group-commit-manual.md | 401 ++++++++++---------- .../data-operate/import/group-commit-manual.md | 402 +++++++++++---------- 6 files changed, 1195 insertions(+), 1103 deletions(-) diff --git a/docs/data-operate/import/group-commit-manual.md b/docs/data-operate/import/group-commit-manual.md index 144a18ce39..0afd83e1f9 100644 --- a/docs/data-operate/import/group-commit-manual.md +++ b/docs/data-operate/import/group-commit-manual.md @@ -24,83 +24,39 @@ specific language governing permissions and limitations under the License. --> -# Group Commit +In high-frequency small batch write scenarios, traditional loading methods have the following issues: -Group commit load does not introduce a new data import method, but an extension of `INSERT INTO tbl VALUS(...)` and `Stream Load`. It is a way to improve the write performance of Doris with high-concurrency and small-data writes. Your application can directly use JDBC to do high-concurrency insert operation into Doris, at the same time, combining PreparedStatement can get even higher performance. In logging scenarios, you can also do high-concurrency Stream Load into Doris. +- Each load creates an independent transaction, requiring FE to parse SQL and generate execution plans, affecting overall performance +- Each load generates a new version, causing rapid version growth and increasing background compaction pressure -## Group Commit Mode +To solve these problems, Doris introduced the Group Commit mechanism. Group Commit is not a new loading method, but rather an optimization extension of existing loading methods, mainly targeting: -Group Commit provides 3 modes: +- `INSERT INTO tbl VALUES(...)` statements +- Stream Load -* `off_mode` +By merging multiple small batch loads into one large transaction commit in the background, it significantly improves high-concurrency small batch write performance. Additionally, using Group Commit with PreparedStatement can achieve even higher performance improvements. -Disable group commit. +## Group Commit Modes -* `sync_mode` +Group Commit has three modes: -Doris groups multiple loads into one transaction commit based on the `group_commit_interval` table property. The load is returned after the transaction commit. This mode is suitable for high-concurrency writing scenarios and requires immediate data visibility after the load is finished. +* Off Mode (`off_mode`) -* `async_mode` + Group Commit is disabled. -Doris writes data to the Write Ahead Log (WAL) firstly, then the load is returned. Doris groups multiple loads into one transaction commit based on the `group_commit_interval` table property, and the data is visible after the commit. To prevent excessive disk space usage by the WAL, it automatically switches to `sync_mode`. This is suitable for latency-sensitive and high-frequency writing. 
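The mode is chosen per session through the `group_commit` session variable. A minimal sketch, assuming a normal MySQL-protocol connection to the FE and the example table used later in this manual:

```sql
# Group Commit is off by default (off_mode); choose one of the three modes per session.
SET group_commit = async_mode;
```

The same statement with `sync_mode` or `off_mode` selects the other two modes described above.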
+* Synchronous Mode (`sync_mode`) -The number of WALs can be viewed through the FE HTTP interface, as detailed [here](../../admin-manual/open-api/fe-http/get-wal-size-action). Alternatively, you can search for the keyword `wal` in the BE metrics. + Doris commits multiple loads in one transaction based on load and table's `group_commit_interval` property, returning after transaction commit. This is suitable for high-concurrency write scenarios requiring immediate data visibility after loading. -## Limitations +* Asynchronous Mode (`async_mode`) -* When the group commit is enabled, some `INSERT INTO VALUES` sqls are not executed in the group commit way if they meet the following conditions: + Doris first writes data to WAL (Write Ahead Log), then returns immediately. Doris asynchronously commits data based on load and table's `group_commit_interval` property, making data visible after commit. To prevent WAL from occupying too much disk space, it automatically switches to `sync_mode` for large single loads. This is suitable for write latency-sensitive and high-frequency write scenarios. - * Transaction insert, such as `BEGIN`, `INSERT INTO VALUES`, `COMMIT` + WAL count can be viewed through FE http interface as shown [here](../../admin-manual/open-api/fe-http/get-wal-size-action), or by searching for `wal` keyword in BE metrics. - * Specify the label, such as `INSERT INTO dt WITH LABEL {label} VALUES` +## How to Use Group Commit - * Expressions within VALUES, such as `INSERT INTO dt VALUES (1 + 100)` - - * Column update - - * Tables that do not support light schema changes - -* When the group commit is enabled, some `Stream Load` and `Http Stream` are not executed in the group commit way if they meet the following conditions: - - * Two phase commit - - * Specify the label by set header `-H "label:my_label"` - - * Column update - - * Tables that do not support light schema changes - -* For unique table, because the group commit can not guarantee the commit order, users can use sequence column to ensure the data consistency. - -* The limit of `max_filter_ratio` - - * For non group commit load, filter_ratio is calculated by the failed rows and total rows when load is finished. If the filter_ratio does not match, the transaction will not commit - - * In the group commit mode, multiple user loads are executed by one internal load. The internal load will commit all user loads. - - * Currently, group commit supports a certain degree of max_filter_ratio semantics. When the total number of rows does not exceed group_commit_memory_rows_for_max_filter_ratio (configured in `be.conf`, defaulting to `10000` rows), max_filter_ratio will work. - -* The limit of WAL - - * For async_mode group commit, data is written to the Write Ahead Log (WAL). If the internal load succeeds, the WAL is immediately deleted. If the internal load fails, data is recovered by loading the WAL. - - * Currently, WAL files are stored only on one disk of one BE. If the BE's disk is damaged or the file is mistakenly deleted, it may result in data loss. - - * When decommissioning a BE node, please use the [`DECOMMISSION`](../../../sql-manual/sql-statements/Cluster-Management-Statements/ALTER-SYSTEM-DECOMMISSION-BACKEND) command to safely decommission the node. This prevents potential data loss if the WAL files are not processed before the node is taken offline. 
- - * For async_mode group commit writes, to protect disk space, it switches to sync_mode under the following conditions: - - * For a load with large amount of data: exceeding 80% of the disk space of a WAL directory. - - * Chunked stream loads with an unknown data amount. - - * Insufficient disk space, even if it is a load with small amount of data. - - * During hard weight schema changes (adding or dropping columns, modifying varchar length, and renaming columns are lightweight schema changes, others are hard weight), to ensure WAL file is compatibility with the table's schema, the final stage of metadata modification in FE will reject group commit writes. Clients get `insert table ${table_name} is blocked on schema change` exception and can retry the load. - -## Basic operations - -If the table schema is: +Assuming the table structure is: ```sql CREATE TABLE `dt` ( `id` int(11) NOT NULL, @@ -114,25 +70,25 @@ PROPERTIES ( ); ``` -### Use `JDBC` +### Using JDBC -To reduce the CPU cost of SQL parsing and query planning, we provide the `PreparedStatement` in the FE. When using `PreparedStatement`, the SQL and its plan will be cached in the session level memory cache and will be reused later on, which reduces the CPU cost of FE. The following is an example of using PreparedStatement in JDBC: +When users write using JDBC's `insert into values` method, to reduce SQL parsing and planning overhead, we support MySQL protocol's `PreparedStatement` feature on the FE side. When using `PreparedStatement`, SQL and its load plan are cached in session-level memory cache, and subsequent loads directly use the cached objects, reducing FE CPU pressure. Here's an example of using `PreparedStatement` in JDBC: -1. Setup JDBC url and enable server side prepared statement +**1. Set JDBC URL and enable Prepared Statement on Server side** ``` url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500 ``` -2. Set `group_commit` session variable, there are two ways to do it: +**2. Configure `group_commit` session variable in one of two ways:** -* Add `sessionVariables=group_commit=async_mode` in JDBC url +* Through JDBC url by adding `sessionVariables=group_commit=async_mode` ``` -url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500&sessionVariables=group_commit=async_mode +url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500&sessionVariables=group_commit=async_mode&sessionVariables=enable_nereids_planner=false ``` -* Use `SET group_commit = async_mode;` command +* Through SQL execution ``` try (Statement statement = conn.createStatement()) { @@ -140,11 +96,11 @@ try (Statement statement = conn.createStatement()) { } ``` -3. Using `PreparedStatement` +**3. 
Use `PreparedStatement`** ```java private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver"; -private static final String URL_PATTERN = "jdbc:mysql://%s:%d/%s?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500&sessionVariables=group_commit=async_mode"; +private static final String URL_PATTERN = "jdbc:mysql://%s:%d/%s?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=50$sessionVariables=group_commit=async_mode"; private static final String HOST = "127.0.0.1"; private static final int PORT = 9087; private static final String DB = "db"; @@ -179,142 +135,143 @@ private static void groupCommitInsertBatch() throws Exception { } ``` -**Note:** Due to the high frequency of `INSERT INTO` statements, a large amount of audit logs might be printed, which could impact overall performance. By default, the audit log for prepared statements is disabled. You can control whether to print the audit log for prepared statements by setting a session variable. +Note: Since high-frequency insert into statements will print large amounts of audit logs affecting final performance, printing prepared statement audit logs is disabled by default. You can control whether to print prepared statement audit logs through session variable settings. ```sql -# Configure the session variable to enable printing the audit log for prepared statements. By default, it is set to false, which disables printing the audit log for prepared statements. +# Configure session variable to enable printing prepared statement audit log, default is false set enable_prepared_stmt_audit_log=true; ``` -For more usage on **JDBC**, refer to [Using Insert to Synchronize Data](./import-way/insert-into-manual). +For more about **JDBC** usage, refer to [Using Insert Method to Synchronize Data](./import-way/insert-into-manual.md). ### Using Golang for Group Commit -Golang has limited support for prepared statements, so we can manually batch the statements on the client side to improve the performance of Group Commit. Below is an example program. +Since Golang has limited support for prepared statements, we can improve Group Commit performance through manual client-side batching. 
Here's a sample program: ```Golang package main import ( - "database/sql" - "fmt" - "math/rand" - "strings" - "sync" - "sync/atomic" - "time" - - _ "github.com/go-sql-driver/mysql" + "database/sql" + "fmt" + "math/rand" + "strings" + "sync" + "sync/atomic" + "time" + + _ "github.com/go-sql-driver/mysql" ) const ( - host = "127.0.0.1" - port = 9038 - db = "test" - user = "root" - password = "" - table = "async_lineitem" + host = "127.0.0.1" + port = 9038 + db = "test" + user = "root" + password = "" + table = "async_lineitem" ) var ( - threadCount = 20 - batchSize = 100 + threadCount = 20 + batchSize = 100 ) var totalInsertedRows int64 var rowsInsertedLastSecond int64 func main() { - dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true", user, password, host, port, db) - db, err := sql.Open("mysql", dbDSN) - if err != nil { - fmt.Printf("Error opening database: %s\n", err) - return - } - defer db.Close() - - var wg sync.WaitGroup - for i := 0; i < threadCount; i++ { - wg.Add(1) - go func() { - defer wg.Done() - groupCommitInsertBatch(db) - }() - } - - go logInsertStatistics() - - wg.Wait() + dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true", user, password, host, port, db) + db, err := sql.Open("mysql", dbDSN) + if err != nil { + fmt.Printf("Error opening database: %s\n", err) + return + } + defer db.Close() + + var wg sync.WaitGroup + for i := 0; i < threadCount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + groupCommitInsertBatch(db) + }() + } + + go logInsertStatistics() + + wg.Wait() } func groupCommitInsertBatch(db *sql.DB) { - for { - valueStrings := make([]string, 0, batchSize) - valueArgs := make([]interface{}, 0, batchSize*16) - for i := 0; i < batchSize; i++ { - valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") - valueArgs = append(valueArgs, rand.Intn(1000)) - valueArgs = append(valueArgs, rand.Intn(1000)) - valueArgs = append(valueArgs, rand.Intn(1000)) - valueArgs = append(valueArgs, rand.Intn(1000)) - valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) - valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) - valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) - valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) - valueArgs = append(valueArgs, "N") - valueArgs = append(valueArgs, "O") - valueArgs = append(valueArgs, time.Now()) - valueArgs = append(valueArgs, time.Now()) - valueArgs = append(valueArgs, time.Now()) - valueArgs = append(valueArgs, "DELIVER IN PERSON") - valueArgs = append(valueArgs, "SHIP") - valueArgs = append(valueArgs, "N/A") - } - stmt := fmt.Sprintf("INSERT INTO %s VALUES %s", - table, strings.Join(valueStrings, ",")) - _, err := db.Exec(stmt, valueArgs...) 
- if err != nil { - fmt.Printf("Error executing batch: %s\n", err) - return - } - atomic.AddInt64(&rowsInsertedLastSecond, int64(batchSize)) - atomic.AddInt64(&totalInsertedRows, int64(batchSize)) - } + for { + valueStrings := make([]string, 0, batchSize) + valueArgs := make([]interface{}, 0, batchSize*16) + for i := 0; i < batchSize; i++ { + valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + valueArgs = append(valueArgs, rand.Intn(1000)) + valueArgs = append(valueArgs, rand.Intn(1000)) + valueArgs = append(valueArgs, rand.Intn(1000)) + valueArgs = append(valueArgs, rand.Intn(1000)) + valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) + valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) + valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) + valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) + valueArgs = append(valueArgs, "N") + valueArgs = append(valueArgs, "O") + valueArgs = append(valueArgs, time.Now()) + valueArgs = append(valueArgs, time.Now()) + valueArgs = append(valueArgs, time.Now()) + valueArgs = append(valueArgs, "DELIVER IN PERSON") + valueArgs = append(valueArgs, "SHIP") + valueArgs = append(valueArgs, "N/A") + } + stmt := fmt.Sprintf("INSERT INTO %s VALUES %s", + table, strings.Join(valueStrings, ",")) + _, err := db.Exec(stmt, valueArgs...) + if err != nil { + fmt.Printf("Error executing batch: %s\n", err) + return + } + atomic.AddInt64(&rowsInsertedLastSecond, int64(batchSize)) + atomic.AddInt64(&totalInsertedRows, int64(batchSize)) + } } func logInsertStatistics() { - for { - time.Sleep(1 * time.Second) - fmt.Printf("Total inserted rows: %d\n", totalInsertedRows) - fmt.Printf("Rows inserted in the last second: %d\n", rowsInsertedLastSecond) - rowsInsertedLastSecond = 0 - } + for { + time.Sleep(1 * time.Second) + fmt.Printf("Total inserted rows: %d\n", totalInsertedRows) + fmt.Printf("Rows inserted in the last second: %d\n", rowsInsertedLastSecond) + rowsInsertedLastSecond = 0 + } } ``` ### INSERT INTO VALUES -* async_mode +* Asynchronous Mode + ```sql -# Config session variable to enable the async group commit, the default value is off_mode +# Configure session variable to enable group commit (default is off_mode), enable asynchronous mode mysql> set group_commit = async_mode; -# The retured label is start with 'group_commit', which is the label of the real load job +# The returned label is prefixed with group_commit, indicating whether group commit is used mysql> insert into dt values(1, 'Bob', 90), (2, 'Alice', 99); Query OK, 2 rows affected (0.05 sec) {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} -# The returned label and txn_id are the same as the above, which means they are handled in on load job +# The label, txn_id, and previous one are the same, indicating that they are accumulated into the same import task mysql> insert into dt(id, name) values(3, 'John'); Query OK, 1 row affected (0.01 sec) {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} -# The data is not visible +# Cannot query immediately mysql> select * from dt; Empty set (0.01 sec) -# After about 10 seconds, the data is visible +# 10 seconds later, data can be queried, and data visibility delay can be controlled by table attribute group_commit_interval. 
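# The 10-second default visibility delay comes from the table property and can be
# lowered if needed, for example:
#   ALTER TABLE dt SET ("group_commit_interval_ms" = "2000");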
mysql> select * from dt; +------+-------+-------+ | id | name | score | @@ -326,18 +283,18 @@ mysql> select * from dt; 3 rows in set (0.02 sec) ``` -* sync_mode +* Synchronous Mode + ```sql -# Config session variable to enable the sync group commit +# Configure session variable to enable group commit (default is off_mode), enable synchronous mode mysql> set group_commit = sync_mode; -# The retured label is start with 'group_commit', which is the label of the real load job. -# The insert costs at least the group_commit_interval_ms of table property. +# The returned label is prefixed with group_commit, indicating whether group commit is used, and import time is at least table attribute group_commit_interval. mysql> insert into dt values(4, 'Bob', 90), (5, 'Alice', 99); Query OK, 2 rows affected (10.06 sec) {'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'} -# The data is visible after the insert is returned +# Data can be read immediately mysql> select * from dt; +------+-------+-------+ | id | name | score | @@ -351,22 +308,25 @@ mysql> select * from dt; 5 rows in set (0.03 sec) ``` -* off_mode +* Off Mode + ```sql mysql> set group_commit = off_mode; ``` ### Stream Load -If the content of `data.csv` is: +Assuming `data.csv` contains: + ```sql 6,Amy,60 7,Ross,98 ``` -* async_mode +* Asynchronous Mode + ```sql -# Add 'group_commit:async_mode' configuration in the http header +# Import with "group_commit:async_mode" configuration in header curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load { @@ -387,13 +347,14 @@ curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mo "WriteDataTimeMs": 26 } -# The returned 'GroupCommit' is 'true', which means this is a group commit load -# The retured label is start with 'group_commit', which is the label of the real load job +# The returned GroupCommit is true, indicating that the group commit process is entered +# The returned Label is prefixed with group_commit, indicating the label associated with the import that truly consumes data ``` -* sync_mode +* Synchronous Mode + ```sql -# Add 'group_commit:sync_mode' configuration in the http header +# Import with "group_commit:sync_mode" configuration in header curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load { @@ -414,50 +375,107 @@ curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mod "WriteDataTimeMs": 10038 } -# The returned 'GroupCommit' is 'true', which means this is a group commit load -# The retured label is start with 'group_commit', which is the label of the real load job +# The returned GroupCommit is true, indicating that the group commit process is entered +# The returned Label is prefixed with group_commit, indicating the label associated with the import that truly consumes data ``` -See [Stream Load](./import-way/stream-load-manual.md) for more detailed syntax used by **Stream Load**. +About Stream Load usage, please refer to [Stream Load](./import-way/stream-load-manual). -## Group commit condition -The data will be automatically committed either when the time interval (default is 10 seconds) or the data size (default is 64 MB) conditions meet. 
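As a small sketch against the example table `dt`, both conditions can be inspected and tuned with ordinary table-property statements (the individual settings and their tuning trade-offs are covered in the sections below):

```sql
# The thresholds are regular table properties, assuming they are reported in the
# SHOW CREATE TABLE output like other properties.
SHOW CREATE TABLE dt;

# Tune them together; whichever condition is reached first triggers the commit.
ALTER TABLE dt SET ("group_commit_interval_ms" = "2000");
ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728");
```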
+Data is automatically committed when either time interval (default 10 seconds) or data volume (default 64 MB) condition is met. These parameters should be used together and tuned based on actual scenarios. -### Modify the time interval condition +### Modifying Commit Interval -The default group commit interval is 10 seconds. Users can modify the configuration of the table: +Default commit interval is 10 seconds, users can adjust through table configuration: ```sql -# Modify the group commit interval to 2 seconds +# Modify commit interval to 2 seconds ALTER TABLE dt SET ("group_commit_interval_ms" = "2000"); ``` -### Modify the data size condition +**Parameter Adjustment Recommendations**: +- Shorter intervals (e.g., 2 seconds): + - Pros: Lower data visibility latency, suitable for scenarios requiring high real-time performance + - Cons: More commits, faster version growth, higher background compaction pressure + +- Longer intervals (e.g., 30 seconds): + - Pros: Larger commit batches, slower version growth, lower system overhead + - Cons: Higher data visibility latency + +Recommend setting based on business tolerance for data visibility delay. If system pressure is high, consider increasing the interval. -The default group commit data size is 64 MB. Users can modify the configuration of the table: +### Modifying Commit Data Volume + +Group Commit's default commit data volume is 64 MB, users can adjust through table configuration: ```sql -# Modify the group commit data size to 128MB +# Modify commit data volume to 128MB ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728"); ``` -## Relevant system configuration +**Parameter Adjustment Recommendations**: +- Smaller threshold (e.g., 32MB): + - Pros: Less memory usage, suitable for resource-constrained environments + - Cons: Smaller commit batches, potentially limited throughput + +- Larger threshold (e.g., 256MB): + - Pros: Higher batch commit efficiency, greater system throughput + - Cons: Uses more memory + +Recommend balancing based on system memory resources and data reliability requirements. If memory is sufficient and higher throughput is desired, consider increasing to 128MB or more. + + +### BE Configuration + +1. `group_commit_wal_path` + + * Description: Directory for storing group commit WAL files + + * Default: Creates a `wal` directory under each configured `storage_root_path`. Configuration example: + + ``` + group_commit_wal_path=/data1/storage/wal;/data2/storage/wal;/data3/storage/wal + ``` + +2. `group_commit_memory_rows_for_max_filter_ratio` + + * Description: `max_filter_ratio` works normally when group commit load total rows don't exceed this value, otherwise it doesn't work + + * Default: 10000 + +## Usage Limitations + +* **Group Commit Limitations** -### BE configuration + * `INSERT INTO VALUES` statements degrade to non-Group Commit mode in these cases: + - Transaction writes (`Begin; INSERT INTO VALUES; COMMIT`) + - Specified Label (`INSERT INTO dt WITH LABEL {label} VALUES`) + - VALUES containing expressions (`INSERT INTO dt VALUES (1 + 100)`) + - Column update writes + - Table doesn't support lightweight mode changes -#### `group_commit_wal_path` + * `Stream Load` degrades to non-Group Commit mode in these cases: + - Using two-phase commit + - Specified Label (`-H "label:my_label"`) + - Column update writes + - Table doesn't support lightweight mode changes -* The `WAL` directory of group commit. -* Default: A directory named `wal` is created under each directory of the `storage_root_path`. 
Configuration examples: - ``` - group_commit_wal_path=/data1/storage/wal;/data2/storage/wal;/data3/storage/wal - ``` +* **Unique Model** + - Group Commit doesn't guarantee commit order, recommend using Sequence column to ensure data consistency. -#### `group_commit_memory_rows_for_max_filter_ratio` +* **max_filter_ratio Support** + - In default loads, `filter_ratio` is calculated through failed rows and total rows. + - In Group Commit mode, `max_filter_ratio` works when total rows don't exceed `group_commit_memory_rows_for_max_filter_ratio`. -* Description: The `max_filter_ratio` limit can only work if the total rows of `group commit` is less than this value. -* Default: 10000 +* **WAL Limitations** + - `async_mode` writes data to WAL, deletes after success, recovers through WAL on failure. + - WAL files are stored on only one BE, disk damage or file loss may cause data loss. + - When offlining BE nodes, use `DECOMMISSION` command to prevent data loss. + - `async_mode` switches to `sync_mode` in these cases: + - Load data volume too large (exceeds 80% of WAL single directory space) + - Unknown data volume chunked stream load + - Insufficient disk space + - During heavyweight Schema Change, Group Commit writes are rejected, client needs to retry. ## Performance diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/group-commit-manual.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/group-commit-manual.md index b1ce7ac7a8..5a2e27fb62 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/group-commit-manual.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/group-commit-manual.md @@ -24,8 +24,17 @@ specific language governing permissions and limitations under the License. 
--> +在高频小批量写入场景下,传统的导入方式存在以下问题: -Group Commit 不是一种新的导入方式,而是对`INSERT INTO tbl VALUES(...)`、`Stream Load`的扩展,大幅提升了高并发小写入的性能。您的应用程序可以直接使用 JDBC 将数据高频写入 Doris,同时通过使用 PreparedStatement 可以获得更高的性能。在日志场景下,您也可以利用 Stream Load 将数据高频写入 Doris。 +- 每个导入都会创建一个独立的事务,都需要经过 FE 解析 SQL 和生成执行计划,影响整体性能 +- 每个导入都会生成一个新的版本,导致版本数快速增长,增加了后台compaction的压力 + +为了解决这些问题,Doris 引入了 Group Commit 机制。Group Commit 不是一种新的导入方式,而是对现有导入方式的优化扩展,主要针对: + +- `INSERT INTO tbl VALUES(...)` 语句 +- Stream Load 导入 + +通过将多个小批量导入在后台合并成一个大的事务提交,显著提升了高并发小批量写入的性能。同时,Group Commit 与 PreparedStatement 结合使用可以获得更高的性能提升。 ## Group Commit 模式 @@ -45,58 +54,6 @@ Group Commit 写入有三种模式,分别是: WAL的数量可以通过FE http接口查看,具体可见[这里](../../admin-manual/open-api/fe-http/get-wal-size-action),也可以在BE的metrics中搜索关键词`wal`查看。 -## 使用限制 - -* 当开启了 Group Commit 模式,系统会判断用户发起的`INSERT INTO VALUES`语句是否符合 Group Commit 的条件,如果符合,该语句的执行会进入到 Group Commit 写入中。符合以下条件会自动退化为非 Group Commit 方式: - - + 事务写入,即`Begin`; `INSERT INTO VALUES`; `COMMIT`方式 - - + 指定 Label,即`INSERT INTO dt WITH LABEL {label} VALUES` - - + VALUES 中包含表达式,即`INSERT INTO dt VALUES (1 + 100)` - - + 列更新写入 - - + 表不支持 light schema change - -* 当开启了 Group Commit 模式,系统会判断用户发起的`Stream Load`是否符合 Group Commit 的条件,如果符合,该导入的执行会进入到 Group Commit 写入中。符合以下条件的会自动退化为非 Group Commit 方式: - - + 两阶段提交 - - + 指定 Label,即通过 `-H "label:my_label"`设置 - - + 列更新写入 - - + 表不支持 light schema change - -+ 对于 Unique 模型,由于 Group Commit 不能保证提交顺序,用户可以配合 Sequence 列使用来保证数据一致性 - -* 对`max_filter_ratio`语义的支持 - - * 在默认的导入中,`filter_ratio`是导入完成后,通过失败的行数和总行数计算,决定是否提交本次写入 - - * 在 Group Commit 模式下,由于多个用户发起的导入会被一个内部导入执行,虽然可以计算出每个导入的`filter_ratio`,但是数据一旦进入内部导入,就只能 commit transaction - - * Group Commit 模式支持了一定程度的`max_filter_ratio`语义,当导入的总行数不高于`group_commit_memory_rows_for_max_filter_ratio`(配置在`be.conf`中,默认为`10000`行),`max_filter_ratio` 工作 - -* WAL 限制 - - * 对于`async_mode`的 Group Commit 写入,会把数据写入 WAL。如果内部导入成功,则 WAL 被立刻删除;如果内部导入失败,通过导入 WAL 的方法来恢复数据 - - * 目前 WAL 文件只存储在一个 BE 上,如果这个 BE 磁盘损坏或文件误删等,可能导入丢失部分数据 - - * 当下线 BE 节点时,请使用[`DECOMMISSION`](../../../sql-manual/sql-statements/Cluster-Management-Statements/ALTER-SYSTEM-DECOMMISSION-BACKEND)命令,安全下线节点,防止该节点下线前 WAL 文件还没有全部处理完成,导致部分数据丢失 - - * 对于`async_mode`的 Group Commit 写入,为了保护磁盘空间,当遇到以下情况时,会切换成`sync_mode` - - * 导入数据量过大,即超过 WAL 单目录的 80% 空间 - - * 不知道数据量的 chunked stream load - - * 导入数据量不大,但磁盘可用空间不足 - - * 当发生重量级 Schema Change(目前加减列、修改 varchar 长度和重命名列是轻量级 Schema Change,其它的是重量级 Schema Change)时,为了保证 WAL 能够适配表的 Schema,在 Schema Change 最后的 FE 修改元数据阶段,会拒绝 Group Commit 写入,客户端收到 `insert table ${table_name} is blocked on schema change` 异常,客户端重试即可 - ## Group Commit 使用方式 假如表的结构为: @@ -127,30 +84,30 @@ url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&useLocalSessionStat * 通过 JDBC url 设置,增加`sessionVariables=group_commit=async_mode` - ``` - url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500&sessionVariables=group_commit=async_mode&sessionVariables=enable_nereids_planner=false - ``` +``` +url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500&sessionVariables=group_commit=async_mode&sessionVariables=enable_nereids_planner=false +``` * 通过执行 SQL 设置 - ``` - try (Statement statement = conn.createStatement()) { - statement.execute("SET group_commit = async_mode;"); - } - ``` +``` +try (Statement statement = conn.createStatement()) { + statement.execute("SET group_commit = async_mode;"); +} +``` 
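Before moving on to step 3, the session setting can be verified from the same connection. A minimal check, assuming the variable is exposed through the standard MySQL `SHOW VARIABLES` mechanism:

```sql
# Should report async_mode if the URL parameter or the SET statement above took effect.
SHOW VARIABLES LIKE 'group_commit';
```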
**3. 使用 `PreparedStatement`** ```java private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver"; -private static final String URL_PATTERN = "jdbc:mysql://%s:%d/%s?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500&sessionVariables=group_commit=async_mode&sessionVariables=enable_nereids_planner=false"; +private static final String URL_PATTERN = "jdbc:mysql://%s:%d/%s?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=50$sessionVariables=group_commit=async_mode"; private static final String HOST = "127.0.0.1"; private static final int PORT = 9087; private static final String DB = "db"; private static final String TBL = "dt"; private static final String USER = "root"; private static final String PASSWD = ""; -private static final int INSERT_BATCH_SIZE = 10; +private static final int INSERT_BATCH_SIZE = 10; private static void groupCommitInsertBatch() throws Exception { Class.forName(JDBC_DRIVER); @@ -296,66 +253,66 @@ func logInsertStatistics() { * 异步模式 - ```sql - # 配置 session 变量开启 group commit (默认为 off_mode),开启异步模式 - mysql> set group_commit = async_mode; - - # 这里返回的 label 是 group_commit 开头的,可以区分出是否使用了 group commit - mysql> insert into dt values(1, 'Bob', 90), (2, 'Alice', 99); - Query OK, 2 rows affected (0.05 sec) - {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} - - # 可以看出这个 label, txn_id 和上一个相同,说明是攒到了同一个导入任务中 - mysql> insert into dt(id, name) values(3, 'John'); - Query OK, 1 row affected (0.01 sec) - {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} - - # 不能立刻查询到 - mysql> select * from dt; - Empty set (0.01 sec) - - # 10 秒后可以查询到,可以通过表属性 group_commit_interval 控制数据可见延迟。 - mysql> select * from dt; - +------+-------+-------+ - | id | name | score | - +------+-------+-------+ - | 1 | Bob | 90 | - | 2 | Alice | 99 | - | 3 | John | NULL | - +------+-------+-------+ - 3 rows in set (0.02 sec) - ``` +```sql +# 配置 session 变量开启 group commit (默认为 off_mode),开启异步模式 +mysql> set group_commit = async_mode; + +# 这里返回的 label 是 group_commit 开头的,可以区分出是否使用了 group commit +mysql> insert into dt values(1, 'Bob', 90), (2, 'Alice', 99); +Query OK, 2 rows affected (0.05 sec) +{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} + +# 可以看出这个 label, txn_id 和上一个相同,说明是攒到了同一个导入任务中 +mysql> insert into dt(id, name) values(3, 'John'); +Query OK, 1 row affected (0.01 sec) +{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} + +# 不能立刻查询到 +mysql> select * from dt; +Empty set (0.01 sec) + +# 10 秒后可以查询到,可以通过表属性 group_commit_interval 控制数据可见延迟。 +mysql> select * from dt; ++------+-------+-------+ +| id | name | score | ++------+-------+-------+ +| 1 | Bob | 90 | +| 2 | Alice | 99 | +| 3 | John | NULL | ++------+-------+-------+ +3 rows in set (0.02 sec) +``` * 同步模式 - ```sql - # 配置 session 变量开启 group commit (默认为 off_mode),开启同步模式 - mysql> set group_commit = sync_mode; - - # 这里返回的 label 是 group_commit 开头的,可以区分出是否谁用了 group commit,导入耗时至少是表属性 group_commit_interval。 - mysql> insert into dt values(4, 'Bob', 90), (5, 'Alice', 99); - Query OK, 2 rows affected (10.06 sec) - {'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'} - - # 数据可以立刻读出 - mysql> select * from dt; - 
+------+-------+-------+ - | id | name | score | - +------+-------+-------+ - | 1 | Bob | 90 | - | 2 | Alice | 99 | - | 3 | John | NULL | - | 4 | Bob | 90 | - | 5 | Alice | 99 | - +------+-------+-------+ - 5 rows in set (0.03 sec) - ``` +```sql +# 配置 session 变量开启 group commit (默认为 off_mode),开启同步模式 +mysql> set group_commit = sync_mode; + +# 这里返回的 label 是 group_commit 开头的,可以区分出是否谁用了 group commit,导入耗时至少是表属性 group_commit_interval。 +mysql> insert into dt values(4, 'Bob', 90), (5, 'Alice', 99); +Query OK, 2 rows affected (10.06 sec) +{'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'} + +# 数据可以立刻读出 +mysql> select * from dt; ++------+-------+-------+ +| id | name | score | ++------+-------+-------+ +| 1 | Bob | 90 | +| 2 | Alice | 99 | +| 3 | John | NULL | +| 4 | Bob | 90 | +| 5 | Alice | 99 | ++------+-------+-------+ +5 rows in set (0.03 sec) +``` * 关闭模式 - ```sql - mysql> set group_commit = off_mode; - ``` +```sql +mysql> set group_commit = off_mode; +``` ### Stream Load @@ -368,66 +325,66 @@ func logInsertStatistics() { * 异步模式 - ```sql - # 导入时在 header 中增加"group_commit:async_mode"配置 - - curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load - { - "TxnId": 7009, - "Label": "group_commit_c84d2099208436ab_96e33fda01eddba8", - "Comment": "", - "GroupCommit": true, - "Status": "Success", - "Message": "OK", - "NumberTotalRows": 2, - "NumberLoadedRows": 2, - "NumberFilteredRows": 0, - "NumberUnselectedRows": 0, - "LoadBytes": 19, - "LoadTimeMs": 35, - "StreamLoadPutTimeMs": 5, - "ReadDataTimeMs": 0, - "WriteDataTimeMs": 26 - } +```sql +# 导入时在 header 中增加"group_commit:async_mode"配置 + +curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load +{ + "TxnId": 7009, + "Label": "group_commit_c84d2099208436ab_96e33fda01eddba8", + "Comment": "", + "GroupCommit": true, + "Status": "Success", + "Message": "OK", + "NumberTotalRows": 2, + "NumberLoadedRows": 2, + "NumberFilteredRows": 0, + "NumberUnselectedRows": 0, + "LoadBytes": 19, + "LoadTimeMs": 35, + "StreamLoadPutTimeMs": 5, + "ReadDataTimeMs": 0, + "WriteDataTimeMs": 26 +} - # 返回的 GroupCommit 为 true,说明进入了 group commit 的流程 - # 返回的 Label 是 group_commit 开头的,是真正消费数据的导入关联的 label - ``` +# 返回的 GroupCommit 为 true,说明进入了 group commit 的流程 +# 返回的 Label 是 group_commit 开头的,是真正消费数据的导入关联的 label +``` * 同步模式 - ```sql - # 导入时在 header 中增加"group_commit:sync_mode"配置 - - curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load - { - "TxnId": 3009, - "Label": "group_commit_d941bf17f6efcc80_ccf4afdde9881293", - "Comment": "", - "GroupCommit": true, - "Status": "Success", - "Message": "OK", - "NumberTotalRows": 2, - "NumberLoadedRows": 2, - "NumberFilteredRows": 0, - "NumberUnselectedRows": 0, - "LoadBytes": 19, - "LoadTimeMs": 10044, - "StreamLoadPutTimeMs": 4, - "ReadDataTimeMs": 0, - "WriteDataTimeMs": 10038 - } +```sql +# 导入时在 header 中增加"group_commit:sync_mode"配置 + +curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load +{ + "TxnId": 3009, + "Label": "group_commit_d941bf17f6efcc80_ccf4afdde9881293", + "Comment": "", + "GroupCommit": true, + "Status": "Success", + "Message": "OK", + 
"NumberTotalRows": 2, + "NumberLoadedRows": 2, + "NumberFilteredRows": 0, + "NumberUnselectedRows": 0, + "LoadBytes": 19, + "LoadTimeMs": 10044, + "StreamLoadPutTimeMs": 4, + "ReadDataTimeMs": 0, + "WriteDataTimeMs": 10038 +} - # 返回的 GroupCommit 为 true,说明进入了 group commit 的流程 - # 返回的 Label 是 group_commit 开头的,是真正消费数据的导入关联的 label - ``` +# 返回的 GroupCommit 为 true,说明进入了 group commit 的流程 +# 返回的 Label 是 group_commit 开头的,是真正消费数据的导入关联的 label +``` - 关于 Stream Load 使用的更多详细语法及最佳实践,请参阅 [Stream Load](./import-way/stream-load-manual)。 +关于 Stream Load 使用的更多详细语法及最佳实践,请参阅 [Stream Load](./import-way/stream-load-manual)。 ## 自动提交条件 -当满足时间间隔 (默认为 10 秒) 或数据量 (默认为 64 MB) 其中一个条件时,会自动提交数据。 +当满足时间间隔 (默认为 10 秒) 或数据量 (默认为 64 MB) 其中一个条件时,会自动提交数据。这两个参数需要配合使用,建议根据实际场景进行调优。 ### 修改提交间隔 @@ -438,6 +395,17 @@ func logInsertStatistics() { ALTER TABLE dt SET ("group_commit_interval_ms" = "2000"); ``` +**参数调整建议**: +- 较短的间隔(如2秒): + - 优点:数据可见性延迟更低,适合对实时性要求较高的场景 + - 缺点:提交次数增多,版本数增长更快,后台compaction压力更大 + +- 较长的间隔(如30秒): + - 优点:提交批次更大,版本数增长更慢,系统开销更小 + - 缺点:数据可见性延迟更高 + +建议根据业务对数据可见性延迟的容忍度来设置,如果系统压力大,可以适当增加间隔。 + ### 修改提交数据量 Group Commit 的默认提交数据量为 64 MB,用户可以通过修改表的配置调整: @@ -447,6 +415,18 @@ Group Commit 的默认提交数据量为 64 MB,用户可以通过修改表的 ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728"); ``` +**参数调整建议**: +- 较小的阈值(如32MB): + - 优点:内存占用更少,适合资源受限的环境 + - 缺点:提交批次较小,吞吐量可能受限 + +- 较大的阈值(如256MB): + - 优点:批量提交效率更高,系统吞吐量更大 + - 缺点:占用更多内存 + +建议根据系统内存资源和数据可靠性要求来权衡。如果内存充足且追求更高吞吐,可以适当增加到128MB或更大。 + + ## 相关系统配置 ### BE 配置 @@ -467,13 +447,47 @@ ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728"); * 默认值:10000 +## 使用限制 + +* **Group Commit 限制条件** + + * `INSERT INTO VALUES` 语句在以下情况下会退化为非 Group Commit 方式: + - 使用事务写入 (`Begin; INSERT INTO VALUES; COMMIT`) + - 指定 Label (`INSERT INTO dt WITH LABEL {label} VALUES`) + - VALUES 中包含表达式 (`INSERT INTO dt VALUES (1 + 100)`) + - 列更新写入 + - 表不支持轻量级模式更改 + + * `Stream Load` 在以下情况下会退化为非 Group Commit 方式: + - 使用两阶段提交 + - 指定 Label (`-H "label:my_label"`) + - 列更新写入 + - 表不支持轻量级模式更改 + +* **Unique 模型** + - Group Commit 不保证提交顺序,建议使用 Sequence 列来保证数据一致性。 + +* **max_filter_ratio 支持** + - 默认导入中,`filter_ratio` 通过失败行数和总行数计算。 + - Group Commit 模式下,`max_filter_ratio` 在总行数不超过 `group_commit_memory_rows_for_max_filter_ratio` 时有效。 + +* **WAL 限制** + - `async_mode` 写入会将数据写入 WAL,成功后删除,失败时通过 WAL 恢复。 + - WAL 文件仅存储在一个 BE 上,磁盘损坏或文件丢失可能导致数据丢失。 + - 下线 BE 节点时,使用 `DECOMMISSION` 命令以防数据丢失。 + - `async_mode` 在以下情况下切换为 `sync_mode`: + - 导入数据量过大(超过 WAL 单目录 80% 空间) + - 不知道数据量的 chunked stream load + - 磁盘可用空间不足 + - 重量级 Schema Change 时,拒绝 Group Commit 写入,客户端需重试。 + ## 性能 我们分别测试了使用`Stream Load`和`JDBC`在高并发小数据量场景下`group commit`(使用`async mode`) 的写入性能。 ### Stream Load 日志场景测试 -**机器配置** +#### 机器配置 * 1 台 FE:阿里云 8 核 CPU、16GB 内存、1 块 100GB ESSD PL1 云磁盘 @@ -483,19 +497,19 @@ ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728"); * 测试版本为Doris-3.0.1 -**数据集** +#### 数据集 * `httplogs` 数据集,总共 31GB、2.47 亿条 -**测试工具** +#### 测试工具 * [doris-streamloader](/ecosystem/doris-streamloader.md) -**测试方法** +#### 测试方法 * 对比 `非 group_commit` 和 `group_commit `的 `async_mode` 模式下,设置不同的单并发数据量和并发数,导入 `247249096` 行数据 -**测试结果** +#### 测试结果 | 导入方式 | 单并发数据量 | 并发数 | 耗时 (秒) | 导入速率 (行/秒) | 导入吞吐 (MB/秒) | |----------------|---------|------|-----------|----------|-----------| @@ -518,7 +532,7 @@ ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728"); ### JDBC -**机器配置** +#### 机器配置 * 1 台 FE:阿里云 8 核 CPU、16GB 内存、1 块 100GB ESSD PL1 云磁盘 @@ -530,19 +544,19 @@ ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728"); * 关闭打印parpared语句的audit log以提高性能 -**数据集** +#### 数据集 * tpch sf10 `lineitem` 表数据集,30 个文件,总共约 
22 GB,1.8 亿行 -**测试工具** +#### 测试工具 * [DataX](https://github.com/alibaba/DataX) -**测试方法** +#### 测试方法 * 通过 `txtfilereader` 向 `mysqlwriter` 写入数据,配置不同并发数和单个 `INSERT` 的行数 -**测试结果** +#### 测试结果 | 单个 insert 的行数 | 并发数 | 导入速率 (行/秒) | 导入吞吐 (MB/秒) | |-------------|-----|-----------|----------| @@ -554,7 +568,7 @@ ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728"); ### Insert into sync 模式小批量数据 -**机器配置** +#### 机器配置 * 1 台 FE:阿里云 16 核 CPU、64GB 内存、1 块 500GB ESSD PL1 云磁盘 @@ -564,7 +578,7 @@ ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728"); * 测试版本为Doris-3.0.1 -**数据集** +#### 数据集 * tpch sf10 `lineitem` 表数据集。 @@ -595,7 +609,7 @@ PROPERTIES ( ); ``` -**测试工具** +#### 测试工具 * [Jmeter](https://jmeter.apache.org/) @@ -610,32 +624,31 @@ PROPERTIES ( 4. 设置导入语句。 5. 设置每次需要导入的值,注意,导入的值与导入值的类型要一一匹配。 -**测试方法** +#### 测试方法 * 通过 `Jmeter` 向`Doris`写数据。每个并发每次通过insert into写入1行数据。 -**测试结果** +#### 测试结果 * 数据单位为行每秒。 * 以下测试分为30,100,500并发。 -**30并发sync模式5个BE3副本性能测试** +#### 30并发sync模式5个BE3副本性能测试 | Group commit internal | 10ms | 20ms | 50ms | 100ms | |-----------------------|---------------|---------------|---------------|---------------| |enable_nereids_planner=true| 891.8 | 701.1 | 400.0 | 237.5 | |enable_nereids_planner=false| 885.8 | 688.1 | 398.7 | 232.9 | - -**100并发sync模式5个BE3副本性能测试** +#### 100并发sync模式5个BE3副本性能测试 | Group commit internal | 10ms | 20ms | 50ms | 100ms | |-----------------------|---------------|---------------|---------------|---------------| |enable_nereids_planner=true| 2427.8 | 2068.9 | 1259.4 | 764.9 | |enable_nereids_planner=false| 2320.4 | 1899.3 | 1206.2 |749.7| -**500并发sync模式5个BE3副本性能测试** +#### 500并发sync模式5个BE3副本性能测试 | Group commit internal | 10ms | 20ms | 50ms | 100ms | |-----------------------|---------------|---------------|---------------|---------------| @@ -644,7 +657,7 @@ PROPERTIES ( ### Insert into sync 模式大批量数据 -**机器配置** +#### 机器配置 * 1 台 FE:阿里云 16 核 CPU、64GB 内存、1 块 500GB ESSD PL1 云磁盘 @@ -654,7 +667,7 @@ PROPERTIES ( * 测试版本为Doris-3.0.1 -**数据集** +#### 数据集 * tpch sf10 `lineitem` 表数据集。 @@ -685,35 +698,35 @@ PROPERTIES ( ); ``` -**测试工具** +#### 测试工具 * [Jmeter](https://jmeter.apache.org/) -**测试方法** +#### 测试方法 * 通过 `Jmeter` 向`Doris`写数据。每个并发每次通过insert into写入1000行数据。 -**测试结果** +#### 测试结果 * 数据单位为行每秒。 * 以下测试分为30,100,500并发。 -**30并发sync模式5个BE3副本性能测试** +#### 30并发sync模式5个BE3副本性能测试 | Group commit internal | 10ms | 20ms | 50ms | 100ms | |-----------------------|---------------|---------------|---------------|---------------| |enable_nereids_planner=true| 9.1K | 11.1K | 11.4K | 11.1K | |enable_nereids_planner=false| 157.8K | 159.9K | 154.1K | 120.4K | -**100并发sync模式5个BE3副本性能测试** +#### 100并发sync模式5个BE3副本性能测试 | Group commit internal | 10ms | 20ms | 50ms | 100ms | |-----------------------|---------------|---------------|---------------|---------------| |enable_nereids_planner=true| 10.0K |9.2K | 8.9K | 8.9K | |enable_nereids_planner=false| 130.4k | 131.0K | 130.4K | 124.1K | -**500并发sync模式5个BE3副本性能测试** +#### 500并发sync模式5个BE3副本性能测试 | Group commit internal | 10ms | 20ms | 50ms | 100ms | |-----------------------|---------------|---------------|---------------|---------------| diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/data-operate/import/group-commit-manual.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/data-operate/import/group-commit-manual.md index c8b604200e..9056f59737 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/data-operate/import/group-commit-manual.md +++ 
b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/data-operate/import/group-commit-manual.md @@ -24,8 +24,17 @@ specific language governing permissions and limitations under the License. --> +在高频小批量写入场景下,传统的导入方式存在以下问题: -Group Commit 不是一种新的导入方式,而是对`INSERT INTO tbl VALUES(...)`、`Stream Load`的扩展,大幅提升了高并发小写入的性能。您的应用程序可以直接使用 JDBC 将数据高频写入 Doris,同时通过使用 PreparedStatement 可以获得更高的性能。在日志场景下,您也可以利用 Stream Load 将数据高频写入 Doris。 +- 每个导入都会创建一个独立的事务,都需要经过 FE 解析 SQL 和生成执行计划,影响整体性能 +- 每个导入都会生成一个新的版本,导致版本数快速增长,增加了后台compaction的压力 + +为了解决这些问题,Doris 引入了 Group Commit 机制。Group Commit 不是一种新的导入方式,而是对现有导入方式的优化扩展,主要针对: + +- `INSERT INTO tbl VALUES(...)` 语句 +- Stream Load 导入 + +通过将多个小批量导入在后台合并成一个大的事务提交,显著提升了高并发小批量写入的性能。同时,Group Commit 与 PreparedStatement 结合使用可以获得更高的性能提升。 ## Group Commit 模式 @@ -45,58 +54,6 @@ Group Commit 写入有三种模式,分别是: WAL的数量可以通过FE http接口查看,具体可见[这里](../../admin-manual/open-api/fe-http/get-wal-size-action),也可以在BE的metrics中搜索关键词`wal`查看。 -## 使用限制 - -* 当开启了 Group Commit 模式,系统会判断用户发起的`INSERT INTO VALUES`语句是否符合 Group Commit 的条件,如果符合,该语句的执行会进入到 Group Commit 写入中。符合以下条件会自动退化为非 Group Commit 方式: - - + 事务写入,即`Begin`; `INSERT INTO VALUES`; `COMMIT`方式 - - + 指定 Label,即`INSERT INTO dt WITH LABEL {label} VALUES` - - + VALUES 中包含表达式,即`INSERT INTO dt VALUES (1 + 100)` - - + 列更新写入 - - + 表不支持 light schema change - -* 当开启了 Group Commit 模式,系统会判断用户发起的`Stream Load`和`Http Stream`是否符合 Group Commit 的条件,如果符合,该导入的执行会进入到 Group Commit 写入中。符合以下条件的会自动退化为非 Group Commit 方式: - - + 两阶段提交 - - + 指定 Label,即通过 `-H "label:my_label"`设置 - - + 列更新写入 - - + 表不支持 light schema change - -+ 对于 Unique 模型,由于 Group Commit 不能保证提交顺序,用户可以配合 Sequence 列使用来保证数据一致性 - -* 对`max_filter_ratio`语义的支持 - - * 在默认的导入中,`filter_ratio`是导入完成后,通过失败的行数和总行数计算,决定是否提交本次写入 - - * 在 Group Commit 模式下,由于多个用户发起的导入会被一个内部导入执行,虽然可以计算出每个导入的`filter_ratio`,但是数据一旦进入内部导入,就只能 commit transaction - - * Group Commit 模式支持了一定程度的`max_filter_ratio`语义,当导入的总行数不高于`group_commit_memory_rows_for_max_filter_ratio`(配置在`be.conf`中,默认为`10000`行),`max_filter_ratio` 工作 - -* WAL 限制 - - * 对于`async_mode`的 Group Commit 写入,会把数据写入 WAL。如果内部导入成功,则 WAL 被立刻删除;如果内部导入失败,通过导入 WAL 的方法来恢复数据 - - * 目前 WAL 文件只存储在一个 BE 上,如果这个 BE 磁盘损坏或文件误删等,可能导入丢失部分数据 - - * 当下线 BE 节点时,请使用[`DECOMMISSION`](../../sql-manual/sql-statements/cluster-management/instance-management/DECOMMISSION-BACKEND)命令,安全下线节点,防止该节点下线前 WAL 文件还没有全部处理完成,导致部分数据丢失 - - * 对于`async_mode`的 Group Commit 写入,为了保护磁盘空间,当遇到以下情况时,会切换成`sync_mode` - - * 导入数据量过大,即超过 WAL 单目录的 80% 空间 - - * 不知道数据量的 chunked stream load - - * 导入数据量不大,但磁盘可用空间不足 - - * 当发生重量级 Schema Change(目前加减列、修改 varchar 长度和重命名列是轻量级 Schema Change,其它的是重量级 Schema Change)时,为了保证 WAL 能够适配表的 Schema,在 Schema Change 最后的 FE 修改元数据阶段,会拒绝 Group Commit 写入,客户端收到 `insert table ${table_name} is blocked on schema change` 异常,客户端重试即可 - ## Group Commit 使用方式 假如表的结构为: @@ -127,17 +84,17 @@ url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&useLocalSessionStat * 通过 JDBC url 设置,增加`sessionVariables=group_commit=async_mode` - ``` - url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500&sessionVariables=group_commit=async_mode - ``` +``` +url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500&sessionVariables=group_commit=async_mode +``` * 通过执行 SQL 设置 - ``` - try (Statement statement = conn.createStatement()) { - statement.execute("SET group_commit = async_mode;"); - } - ``` +``` +try 
(Statement statement = conn.createStatement()) { + statement.execute("SET group_commit = async_mode;"); +} +``` **3. 使用 `PreparedStatement`** @@ -296,66 +253,66 @@ func logInsertStatistics() { * 异步模式 - ```sql - # 配置 session 变量开启 group commit (默认为 off_mode),开启异步模式 - mysql> set group_commit = async_mode; - - # 这里返回的 label 是 group_commit 开头的,可以区分出是否使用了 group commit - mysql> insert into dt values(1, 'Bob', 90), (2, 'Alice', 99); - Query OK, 2 rows affected (0.05 sec) - {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} - - # 可以看出这个 label, txn_id 和上一个相同,说明是攒到了同一个导入任务中 - mysql> insert into dt(id, name) values(3, 'John'); - Query OK, 1 row affected (0.01 sec) - {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} - - # 不能立刻查询到 - mysql> select * from dt; - Empty set (0.01 sec) - - # 10 秒后可以查询到,可以通过表属性 group_commit_interval 控制数据可见延迟。 - mysql> select * from dt; - +------+-------+-------+ - | id | name | score | - +------+-------+-------+ - | 1 | Bob | 90 | - | 2 | Alice | 99 | - | 3 | John | NULL | - +------+-------+-------+ - 3 rows in set (0.02 sec) - ``` +```sql +# 配置 session 变量开启 group commit (默认为 off_mode),开启异步模式 +mysql> set group_commit = async_mode; + +# 这里返回的 label 是 group_commit 开头的,可以区分出是否使用了 group commit +mysql> insert into dt values(1, 'Bob', 90), (2, 'Alice', 99); +Query OK, 2 rows affected (0.05 sec) +{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} + +# 可以看出这个 label, txn_id 和上一个相同,说明是攒到了同一个导入任务中 +mysql> insert into dt(id, name) values(3, 'John'); +Query OK, 1 row affected (0.01 sec) +{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} + +# 不能立刻查询到 +mysql> select * from dt; +Empty set (0.01 sec) + +# 10 秒后可以查询到,可以通过表属性 group_commit_interval 控制数据可见延迟。 +mysql> select * from dt; ++------+-------+-------+ +| id | name | score | ++------+-------+-------+ +| 1 | Bob | 90 | +| 2 | Alice | 99 | +| 3 | John | NULL | ++------+-------+-------+ +3 rows in set (0.02 sec) +``` * 同步模式 - ```sql - # 配置 session 变量开启 group commit (默认为 off_mode),开启同步模式 - mysql> set group_commit = sync_mode; - - # 这里返回的 label 是 group_commit 开头的,可以区分出是否谁用了 group commit,导入耗时至少是表属性 group_commit_interval。 - mysql> insert into dt values(4, 'Bob', 90), (5, 'Alice', 99); - Query OK, 2 rows affected (10.06 sec) - {'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'} - - # 数据可以立刻读出 - mysql> select * from dt; - +------+-------+-------+ - | id | name | score | - +------+-------+-------+ - | 1 | Bob | 90 | - | 2 | Alice | 99 | - | 3 | John | NULL | - | 4 | Bob | 90 | - | 5 | Alice | 99 | - +------+-------+-------+ - 5 rows in set (0.03 sec) - ``` +```sql +# 配置 session 变量开启 group commit (默认为 off_mode),开启同步模式 +mysql> set group_commit = sync_mode; + +# 这里返回的 label 是 group_commit 开头的,可以区分出是否谁用了 group commit,导入耗时至少是表属性 group_commit_interval。 +mysql> insert into dt values(4, 'Bob', 90), (5, 'Alice', 99); +Query OK, 2 rows affected (10.06 sec) +{'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'} + +# 数据可以立刻读出 +mysql> select * from dt; ++------+-------+-------+ +| id | name | score | ++------+-------+-------+ +| 1 | Bob | 90 | +| 2 | Alice | 99 | +| 3 | John | NULL | +| 4 | Bob | 90 | +| 5 | Alice | 99 | ++------+-------+-------+ +5 rows in set (0.03 sec) +``` * 关闭模式 - ```sql - mysql> 
set group_commit = off_mode; - ``` +```sql +mysql> set group_commit = off_mode; +``` ### Stream Load @@ -368,66 +325,66 @@ func logInsertStatistics() { * 异步模式 - ```sql - # 导入时在 header 中增加"group_commit:async_mode"配置 - - curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load - { - "TxnId": 7009, - "Label": "group_commit_c84d2099208436ab_96e33fda01eddba8", - "Comment": "", - "GroupCommit": true, - "Status": "Success", - "Message": "OK", - "NumberTotalRows": 2, - "NumberLoadedRows": 2, - "NumberFilteredRows": 0, - "NumberUnselectedRows": 0, - "LoadBytes": 19, - "LoadTimeMs": 35, - "StreamLoadPutTimeMs": 5, - "ReadDataTimeMs": 0, - "WriteDataTimeMs": 26 - } +```sql +# 导入时在 header 中增加"group_commit:async_mode"配置 - # 返回的 GroupCommit 为 true,说明进入了 group commit 的流程 - # 返回的 Label 是 group_commit 开头的,是真正消费数据的导入关联的 label - ``` +curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load +{ + "TxnId": 7009, + "Label": "group_commit_c84d2099208436ab_96e33fda01eddba8", + "Comment": "", + "GroupCommit": true, + "Status": "Success", + "Message": "OK", + "NumberTotalRows": 2, + "NumberLoadedRows": 2, + "NumberFilteredRows": 0, + "NumberUnselectedRows": 0, + "LoadBytes": 19, + "LoadTimeMs": 35, + "StreamLoadPutTimeMs": 5, + "ReadDataTimeMs": 0, + "WriteDataTimeMs": 26 +} + +# 返回的 GroupCommit 为 true,说明进入了 group commit 的流程 +# 返回的 Label 是 group_commit 开头的,是真正消费数据的导入关联的 label +``` * 同步模式 - ```sql - # 导入时在 header 中增加"group_commit:sync_mode"配置 - - curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load - { - "TxnId": 3009, - "Label": "group_commit_d941bf17f6efcc80_ccf4afdde9881293", - "Comment": "", - "GroupCommit": true, - "Status": "Success", - "Message": "OK", - "NumberTotalRows": 2, - "NumberLoadedRows": 2, - "NumberFilteredRows": 0, - "NumberUnselectedRows": 0, - "LoadBytes": 19, - "LoadTimeMs": 10044, - "StreamLoadPutTimeMs": 4, - "ReadDataTimeMs": 0, - "WriteDataTimeMs": 10038 - } +```sql +# 导入时在 header 中增加"group_commit:sync_mode"配置 + +curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load +{ + "TxnId": 3009, + "Label": "group_commit_d941bf17f6efcc80_ccf4afdde9881293", + "Comment": "", + "GroupCommit": true, + "Status": "Success", + "Message": "OK", + "NumberTotalRows": 2, + "NumberLoadedRows": 2, + "NumberFilteredRows": 0, + "NumberUnselectedRows": 0, + "LoadBytes": 19, + "LoadTimeMs": 10044, + "StreamLoadPutTimeMs": 4, + "ReadDataTimeMs": 0, + "WriteDataTimeMs": 10038 +} - # 返回的 GroupCommit 为 true,说明进入了 group commit 的流程 - # 返回的 Label 是 group_commit 开头的,是真正消费数据的导入关联的 label - ``` +# 返回的 GroupCommit 为 true,说明进入了 group commit 的流程 +# 返回的 Label 是 group_commit 开头的,是真正消费数据的导入关联的 label +``` - 关于 Stream Load 使用的更多详细语法及最佳实践,请参阅 [Stream Load](./import-way/stream-load-manual)。 +关于 Stream Load 使用的更多详细语法及最佳实践,请参阅 [Stream Load](./import-way/stream-load-manual)。 ## 自动提交条件 -当满足时间间隔 (默认为 10 秒) 或数据量 (默认为 64 MB) 其中一个条件时,会自动提交数据。 +当满足时间间隔 (默认为 10 秒) 或数据量 (默认为 64 MB) 其中一个条件时,会自动提交数据。这两个参数需要配合使用,建议根据实际场景进行调优。 ### 修改提交间隔 @@ -438,6 +395,17 @@ func logInsertStatistics() { ALTER TABLE dt SET ("group_commit_interval_ms" = "2000"); ``` +**参数调整建议**: +- 较短的间隔(如2秒): + - 优点:数据可见性延迟更低,适合对实时性要求较高的场景 + - 缺点:提交次数增多,版本数增长更快,后台compaction压力更大 + 
+- 较长的间隔(如30秒): + - 优点:提交批次更大,版本数增长更慢,系统开销更小 + - 缺点:数据可见性延迟更高 + +建议根据业务对数据可见性延迟的容忍度来设置,如果系统压力大,可以适当增加间隔。 + ### 修改提交数据量 Group Commit 的默认提交数据量为 64 MB,用户可以通过修改表的配置调整: @@ -447,6 +415,18 @@ Group Commit 的默认提交数据量为 64 MB,用户可以通过修改表的 ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728"); ``` +**参数调整建议**: +- 较小的阈值(如32MB): + - 优点:内存占用更少,适合资源受限的环境 + - 缺点:提交批次较小,吞吐量可能受限 + +- 较大的阈值(如256MB): + - 优点:批量提交效率更高,系统吞吐量更大 + - 缺点:占用更多内存 + +建议根据系统内存资源和数据可靠性要求来权衡。如果内存充足且追求更高吞吐,可以适当增加到128MB或更大。 + + ## 相关系统配置 ### BE 配置 @@ -467,6 +447,40 @@ ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728"); * 默认值:10000 +## 使用限制 + +* **Group Commit 限制条件** + + * `INSERT INTO VALUES` 语句在以下情况下会退化为非 Group Commit 方式: + - 使用事务写入 (`Begin; INSERT INTO VALUES; COMMIT`) + - 指定 Label (`INSERT INTO dt WITH LABEL {label} VALUES`) + - VALUES 中包含表达式 (`INSERT INTO dt VALUES (1 + 100)`) + - 列更新写入 + - 表不支持轻量级模式更改 + + * `Stream Load` 在以下情况下会退化为非 Group Commit 方式: + - 使用两阶段提交 + - 指定 Label (`-H "label:my_label"`) + - 列更新写入 + - 表不支持轻量级模式更改 + +* **Unique 模型** + - Group Commit 不保证提交顺序,建议使用 Sequence 列来保证数据一致性。 + +* **max_filter_ratio 支持** + - 默认导入中,`filter_ratio` 通过失败行数和总行数计算。 + - Group Commit 模式下,`max_filter_ratio` 在总行数不超过 `group_commit_memory_rows_for_max_filter_ratio` 时有效。 + +* **WAL 限制** + - `async_mode` 写入会将数据写入 WAL,成功后删除,失败时通过 WAL 恢复。 + - WAL 文件仅存储在一个 BE 上,磁盘损坏或文件丢失可能导致数据丢失。 + - 下线 BE 节点时,使用 `DECOMMISSION` 命令以防数据丢失。 + - `async_mode` 在以下情况下切换为 `sync_mode`: + - 导入数据量过大(超过 WAL 单目录 80% 空间) + - 不知道数据量的 chunked stream load + - 磁盘可用空间不足 + - 重量级 Schema Change 时,拒绝 Group Commit 写入,客户端需重试。 + ## 性能 我们分别测试了使用`Stream Load`和`JDBC`在高并发小数据量场景下`group commit`(使用`async mode`) 的写入性能。 @@ -644,7 +658,7 @@ PROPERTIES ( * 1 台 FE:阿里云 16 核 CPU、64GB 内存、1 块 500GB ESSD PL1 云磁盘 -* 5 台 BE:阿里云 16 ��� CPU、64GB 内存、1 块 1TB ESSD PL1 云磁盘。注:测试中分别用了1台,3台,5台BE进行测试。 +* 5 台 BE:阿里云 16 核 CPU、64GB 内存、1 块 1TB ESSD PL1 云磁盘。注:测试中分别用了1台,3台,5台BE进行测试。 * 1 台测试客户端:阿里云 16 核 CPU、64GB 内存、1 块 100GB ESSD PL1 云磁盘 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/data-operate/import/group-commit-manual.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/data-operate/import/group-commit-manual.md index a26b9e05bf..f206f3dae0 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/data-operate/import/group-commit-manual.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/data-operate/import/group-commit-manual.md @@ -24,8 +24,17 @@ specific language governing permissions and limitations under the License. 
--> +在高频小批量写入场景下,传统的导入方式存在以下问题: -Group Commit 不是一种新的导入方式,而是对`INSERT INTO tbl VALUES(...)`、`Stream Load`的扩展,大幅提升了高并发小写入的性能。您的应用程序可以直接使用 JDBC 将数据高频写入 Doris,同时通过使用 PreparedStatement 可以获得更高的性能。在日志场景下,您也可以利用 Stream Load 将数据高频写入 Doris。 +- 每个导入都会创建一个独立的事务,都需要经过 FE 解析 SQL 和生成执行计划,影响整体性能 +- 每个导入都会生成一个新的版本,导致版本数快速增长,增加了后台compaction的压力 + +为了解决这些问题,Doris 引入了 Group Commit 机制。Group Commit 不是一种新的导入方式,而是对现有导入方式的优化扩展,主要针对: + +- `INSERT INTO tbl VALUES(...)` 语句 +- Stream Load 导入 + +通过将多个小批量导入在后台合并成一个大的事务提交,显著提升了高并发小批量写入的性能。同时,Group Commit 与 PreparedStatement 结合使用可以获得更高的性能提升。 ## Group Commit 模式 @@ -45,58 +54,6 @@ Group Commit 写入有三种模式,分别是: WAL的数量可以通过FE http接口查看,具体可见[这里](../../admin-manual/open-api/fe-http/get-wal-size-action),也可以在BE的metrics中搜索关键词`wal`查看。 -## 使用限制 - -* 当开启了 Group Commit 模式,系统会判断用户发起的`INSERT INTO VALUES`语句是否符合 Group Commit 的条件,如果符合,该语句的执行会进入到 Group Commit 写入中。符合以下条件会自动退化为非 Group Commit 方式: - - + 事务写入,即`Begin`; `INSERT INTO VALUES`; `COMMIT`方式 - - + 指定 Label,即`INSERT INTO dt WITH LABEL {label} VALUES` - - + VALUES 中包含表达式,即`INSERT INTO dt VALUES (1 + 100)` - - + 列更新写入 - - + 表不支持 light schema change - -* 当开启了 Group Commit 模式,系统会判断用户发起的`Stream Load`和`Http Stream`是否符合 Group Commit 的条件,如果符合,该导入的执行会进入到 Group Commit 写入中。符合以下条件的会自动退化为非 Group Commit 方式: - - + 两阶段提交 - - + 指定 Label,即通过 `-H "label:my_label"`设置 - - + 列更新写入 - - + 表不支持 light schema change - -+ 对于 Unique 模型,由于 Group Commit 不能保证提交顺序,用户可以配合 Sequence 列使用来保证数据一致性 - -* 对`max_filter_ratio`语义的支持 - - * 在默认的导入中,`filter_ratio`是导入完成后,通过失败的行数和总行数计算,决定是否提交本次写入 - - * 在 Group Commit 模式下,由于多个用户发起的导入会被一个内部导入执行,虽然可以计算出每个导入的`filter_ratio`,但是数据一旦进入内部导入,就只能 commit transaction - - * Group Commit 模式支持了一定程度的`max_filter_ratio`语义,当导入的总行数不高于`group_commit_memory_rows_for_max_filter_ratio`(配置在`be.conf`中,默认为`10000`行),`max_filter_ratio` 工作 - -* WAL 限制 - - * 对于`async_mode`的 Group Commit 写入,会把数据写入 WAL。如果内部导入成功,则 WAL 被立刻删除;如果内部导入失败,通过导入 WAL 的方法来恢复数据 - - * 目前 WAL 文件只存储在一个 BE 上,如果这个 BE 磁盘损坏或文件误删等,可能导入丢失部分数据 - - * 当下线 BE 节点时,请使用[`DECOMMISSION`](../../sql-manual/sql-statements/cluster-management/instance-management/DECOMMISSION-BACKEND)命令,安全下线节点,防止该节点下线前 WAL 文件还没有全部处理完成,导致部分数据丢失 - - * 对于`async_mode`的 Group Commit 写入,为了保护磁盘空间,当遇到以下情况时,会切换成`sync_mode` - - * 导入数据量过大,即超过 WAL 单目录的 80% 空间 - - * 不知道数据量的 chunked stream load - - * 导入数据量不大,但磁盘可用空间不足 - - * 当发生重量级 Schema Change(目前加减列、修改 varchar 长度和重命名列是轻量级 Schema Change,其它的是重量级 Schema Change)时,为了保证 WAL 能够适配表的 Schema,在 Schema Change 最后的 FE 修改元数据阶段,会拒绝 Group Commit 写入,客户端收到 `insert table ${table_name} is blocked on schema change` 异常,客户端重试即可 - ## Group Commit 使用方式 假如表的结构为: @@ -127,30 +84,30 @@ url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&useLocalSessionStat * 通过 JDBC url 设置,增加`sessionVariables=group_commit=async_mode` - ``` - url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500&sessionVariables=group_commit=async_mode&sessionVariables=enable_nereids_planner=false - ``` +``` +url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500&sessionVariables=group_commit=async_mode&sessionVariables=enable_nereids_planner=false +``` * 通过执行 SQL 设置 - ``` - try (Statement statement = conn.createStatement()) { - statement.execute("SET group_commit = async_mode;"); - } - ``` +``` +try (Statement statement = conn.createStatement()) { + statement.execute("SET group_commit = async_mode;"); 
+} +``` **3. 使用 `PreparedStatement`** ```java private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver"; -private static final String URL_PATTERN = "jdbc:mysql://%s:%d/%s?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500&sessionVariables=group_commit=async_mode&sessionVariables=enable_nereids_planner=false"; +private static final String URL_PATTERN = "jdbc:mysql://%s:%d/%s?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=50$sessionVariables=group_commit=async_mode"; private static final String HOST = "127.0.0.1"; private static final int PORT = 9087; private static final String DB = "db"; private static final String TBL = "dt"; private static final String USER = "root"; private static final String PASSWD = ""; -private static final int INSERT_BATCH_SIZE = 10; +private static final int INSERT_BATCH_SIZE = 10; private static void groupCommitInsertBatch() throws Exception { Class.forName(JDBC_DRIVER); @@ -296,66 +253,66 @@ func logInsertStatistics() { * 异步模式 - ```sql - # 配置 session 变量开启 group commit (默认为 off_mode),开启异步模式 - mysql> set group_commit = async_mode; - - # 这里返回的 label 是 group_commit 开头的,可以区分出是否使用了 group commit - mysql> insert into dt values(1, 'Bob', 90), (2, 'Alice', 99); - Query OK, 2 rows affected (0.05 sec) - {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} - - # 可以看出这个 label, txn_id 和上一个相同,说明是攒到了同一个导入任务中 - mysql> insert into dt(id, name) values(3, 'John'); - Query OK, 1 row affected (0.01 sec) - {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} - - # 不能立刻查询到 - mysql> select * from dt; - Empty set (0.01 sec) - - # 10 秒后可以查询到,可以通过表属性 group_commit_interval 控制数据可见延迟。 - mysql> select * from dt; - +------+-------+-------+ - | id | name | score | - +------+-------+-------+ - | 1 | Bob | 90 | - | 2 | Alice | 99 | - | 3 | John | NULL | - +------+-------+-------+ - 3 rows in set (0.02 sec) - ``` +```sql +# 配置 session 变量开启 group commit (默认为 off_mode),开启异步模式 +mysql> set group_commit = async_mode; + +# 这里返回的 label 是 group_commit 开头的,可以区分出是否使用了 group commit +mysql> insert into dt values(1, 'Bob', 90), (2, 'Alice', 99); +Query OK, 2 rows affected (0.05 sec) +{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} + +# 可以看出这个 label, txn_id 和上一个相同,说明是攒到了同一个导入任务中 +mysql> insert into dt(id, name) values(3, 'John'); +Query OK, 1 row affected (0.01 sec) +{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} + +# 不能立刻查询到 +mysql> select * from dt; +Empty set (0.01 sec) + +# 10 秒后可以查询到,可以通过表属性 group_commit_interval 控制数据可见延迟。 +mysql> select * from dt; ++------+-------+-------+ +| id | name | score | ++------+-------+-------+ +| 1 | Bob | 90 | +| 2 | Alice | 99 | +| 3 | John | NULL | ++------+-------+-------+ +3 rows in set (0.02 sec) +``` * 同步模式 - ```sql - # 配置 session 变量开启 group commit (默认为 off_mode),开启同步模式 - mysql> set group_commit = sync_mode; - - # 这里返回的 label 是 group_commit 开头的,可以区分出是否谁用了 group commit,导入耗时至少是表属性 group_commit_interval。 - mysql> insert into dt values(4, 'Bob', 90), (5, 'Alice', 99); - Query OK, 2 rows affected (10.06 sec) - {'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'} - - # 数据可以立刻读出 - mysql> select * from dt; 
- +------+-------+-------+ - | id | name | score | - +------+-------+-------+ - | 1 | Bob | 90 | - | 2 | Alice | 99 | - | 3 | John | NULL | - | 4 | Bob | 90 | - | 5 | Alice | 99 | - +------+-------+-------+ - 5 rows in set (0.03 sec) - ``` +```sql +# 配置 session 变量开启 group commit (默认为 off_mode),开启同步模式 +mysql> set group_commit = sync_mode; + +# 这里返回的 label 是 group_commit 开头的,可以区分出是否谁用了 group commit,导入耗时至少是表属性 group_commit_interval。 +mysql> insert into dt values(4, 'Bob', 90), (5, 'Alice', 99); +Query OK, 2 rows affected (10.06 sec) +{'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'} + +# 数据可以立刻读出 +mysql> select * from dt; ++------+-------+-------+ +| id | name | score | ++------+-------+-------+ +| 1 | Bob | 90 | +| 2 | Alice | 99 | +| 3 | John | NULL | +| 4 | Bob | 90 | +| 5 | Alice | 99 | ++------+-------+-------+ +5 rows in set (0.03 sec) +``` * 关闭模式 - ```sql - mysql> set group_commit = off_mode; - ``` +```sql +mysql> set group_commit = off_mode; +``` ### Stream Load @@ -368,66 +325,66 @@ func logInsertStatistics() { * 异步模式 - ```sql - # 导入时在 header 中增加"group_commit:async_mode"配置 - - curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load - { - "TxnId": 7009, - "Label": "group_commit_c84d2099208436ab_96e33fda01eddba8", - "Comment": "", - "GroupCommit": true, - "Status": "Success", - "Message": "OK", - "NumberTotalRows": 2, - "NumberLoadedRows": 2, - "NumberFilteredRows": 0, - "NumberUnselectedRows": 0, - "LoadBytes": 19, - "LoadTimeMs": 35, - "StreamLoadPutTimeMs": 5, - "ReadDataTimeMs": 0, - "WriteDataTimeMs": 26 - } +```sql +# 导入时在 header 中增加"group_commit:async_mode"配置 - # 返回的 GroupCommit 为 true,说明进入了 group commit 的流程 - # 返回的 Label 是 group_commit 开头的,是真正消费数据的导入关联的 label - ``` +curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load +{ + "TxnId": 7009, + "Label": "group_commit_c84d2099208436ab_96e33fda01eddba8", + "Comment": "", + "GroupCommit": true, + "Status": "Success", + "Message": "OK", + "NumberTotalRows": 2, + "NumberLoadedRows": 2, + "NumberFilteredRows": 0, + "NumberUnselectedRows": 0, + "LoadBytes": 19, + "LoadTimeMs": 35, + "StreamLoadPutTimeMs": 5, + "ReadDataTimeMs": 0, + "WriteDataTimeMs": 26 +} + +# 返回的 GroupCommit 为 true,说明进入了 group commit 的流程 +# 返回的 Label 是 group_commit 开头的,是真正消费数据的导入关联的 label +``` * 同步模式 - ```sql - # 导入时在 header 中增加"group_commit:sync_mode"配置 - - curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load - { - "TxnId": 3009, - "Label": "group_commit_d941bf17f6efcc80_ccf4afdde9881293", - "Comment": "", - "GroupCommit": true, - "Status": "Success", - "Message": "OK", - "NumberTotalRows": 2, - "NumberLoadedRows": 2, - "NumberFilteredRows": 0, - "NumberUnselectedRows": 0, - "LoadBytes": 19, - "LoadTimeMs": 10044, - "StreamLoadPutTimeMs": 4, - "ReadDataTimeMs": 0, - "WriteDataTimeMs": 10038 - } +```sql +# 导入时在 header 中增加"group_commit:sync_mode"配置 + +curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load +{ + "TxnId": 3009, + "Label": "group_commit_d941bf17f6efcc80_ccf4afdde9881293", + "Comment": "", + "GroupCommit": true, + "Status": "Success", + "Message": "OK", + 
"NumberTotalRows": 2, + "NumberLoadedRows": 2, + "NumberFilteredRows": 0, + "NumberUnselectedRows": 0, + "LoadBytes": 19, + "LoadTimeMs": 10044, + "StreamLoadPutTimeMs": 4, + "ReadDataTimeMs": 0, + "WriteDataTimeMs": 10038 +} - # 返回的 GroupCommit 为 true,说明进入了 group commit 的流程 - # 返回的 Label 是 group_commit 开头的,是真正消费数据的导入关联的 label - ``` +# 返回的 GroupCommit 为 true,说明进入了 group commit 的流程 +# 返回的 Label 是 group_commit 开头的,是真正消费数据的导入关联的 label +``` - 关于 Stream Load 使用的更多详细语法及最佳实践,请参阅 [Stream Load](./import-way/stream-load-manual)。 +关于 Stream Load 使用的更多详细语法及最佳实践,请参阅 [Stream Load](./import-way/stream-load-manual)。 ## 自动提交条件 -当满足时间间隔 (默认为 10 秒) 或数据量 (默认为 64 MB) 其中一个条件时,会自动提交数据。 +当满足时间间隔 (默认为 10 秒) 或数据量 (默认为 64 MB) 其中一个条件时,会自动提交数据。这两个参数需要配合使用,建议根据实际场景进行调优。 ### 修改提交间隔 @@ -438,6 +395,17 @@ func logInsertStatistics() { ALTER TABLE dt SET ("group_commit_interval_ms" = "2000"); ``` +**参数调整建议**: +- 较短的间隔(如2秒): + - 优点:数据可见性延迟更低,适合对实时性要求较高的场景 + - 缺点:提交次数增多,版本数增长更快,后台compaction压力更大 + +- 较长的间隔(如30秒): + - 优点:提交批次更大,版本数增长更慢,系统开销更小 + - 缺点:数据可见性延迟更高 + +建议根据业务对数据可见性延迟的容忍度来设置,如果系统压力大,可以适当增加间隔。 + ### 修改提交数据量 Group Commit 的默认提交数据量为 64 MB,用户可以通过修改表的配置调整: @@ -447,6 +415,18 @@ Group Commit 的默认提交数据量为 64 MB,用户可以通过修改表的 ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728"); ``` +**参数调整建议**: +- 较小的阈值(如32MB): + - 优点:内存占用更少,适合资源受限的环境 + - 缺点:提交批次较小,吞吐量可能受限 + +- 较大的阈值(如256MB): + - 优点:批量提交效率更高,系统吞吐量更大 + - 缺点:占用更多内存 + +建议根据系统内存资源和数据可靠性要求来权衡。如果内存充足且追求更高吞吐,可以适当增加到128MB或更大。 + + ## 相关系统配置 ### BE 配置 @@ -467,6 +447,40 @@ ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728"); * 默认值:10000 +## 使用限制 + +* **Group Commit 限制条件** + + * `INSERT INTO VALUES` 语句在以下情况下会退化为非 Group Commit 方式: + - 使用事务写入 (`Begin; INSERT INTO VALUES; COMMIT`) + - 指定 Label (`INSERT INTO dt WITH LABEL {label} VALUES`) + - VALUES 中包含表达式 (`INSERT INTO dt VALUES (1 + 100)`) + - 列更新写入 + - 表不支持轻量级模式更改 + + * `Stream Load` 在以下情况下会退化为非 Group Commit 方式: + - 使用两阶段提交 + - 指定 Label (`-H "label:my_label"`) + - 列更新写入 + - 表不支持轻量级模式更改 + +* **Unique 模型** + - Group Commit 不保证提交顺序,建议使用 Sequence 列来保证数据一致性。 + +* **max_filter_ratio 支持** + - 默认导入中,`filter_ratio` 通过失败行数和总行数计算。 + - Group Commit 模式下,`max_filter_ratio` 在总行数不超过 `group_commit_memory_rows_for_max_filter_ratio` 时有效。 + +* **WAL 限制** + - `async_mode` 写入会将数据写入 WAL,成功后删除,失败时通过 WAL 恢复。 + - WAL 文件仅存储在一个 BE 上,磁盘损坏或文件丢失可能导致数据丢失。 + - 下线 BE 节点时,使用 `DECOMMISSION` 命令以防数据丢失。 + - `async_mode` 在以下情况下切换为 `sync_mode`: + - 导入数据量过大(超过 WAL 单目录 80% 空间) + - 不知道数据量的 chunked stream load + - 磁盘可用空间不足 + - 重量级 Schema Change 时,拒绝 Group Commit 写入,客户端需重试。 + ## 性能 我们分别测试了使用`Stream Load`和`JDBC`在高并发小数据量场景下`group commit`(使用`async mode`) 的写入性能。 diff --git a/versioned_docs/version-2.1/data-operate/import/group-commit-manual.md b/versioned_docs/version-2.1/data-operate/import/group-commit-manual.md index 520fadb3bb..21e0d1b587 100644 --- a/versioned_docs/version-2.1/data-operate/import/group-commit-manual.md +++ b/versioned_docs/version-2.1/data-operate/import/group-commit-manual.md @@ -24,83 +24,39 @@ specific language governing permissions and limitations under the License. --> -# Group Commit +In high-frequency small batch write scenarios, traditional loading methods have the following issues: -Group commit load does not introduce a new data import method, but an extension of `INSERT INTO tbl VALUS(...)` and `Stream Load`. It is a way to improve the write performance of Doris with high-concurrency and small-data writes. 
Your application can directly use JDBC to do high-concurrency insert operation into Doris, at the same time, combining PreparedStatement can get even higher performance. In logging scenarios, you can also do high-concurrency Stream Load into Doris. +- Each load creates an independent transaction, requiring FE to parse SQL and generate execution plans, affecting overall performance +- Each load generates a new version, causing rapid version growth and increasing background compaction pressure -## Group Commit Mode +To solve these problems, Doris introduced the Group Commit mechanism. Group Commit is not a new loading method, but rather an optimization extension of existing loading methods, mainly targeting: -Group Commit provides 3 modes: +- `INSERT INTO tbl VALUES(...)` statements +- Stream Load -* `off_mode` +By merging multiple small batch loads into one large transaction commit in the background, it significantly improves high-concurrency small batch write performance. Additionally, using Group Commit with PreparedStatement can achieve even higher performance improvements. -Disable group commit. +## Group Commit Modes -* `sync_mode` +Group Commit has three modes: -Doris groups multiple loads into one transaction commit based on the `group_commit_interval` table property. The load is returned after the transaction commit. This mode is suitable for high-concurrency writing scenarios and requires immediate data visibility after the load is finished. +* Off Mode (`off_mode`) -* `async_mode` + Group Commit is disabled. -Doris writes data to the Write Ahead Log (WAL) firstly, then the load is returned. Doris groups multiple loads into one transaction commit based on the `group_commit_interval` table property, and the data is visible after the commit. To prevent excessive disk space usage by the WAL, it automatically switches to `sync_mode`. This is suitable for latency-sensitive and high-frequency writing. +* Synchronous Mode (`sync_mode`) -The number of WALs can be viewed through the FE HTTP interface, as detailed [here](../../admin-manual/open-api/fe-http/get-wal-size-action). Alternatively, you can search for the keyword `wal` in the BE metrics. + Doris commits multiple loads in one transaction based on load and table's `group_commit_interval` property, returning after transaction commit. This is suitable for high-concurrency write scenarios requiring immediate data visibility after loading. -## Limitations +* Asynchronous Mode (`async_mode`) -* When the group commit is enabled, some `INSERT INTO VALUES` sqls are not executed in the group commit way if they meet the following conditions: + Doris first writes data to WAL (Write Ahead Log), then returns immediately. Doris asynchronously commits data based on load and table's `group_commit_interval` property, making data visible after commit. To prevent WAL from occupying too much disk space, it automatically switches to `sync_mode` for large single loads. This is suitable for write latency-sensitive and high-frequency write scenarios. - * Transaction insert, such as `BEGIN`, `INSERT INTO VALUES`, `COMMIT` + WAL count can be viewed through FE http interface as shown [here](../../admin-manual/open-api/fe-http/get-wal-size-action), or by searching for `wal` keyword in BE metrics. 
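
As a quick illustration of how the three modes behave from a client, the following is a minimal Java/JDBC sketch; the host, port, credentials, and the `dt` table are illustrative assumptions taken from the examples later in this manual, and a MySQL-compatible JDBC driver is assumed to be on the classpath. With `sync_mode` the INSERT returns only after the grouped transaction commits (at least the table's `group_commit_interval`), while with `async_mode` it returns as soon as the data has been written to the WAL.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class GroupCommitModeDemo {
    // Assumed FE address and credentials; adjust for your cluster.
    private static final String URL = "jdbc:mysql://127.0.0.1:9030/db";

    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(URL, "root", "");
             Statement stmt = conn.createStatement()) {
            // Synchronous mode: the INSERT returns only after the grouped transaction
            // commits, so it takes at least the table's group_commit_interval.
            stmt.execute("SET group_commit = sync_mode");
            long start = System.currentTimeMillis();
            stmt.execute("INSERT INTO dt VALUES (1, 'Bob', 90)");
            System.out.println("sync_mode insert took " + (System.currentTimeMillis() - start) + " ms");

            // Asynchronous mode: the INSERT returns once the data is in the WAL;
            // it becomes visible after the background transaction commits.
            stmt.execute("SET group_commit = async_mode");
            start = System.currentTimeMillis();
            stmt.execute("INSERT INTO dt VALUES (2, 'Alice', 99)");
            System.out.println("async_mode insert took " + (System.currentTimeMillis() - start) + " ms");
        }
    }
}
```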
- * Specify the label, such as `INSERT INTO dt WITH LABEL {label} VALUES` +## How to Use Group Commit - * Expressions within VALUES, such as `INSERT INTO dt VALUES (1 + 100)` - - * Column update - - * Tables that do not support light schema changes - -* When the group commit is enabled, some `Stream Load` and `Http Stream` are not executed in the group commit way if they meet the following conditions: - - * Two phase commit - - * Specify the label by set header `-H "label:my_label"` - - * Column update - - * Tables that do not support light schema changes - -* For unique table, because the group commit can not guarantee the commit order, users can use sequence column to ensure the data consistency. - -* The limit of `max_filter_ratio` - - * For non group commit load, filter_ratio is calculated by the failed rows and total rows when load is finished. If the filter_ratio does not match, the transaction will not commit - - * In the group commit mode, multiple user loads are executed by one internal load. The internal load will commit all user loads. - - * Currently, group commit supports a certain degree of max_filter_ratio semantics. When the total number of rows does not exceed group_commit_memory_rows_for_max_filter_ratio (configured in `be.conf`, defaulting to `10000` rows), max_filter_ratio will work. - -* The limit of WAL - - * For async_mode group commit, data is written to the Write Ahead Log (WAL). If the internal load succeeds, the WAL is immediately deleted. If the internal load fails, data is recovered by loading the WAL. - - * Currently, WAL files are stored only on one disk of one BE. If the BE's disk is damaged or the file is mistakenly deleted, it may result in data loss. - - * When decommissioning a BE node, please use the [`DECOMMISSION`](../../../sql-manual/sql-statements/Cluster-Management-Statements/ALTER-SYSTEM-DECOMMISSION-BACKEND) command to safely decommission the node. This prevents potential data loss if the WAL files are not processed before the node is taken offline. - - * For async_mode group commit writes, to protect disk space, it switches to sync_mode under the following conditions: - - * For a load with large amount of data: exceeding 80% of the disk space of a WAL directory. - - * Chunked stream loads with an unknown data amount. - - * Insufficient disk space, even if it is a load with small amount of data. - - * During hard weight schema changes (adding or dropping columns, modifying varchar length, and renaming columns are lightweight schema changes, others are hard weight), to ensure WAL file is compatibility with the table's schema, the final stage of metadata modification in FE will reject group commit writes. Clients get `insert table ${table_name} is blocked on schema change` exception and can retry the load. - -## Basic operations - -If the table schema is: +Assuming the table structure is: ```sql CREATE TABLE `dt` ( `id` int(11) NOT NULL, @@ -114,25 +70,25 @@ PROPERTIES ( ); ``` -### Use `JDBC` +### Using JDBC -To reduce the CPU cost of SQL parsing and query planning, we provide the `PreparedStatement` in the FE. When using `PreparedStatement`, the SQL and its plan will be cached in the session level memory cache and will be reused later on, which reduces the CPU cost of FE. The following is an example of using PreparedStatement in JDBC: +When users write using JDBC's `insert into values` method, to reduce SQL parsing and planning overhead, we support MySQL protocol's `PreparedStatement` feature on the FE side. 
When using `PreparedStatement`, SQL and its load plan are cached in session-level memory cache, and subsequent loads directly use the cached objects, reducing FE CPU pressure. Here's an example of using `PreparedStatement` in JDBC: -1. Setup JDBC url and enable server side prepared statement +**1. Set JDBC URL and enable Prepared Statement on Server side** ``` url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500 ``` -2. Set `group_commit` session variable, there are two ways to do it: +**2. Configure `group_commit` session variable in one of two ways:** -* Add `sessionVariables=group_commit=async_mode` in JDBC url +* Through JDBC url by adding `sessionVariables=group_commit=async_mode` ``` -url = url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500&sessionVariables=group_commit=async_mode +url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500&sessionVariables=group_commit=async_mode ``` -* Use `SET group_commit = async_mode;` command +* Through SQL execution ``` try (Statement statement = conn.createStatement()) { @@ -140,11 +96,11 @@ try (Statement statement = conn.createStatement()) { } ``` -3. Using `PreparedStatement` +**3. Use `PreparedStatement`** ```java private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver"; -private static final String URL_PATTERN = "jdbc:mysql://%s:%d/%s?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500&sessionVariables=group_commit=async_mode"; +private static final String URL_PATTERN = "jdbc:mysql://%s:%d/%s?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=50$sessionVariables=group_commit=async_mode"; private static final String HOST = "127.0.0.1"; private static final int PORT = 9087; private static final String DB = "db"; @@ -179,143 +135,143 @@ private static void groupCommitInsertBatch() throws Exception { } ``` -**Note:** Due to the high frequency of `INSERT INTO` statements, a large amount of audit logs might be printed, which could impact overall performance. By default, the audit log for prepared statements is disabled. You can control whether to print the audit log for prepared statements by setting a session variable. +Note: Since high-frequency insert into statements will print large amounts of audit logs affecting final performance, printing prepared statement audit logs is disabled by default. You can control whether to print prepared statement audit logs through session variable settings. ```sql -# Configure the session variable to enable printing the audit log for prepared statements. By default, it is set to false, which disables printing the audit log for prepared statements. +# Configure session variable to enable printing prepared statement audit log, default is false set enable_prepared_stmt_audit_log=true; ``` -For more usage on **JDBC**, refer to [Using Insert to Synchronize Data](./import-way/insert-into-manual). +For more about **JDBC** usage, refer to [Using Insert Method to Synchronize Data](./import-way/insert-into-manual.md). 
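
To tie the JDBC settings above together, here is a minimal, self-contained sketch of a batched `PreparedStatement` write with group commit enabled through the JDBC URL. The host, port, credentials, row count, and batch size are illustrative assumptions; the `dt` table matches the example schema in this manual, and MySQL Connector/J is assumed to be on the classpath.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class GroupCommitBatchInsert {
    // Assumed connection string: server-side prepared statements, batched rewrites,
    // and async group commit enabled as a session variable.
    private static final String URL = "jdbc:mysql://127.0.0.1:9030/db"
            + "?useServerPrepStmts=true&cachePrepStmts=true&rewriteBatchedStatements=true"
            + "&sessionVariables=group_commit=async_mode";

    public static void main(String[] args) throws Exception {
        String sql = "INSERT INTO dt (id, name, score) VALUES (?, ?, ?)";
        try (Connection conn = DriverManager.getConnection(URL, "root", "");
             PreparedStatement ps = conn.prepareStatement(sql)) {
            for (int i = 0; i < 1000; i++) {
                ps.setInt(1, i);
                ps.setString(2, "name_" + i);
                ps.setInt(3, i % 100);
                ps.addBatch();             // accumulate rows on the client
                if ((i + 1) % 100 == 0) {
                    ps.executeBatch();     // each executed batch is one small load that
                                           // the server merges into a group commit
                }
            }
        }
    }
}
```

Client-side batching reduces round trips, and group commit merges the resulting small loads into fewer transactions and versions on the server, which is where the throughput gain comes from.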
### Using Golang for Group Commit -Golang has limited support for prepared statements, so we can manually batch the statements on the client side to improve the performance of Group Commit. Below is an example program. +Since Golang has limited support for prepared statements, we can improve Group Commit performance through manual client-side batching. Here's a sample program: ```Golang package main import ( - "database/sql" - "fmt" - "math/rand" - "strings" - "sync" - "sync/atomic" - "time" - - _ "github.com/go-sql-driver/mysql" + "database/sql" + "fmt" + "math/rand" + "strings" + "sync" + "sync/atomic" + "time" + + _ "github.com/go-sql-driver/mysql" ) const ( - host = "127.0.0.1" - port = 9038 - db = "test" - user = "root" - password = "" - table = "async_lineitem" + host = "127.0.0.1" + port = 9038 + db = "test" + user = "root" + password = "" + table = "async_lineitem" ) var ( - threadCount = 20 - batchSize = 100 + threadCount = 20 + batchSize = 100 ) var totalInsertedRows int64 var rowsInsertedLastSecond int64 func main() { - dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true", user, password, host, port, db) - db, err := sql.Open("mysql", dbDSN) - if err != nil { - fmt.Printf("Error opening database: %s\n", err) - return - } - defer db.Close() - - var wg sync.WaitGroup - for i := 0; i < threadCount; i++ { - wg.Add(1) - go func() { - defer wg.Done() - groupCommitInsertBatch(db) - }() - } - - go logInsertStatistics() - - wg.Wait() + dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true", user, password, host, port, db) + db, err := sql.Open("mysql", dbDSN) + if err != nil { + fmt.Printf("Error opening database: %s\n", err) + return + } + defer db.Close() + + var wg sync.WaitGroup + for i := 0; i < threadCount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + groupCommitInsertBatch(db) + }() + } + + go logInsertStatistics() + + wg.Wait() } func groupCommitInsertBatch(db *sql.DB) { - for { - valueStrings := make([]string, 0, batchSize) - valueArgs := make([]interface{}, 0, batchSize*16) - for i := 0; i < batchSize; i++ { - valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") - valueArgs = append(valueArgs, rand.Intn(1000)) - valueArgs = append(valueArgs, rand.Intn(1000)) - valueArgs = append(valueArgs, rand.Intn(1000)) - valueArgs = append(valueArgs, rand.Intn(1000)) - valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) - valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) - valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) - valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) - valueArgs = append(valueArgs, "N") - valueArgs = append(valueArgs, "O") - valueArgs = append(valueArgs, time.Now()) - valueArgs = append(valueArgs, time.Now()) - valueArgs = append(valueArgs, time.Now()) - valueArgs = append(valueArgs, "DELIVER IN PERSON") - valueArgs = append(valueArgs, "SHIP") - valueArgs = append(valueArgs, "N/A") - } - stmt := fmt.Sprintf("INSERT INTO %s VALUES %s", - table, strings.Join(valueStrings, ",")) - _, err := db.Exec(stmt, valueArgs...) 
- if err != nil { - fmt.Printf("Error executing batch: %s\n", err) - return - } - atomic.AddInt64(&rowsInsertedLastSecond, int64(batchSize)) - atomic.AddInt64(&totalInsertedRows, int64(batchSize)) - } + for { + valueStrings := make([]string, 0, batchSize) + valueArgs := make([]interface{}, 0, batchSize*16) + for i := 0; i < batchSize; i++ { + valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + valueArgs = append(valueArgs, rand.Intn(1000)) + valueArgs = append(valueArgs, rand.Intn(1000)) + valueArgs = append(valueArgs, rand.Intn(1000)) + valueArgs = append(valueArgs, rand.Intn(1000)) + valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) + valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) + valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) + valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) + valueArgs = append(valueArgs, "N") + valueArgs = append(valueArgs, "O") + valueArgs = append(valueArgs, time.Now()) + valueArgs = append(valueArgs, time.Now()) + valueArgs = append(valueArgs, time.Now()) + valueArgs = append(valueArgs, "DELIVER IN PERSON") + valueArgs = append(valueArgs, "SHIP") + valueArgs = append(valueArgs, "N/A") + } + stmt := fmt.Sprintf("INSERT INTO %s VALUES %s", + table, strings.Join(valueStrings, ",")) + _, err := db.Exec(stmt, valueArgs...) + if err != nil { + fmt.Printf("Error executing batch: %s\n", err) + return + } + atomic.AddInt64(&rowsInsertedLastSecond, int64(batchSize)) + atomic.AddInt64(&totalInsertedRows, int64(batchSize)) + } } func logInsertStatistics() { - for { - time.Sleep(1 * time.Second) - fmt.Printf("Total inserted rows: %d\n", totalInsertedRows) - fmt.Printf("Rows inserted in the last second: %d\n", rowsInsertedLastSecond) - rowsInsertedLastSecond = 0 - } + for { + time.Sleep(1 * time.Second) + fmt.Printf("Total inserted rows: %d\n", totalInsertedRows) + fmt.Printf("Rows inserted in the last second: %d\n", rowsInsertedLastSecond) + rowsInsertedLastSecond = 0 + } } ``` - ### INSERT INTO VALUES -* async_mode +* Asynchronous Mode + ```sql -# Config session variable to enable the async group commit, the default value is off_mode +# Configure session variable to enable group commit (default is off_mode), enable asynchronous mode mysql> set group_commit = async_mode; -# The retured label is start with 'group_commit', which is the label of the real load job +# The returned label is prefixed with group_commit, indicating whether group commit is used mysql> insert into dt values(1, 'Bob', 90), (2, 'Alice', 99); Query OK, 2 rows affected (0.05 sec) {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} -# The returned label and txn_id are the same as the above, which means they are handled in on load job +# The label, txn_id, and previous one are the same, indicating that they are accumulated into the same import task mysql> insert into dt(id, name) values(3, 'John'); Query OK, 1 row affected (0.01 sec) {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} -# The data is not visible +# Cannot query immediately mysql> select * from dt; Empty set (0.01 sec) -# After about 10 seconds, the data is visible +# 10 seconds later, data can be queried, and data visibility delay can be controlled by table attribute group_commit_interval. 
mysql> select * from dt; +------+-------+-------+ | id | name | score | @@ -327,18 +283,18 @@ mysql> select * from dt; 3 rows in set (0.02 sec) ``` -* sync_mode +* Synchronous Mode + ```sql -# Config session variable to enable the sync group commit +# Configure session variable to enable group commit (default is off_mode), enable synchronous mode mysql> set group_commit = sync_mode; -# The retured label is start with 'group_commit', which is the label of the real load job. -# The insert costs at least the group_commit_interval_ms of table property. +# The returned label is prefixed with group_commit, indicating whether group commit is used, and import time is at least table attribute group_commit_interval. mysql> insert into dt values(4, 'Bob', 90), (5, 'Alice', 99); Query OK, 2 rows affected (10.06 sec) {'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'} -# The data is visible after the insert is returned +# Data can be read immediately mysql> select * from dt; +------+-------+-------+ | id | name | score | @@ -352,22 +308,25 @@ mysql> select * from dt; 5 rows in set (0.03 sec) ``` -* off_mode +* Off Mode + ```sql mysql> set group_commit = off_mode; ``` ### Stream Load -If the content of `data.csv` is: +Assuming `data.csv` contains: + ```sql 6,Amy,60 7,Ross,98 ``` -* async_mode +* Asynchronous Mode + ```sql -# Add 'group_commit:async_mode' configuration in the http header +# Import with "group_commit:async_mode" configuration in header curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load { @@ -388,13 +347,14 @@ curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mo "WriteDataTimeMs": 26 } -# The returned 'GroupCommit' is 'true', which means this is a group commit load -# The retured label is start with 'group_commit', which is the label of the real load job +# The returned GroupCommit is true, indicating that the group commit process is entered +# The returned Label is prefixed with group_commit, indicating the label associated with the import that truly consumes data ``` -* sync_mode +* Synchronous Mode + ```sql -# Add 'group_commit:sync_mode' configuration in the http header +# Import with "group_commit:sync_mode" configuration in header curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load { @@ -415,50 +375,107 @@ curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mod "WriteDataTimeMs": 10038 } -# The returned 'GroupCommit' is 'true', which means this is a group commit load -# The retured label is start with 'group_commit', which is the label of the real load job +# The returned GroupCommit is true, indicating that the group commit process is entered +# The returned Label is prefixed with group_commit, indicating the label associated with the import that truly consumes data ``` -See [Stream Load](./import-way/stream-load-manual.md) for more detailed syntax used by **Stream Load**. +About Stream Load usage, please refer to [Stream Load](./import-way/stream-load-manual). -## Group commit condition -The data will be automatically committed either when the time interval (default is 10 seconds) or the data size (default is 64 MB) conditions meet. 
+Data is automatically committed when either time interval (default 10 seconds) or data volume (default 64 MB) condition is met. These parameters should be used together and tuned based on actual scenarios. -### Modify the time interval condition +### Modifying Commit Interval -The default group commit interval is 10 seconds. Users can modify the configuration of the table: +Default commit interval is 10 seconds, users can adjust through table configuration: ```sql -# Modify the group commit interval to 2 seconds +# Modify commit interval to 2 seconds ALTER TABLE dt SET ("group_commit_interval_ms" = "2000"); ``` -### Modify the data size condition +**Parameter Adjustment Recommendations**: +- Shorter intervals (e.g., 2 seconds): + - Pros: Lower data visibility latency, suitable for scenarios requiring high real-time performance + - Cons: More commits, faster version growth, higher background compaction pressure + +- Longer intervals (e.g., 30 seconds): + - Pros: Larger commit batches, slower version growth, lower system overhead + - Cons: Higher data visibility latency + +Recommend setting based on business tolerance for data visibility delay. If system pressure is high, consider increasing the interval. -The default group commit data size is 64 MB. Users can modify the configuration of the table: +### Modifying Commit Data Volume + +Group Commit's default commit data volume is 64 MB, users can adjust through table configuration: ```sql -# Modify the group commit data size to 128MB +# Modify commit data volume to 128MB ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728"); ``` -## Relevant system configuration +**Parameter Adjustment Recommendations**: +- Smaller threshold (e.g., 32MB): + - Pros: Less memory usage, suitable for resource-constrained environments + - Cons: Smaller commit batches, potentially limited throughput + +- Larger threshold (e.g., 256MB): + - Pros: Higher batch commit efficiency, greater system throughput + - Cons: Uses more memory + +Recommend balancing based on system memory resources and data reliability requirements. If memory is sufficient and higher throughput is desired, consider increasing to 128MB or more. + + +### BE Configuration + +1. `group_commit_wal_path` + + * Description: Directory for storing group commit WAL files + + * Default: Creates a `wal` directory under each configured `storage_root_path`. Configuration example: + + ``` + group_commit_wal_path=/data1/storage/wal;/data2/storage/wal;/data3/storage/wal + ``` + +2. `group_commit_memory_rows_for_max_filter_ratio` + + * Description: `max_filter_ratio` works normally when group commit load total rows don't exceed this value, otherwise it doesn't work + + * Default: 10000 + +## Usage Limitations + +* **Group Commit Limitations** -### BE configuration + * `INSERT INTO VALUES` statements degrade to non-Group Commit mode in these cases: + - Transaction writes (`Begin; INSERT INTO VALUES; COMMIT`) + - Specified Label (`INSERT INTO dt WITH LABEL {label} VALUES`) + - VALUES containing expressions (`INSERT INTO dt VALUES (1 + 100)`) + - Column update writes + - Table doesn't support lightweight mode changes -#### `group_commit_wal_path` + * `Stream Load` degrades to non-Group Commit mode in these cases: + - Using two-phase commit + - Specified Label (`-H "label:my_label"`) + - Column update writes + - Table doesn't support lightweight mode changes -* The `WAL` directory of group commit. -* Default: A directory named `wal` is created under each directory of the `storage_root_path`. 
Configuration examples: - ``` - group_commit_wal_path=/data1/storage/wal;/data2/storage/wal;/data3/storage/wal - ``` +* **Unique Model** + - Group Commit doesn't guarantee commit order, recommend using Sequence column to ensure data consistency. -#### `group_commit_memory_rows_for_max_filter_ratio` +* **max_filter_ratio Support** + - In default loads, `filter_ratio` is calculated through failed rows and total rows. + - In Group Commit mode, `max_filter_ratio` works when total rows don't exceed `group_commit_memory_rows_for_max_filter_ratio`. -* Description: The `max_filter_ratio` limit can only work if the total rows of `group commit` is less than this value. -* Default: 10000 +* **WAL Limitations** + - `async_mode` writes data to WAL, deletes after success, recovers through WAL on failure. + - WAL files are stored on only one BE, disk damage or file loss may cause data loss. + - When offlining BE nodes, use `DECOMMISSION` command to prevent data loss. + - `async_mode` switches to `sync_mode` in these cases: + - Load data volume too large (exceeds 80% of WAL single directory space) + - Unknown data volume chunked stream load + - Insufficient disk space + - During heavyweight Schema Change, Group Commit writes are rejected, client needs to retry. ## Performance diff --git a/versioned_docs/version-3.0/data-operate/import/group-commit-manual.md b/versioned_docs/version-3.0/data-operate/import/group-commit-manual.md index 0ce973b0d8..ba775dba8b 100644 --- a/versioned_docs/version-3.0/data-operate/import/group-commit-manual.md +++ b/versioned_docs/version-3.0/data-operate/import/group-commit-manual.md @@ -24,84 +24,39 @@ specific language governing permissions and limitations under the License. --> -# Group Commit +In high-frequency small batch write scenarios, traditional loading methods have the following issues: -Group commit load does not introduce a new data import method, but an extension of `INSERT INTO tbl VALUS(...)` and `Stream Load`. It is a way to improve the write performance of Doris with high-concurrency and small-data writes. Your application can directly use JDBC to do high-concurrency insert operation into Doris, at the same time, combining PreparedStatement can get even higher performance. In logging scenarios, you can also do high-concurrency Stream Load into Doris. +- Each load creates an independent transaction, requiring FE to parse SQL and generate execution plans, affecting overall performance +- Each load generates a new version, causing rapid version growth and increasing background compaction pressure -## Group Commit Mode +To solve these problems, Doris introduced the Group Commit mechanism. Group Commit is not a new loading method, but rather an optimization extension of existing loading methods, mainly targeting: -Group Commit provides 3 modes: +- `INSERT INTO tbl VALUES(...)` statements +- Stream Load -* `off_mode` +By merging multiple small batch loads into one large transaction commit in the background, it significantly improves high-concurrency small batch write performance. Additionally, using Group Commit with PreparedStatement can achieve even higher performance improvements. -Disable group commit. +## Group Commit Modes -* `sync_mode` +Group Commit has three modes: -Doris groups multiple loads into one transaction commit based on the `group_commit_interval` table property. The load is returned after the transaction commit. This mode is suitable for high-concurrency writing scenarios and requires immediate data visibility after the load is finished. 
+* Off Mode (`off_mode`) -* `async_mode` + Group Commit is disabled. -Doris writes data to the Write Ahead Log (WAL) firstly, then the load is returned. Doris groups multiple loads into one transaction commit based on the `group_commit_interval` table property, and the data is visible after the commit. To prevent excessive disk space usage by the WAL, it automatically switches to `sync_mode`. This is suitable for latency-sensitive and high-frequency writing. +* Synchronous Mode (`sync_mode`) -The number of WALs can be viewed through the FE HTTP interface, as detailed [here](../../admin-manual/open-api/fe-http/get-wal-size-action). Alternatively, you can search for the keyword `wal` in the BE metrics. + Doris commits multiple loads in one transaction based on load and table's `group_commit_interval` property, returning after transaction commit. This is suitable for high-concurrency write scenarios requiring immediate data visibility after loading. -## Limitations +* Asynchronous Mode (`async_mode`) -* When the group commit is enabled, some `INSERT INTO VALUES` sqls are not executed in the group commit way if they meet the following conditions: + Doris first writes data to WAL (Write Ahead Log), then returns immediately. Doris asynchronously commits data based on load and table's `group_commit_interval` property, making data visible after commit. To prevent WAL from occupying too much disk space, it automatically switches to `sync_mode` for large single loads. This is suitable for write latency-sensitive and high-frequency write scenarios. - * Transaction insert, such as `BEGIN`, `INSERT INTO VALUES`, `COMMIT` + WAL count can be viewed through FE http interface as shown [here](../../admin-manual/open-api/fe-http/get-wal-size-action), or by searching for `wal` keyword in BE metrics. - * Specify the label, such as `INSERT INTO dt WITH LABEL {label} VALUES` +## How to Use Group Commit - * Expressions within VALUES, such as `INSERT INTO dt VALUES (1 + 100)` - - * Column update - - * Tables that do not support light schema changes - -* When the group commit is enabled, some `Stream Load` and `Http Stream` are not executed in the group commit way if they meet the following conditions: - - * Two phase commit - - * Specify the label by set header `-H "label:my_label"` - - * Column update - - * Tables that do not support light schema changes - -* For unique table, because the group commit can not guarantee the commit order, users can use sequence column to ensure the data consistency. - -* The limit of `max_filter_ratio` - - * For non group commit load, filter_ratio is calculated by the failed rows and total rows when load is finished. If the filter_ratio does not match, the transaction will not commit - - * In the group commit mode, multiple user loads are executed by one internal load. The internal load will commit all user loads. - - * Currently, group commit supports a certain degree of max_filter_ratio semantics. When the total number of rows does not exceed group_commit_memory_rows_for_max_filter_ratio (configured in `be.conf`, defaulting to `10000` rows), max_filter_ratio will work. - -* The limit of WAL - - * For async_mode group commit, data is written to the Write Ahead Log (WAL). If the internal load succeeds, the WAL is immediately deleted. If the internal load fails, data is recovered by loading the WAL. - - * Currently, WAL files are stored only on one disk of one BE. If the BE's disk is damaged or the file is mistakenly deleted, it may result in data loss. 
- - * When decommissioning a BE node, please use the [`DECOMMISSION`](../../../sql-manual/sql-statements/Cluster-Management-Statements/ALTER-SYSTEM-DECOMMISSION-BACKEND) command to safely decommission the node. This prevents potential data loss if the WAL files are not processed before the node is taken offline. - - * For async_mode group commit writes, to protect disk space, it switches to sync_mode under the following conditions: - - * For a load with large amount of data: exceeding 80% of the disk space of a WAL directory. - - * Chunked stream loads with an unknown data amount. - - * Insufficient disk space, even if it is a load with small amount of data. - - * During hard weight schema changes (adding or dropping columns, modifying varchar length, and renaming columns are lightweight schema changes, others are hard weight), to ensure WAL file is compatibility with the table's schema, the final stage of metadata modification in FE will reject group commit writes. Clients get `insert table ${table_name} is blocked on schema change` exception and can retry the load. - - -## Basic operations - -If the table schema is: +Assuming the table structure is: ```sql CREATE TABLE `dt` ( `id` int(11) NOT NULL, @@ -115,25 +70,25 @@ PROPERTIES ( ); ``` -### Use `JDBC` +### Using JDBC -To reduce the CPU cost of SQL parsing and query planning, we provide the `PreparedStatement` in the FE. When using `PreparedStatement`, the SQL and its plan will be cached in the session level memory cache and will be reused later on, which reduces the CPU cost of FE. The following is an example of using PreparedStatement in JDBC: +When users write using JDBC's `insert into values` method, to reduce SQL parsing and planning overhead, we support MySQL protocol's `PreparedStatement` feature on the FE side. When using `PreparedStatement`, SQL and its load plan are cached in session-level memory cache, and subsequent loads directly use the cached objects, reducing FE CPU pressure. Here's an example of using `PreparedStatement` in JDBC: -1. Setup JDBC url and enable server side prepared statement +**1. Set JDBC URL and enable Prepared Statement on Server side** ``` url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500 ``` -2. Set `group_commit` session variable, there are two ways to do it: +**2. Configure `group_commit` session variable in one of two ways:** -* Add `sessionVariables=group_commit=async_mode` in JDBC url +* Through JDBC url by adding `sessionVariables=group_commit=async_mode` ``` -url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500&sessionVariables=group_commit=async_mode +url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500&sessionVariables=group_commit=async_mode&sessionVariables=enable_nereids_planner=false ``` -* Use `SET group_commit = async_mode;` command +* Through SQL execution ``` try (Statement statement = conn.createStatement()) { @@ -141,11 +96,11 @@ try (Statement statement = conn.createStatement()) { } ``` -3. Using `PreparedStatement` +**3. 
Use `PreparedStatement`** ```java private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver"; -private static final String URL_PATTERN = "jdbc:mysql://%s:%d/%s?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500&sessionVariables=group_commit=async_mode"; +private static final String URL_PATTERN = "jdbc:mysql://%s:%d/%s?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=50$sessionVariables=group_commit=async_mode"; private static final String HOST = "127.0.0.1"; private static final int PORT = 9087; private static final String DB = "db"; @@ -180,143 +135,143 @@ private static void groupCommitInsertBatch() throws Exception { } ``` -**Note:** Due to the high frequency of `INSERT INTO` statements, a large amount of audit logs might be printed, which could impact overall performance. By default, the audit log for prepared statements is disabled. You can control whether to print the audit log for prepared statements by setting a session variable. +Note: Since high-frequency insert into statements will print large amounts of audit logs affecting final performance, printing prepared statement audit logs is disabled by default. You can control whether to print prepared statement audit logs through session variable settings. ```sql -# Configure the session variable to enable printing the audit log for prepared statements. By default, it is set to false, which disables printing the audit log for prepared statements. +# Configure session variable to enable printing prepared statement audit log, default is false set enable_prepared_stmt_audit_log=true; ``` -For more usage on **JDBC**, refer to [Using Insert to Synchronize Data](./import-way/insert-into-manual). +For more about **JDBC** usage, refer to [Using Insert Method to Synchronize Data](./import-way/insert-into-manual.md). ### Using Golang for Group Commit -Golang has limited support for prepared statements, so we can manually batch the statements on the client side to improve the performance of Group Commit. Below is an example program. +Since Golang has limited support for prepared statements, we can improve Group Commit performance through manual client-side batching. 
Here's a sample program: ```Golang package main import ( - "database/sql" - "fmt" - "math/rand" - "strings" - "sync" - "sync/atomic" - "time" - - _ "github.com/go-sql-driver/mysql" + "database/sql" + "fmt" + "math/rand" + "strings" + "sync" + "sync/atomic" + "time" + + _ "github.com/go-sql-driver/mysql" ) const ( - host = "127.0.0.1" - port = 9038 - db = "test" - user = "root" - password = "" - table = "async_lineitem" + host = "127.0.0.1" + port = 9038 + db = "test" + user = "root" + password = "" + table = "async_lineitem" ) var ( - threadCount = 20 - batchSize = 100 + threadCount = 20 + batchSize = 100 ) var totalInsertedRows int64 var rowsInsertedLastSecond int64 func main() { - dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true", user, password, host, port, db) - db, err := sql.Open("mysql", dbDSN) - if err != nil { - fmt.Printf("Error opening database: %s\n", err) - return - } - defer db.Close() - - var wg sync.WaitGroup - for i := 0; i < threadCount; i++ { - wg.Add(1) - go func() { - defer wg.Done() - groupCommitInsertBatch(db) - }() - } - - go logInsertStatistics() - - wg.Wait() + dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true", user, password, host, port, db) + db, err := sql.Open("mysql", dbDSN) + if err != nil { + fmt.Printf("Error opening database: %s\n", err) + return + } + defer db.Close() + + var wg sync.WaitGroup + for i := 0; i < threadCount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + groupCommitInsertBatch(db) + }() + } + + go logInsertStatistics() + + wg.Wait() } func groupCommitInsertBatch(db *sql.DB) { - for { - valueStrings := make([]string, 0, batchSize) - valueArgs := make([]interface{}, 0, batchSize*16) - for i := 0; i < batchSize; i++ { - valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") - valueArgs = append(valueArgs, rand.Intn(1000)) - valueArgs = append(valueArgs, rand.Intn(1000)) - valueArgs = append(valueArgs, rand.Intn(1000)) - valueArgs = append(valueArgs, rand.Intn(1000)) - valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) - valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) - valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) - valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) - valueArgs = append(valueArgs, "N") - valueArgs = append(valueArgs, "O") - valueArgs = append(valueArgs, time.Now()) - valueArgs = append(valueArgs, time.Now()) - valueArgs = append(valueArgs, time.Now()) - valueArgs = append(valueArgs, "DELIVER IN PERSON") - valueArgs = append(valueArgs, "SHIP") - valueArgs = append(valueArgs, "N/A") - } - stmt := fmt.Sprintf("INSERT INTO %s VALUES %s", - table, strings.Join(valueStrings, ",")) - _, err := db.Exec(stmt, valueArgs...) 
- if err != nil { - fmt.Printf("Error executing batch: %s\n", err) - return - } - atomic.AddInt64(&rowsInsertedLastSecond, int64(batchSize)) - atomic.AddInt64(&totalInsertedRows, int64(batchSize)) - } + for { + valueStrings := make([]string, 0, batchSize) + valueArgs := make([]interface{}, 0, batchSize*16) + for i := 0; i < batchSize; i++ { + valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + valueArgs = append(valueArgs, rand.Intn(1000)) + valueArgs = append(valueArgs, rand.Intn(1000)) + valueArgs = append(valueArgs, rand.Intn(1000)) + valueArgs = append(valueArgs, rand.Intn(1000)) + valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) + valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) + valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) + valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true}) + valueArgs = append(valueArgs, "N") + valueArgs = append(valueArgs, "O") + valueArgs = append(valueArgs, time.Now()) + valueArgs = append(valueArgs, time.Now()) + valueArgs = append(valueArgs, time.Now()) + valueArgs = append(valueArgs, "DELIVER IN PERSON") + valueArgs = append(valueArgs, "SHIP") + valueArgs = append(valueArgs, "N/A") + } + stmt := fmt.Sprintf("INSERT INTO %s VALUES %s", + table, strings.Join(valueStrings, ",")) + _, err := db.Exec(stmt, valueArgs...) + if err != nil { + fmt.Printf("Error executing batch: %s\n", err) + return + } + atomic.AddInt64(&rowsInsertedLastSecond, int64(batchSize)) + atomic.AddInt64(&totalInsertedRows, int64(batchSize)) + } } func logInsertStatistics() { - for { - time.Sleep(1 * time.Second) - fmt.Printf("Total inserted rows: %d\n", totalInsertedRows) - fmt.Printf("Rows inserted in the last second: %d\n", rowsInsertedLastSecond) - rowsInsertedLastSecond = 0 - } + for { + time.Sleep(1 * time.Second) + fmt.Printf("Total inserted rows: %d\n", totalInsertedRows) + fmt.Printf("Rows inserted in the last second: %d\n", rowsInsertedLastSecond) + rowsInsertedLastSecond = 0 + } } ``` - ### INSERT INTO VALUES -* async_mode +* Asynchronous Mode + ```sql -# Config session variable to enable the async group commit, the default value is off_mode +# Configure session variable to enable group commit (default is off_mode), enable asynchronous mode mysql> set group_commit = async_mode; -# The retured label is start with 'group_commit', which is the label of the real load job +# The returned label is prefixed with group_commit, indicating whether group commit is used mysql> insert into dt values(1, 'Bob', 90), (2, 'Alice', 99); Query OK, 2 rows affected (0.05 sec) {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} -# The returned label and txn_id are the same as the above, which means they are handled in on load job +# The label, txn_id, and previous one are the same, indicating that they are accumulated into the same import task mysql> insert into dt(id, name) values(3, 'John'); Query OK, 1 row affected (0.01 sec) {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} -# The data is not visible +# Cannot query immediately mysql> select * from dt; Empty set (0.01 sec) -# After about 10 seconds, the data is visible +# 10 seconds later, data can be queried, and data visibility delay can be controlled by table attribute group_commit_interval. 
### INSERT INTO VALUES

* Asynchronous Mode

```sql
# Set the session variable to enable group commit in asynchronous mode (the default is off_mode).
mysql> set group_commit = async_mode;

# The returned label is prefixed with group_commit, which indicates that group commit was used.
mysql> insert into dt values(1, 'Bob', 90), (2, 'Alice', 99);
Query OK, 2 rows affected (0.05 sec)
{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}

# The label and txn_id are the same as above, which means the two inserts were merged into the same load job.
mysql> insert into dt(id, name) values(3, 'John');
Query OK, 1 row affected (0.01 sec)
{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}

# The data is not yet visible.
mysql> select * from dt;
Empty set (0.01 sec)

# After about 10 seconds the data becomes visible; the visibility delay is controlled by the table property group_commit_interval_ms.
mysql> select * from dt;
+------+-------+-------+
| id   | name  | score |
+------+-------+-------+
|    1 | Bob   |    90 |
|    2 | Alice |    99 |
|    3 | John  |  NULL |
+------+-------+-------+
3 rows in set (0.02 sec)
```

* Synchronous Mode

```sql
# Set the session variable to enable group commit in synchronous mode (the default is off_mode).
mysql> set group_commit = sync_mode;

# The returned label is prefixed with group_commit, which indicates that group commit was used.
# The insert takes at least as long as the table property group_commit_interval_ms.
mysql> insert into dt values(4, 'Bob', 90), (5, 'Alice', 99);
Query OK, 2 rows affected (10.06 sec)
{'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'}

# The data is visible as soon as the insert returns.
mysql> select * from dt;
+------+-------+-------+
| id   | name  | score |
+------+-------+-------+
|    1 | Bob   |    90 |
|    2 | Alice |    99 |
|    3 | John  |  NULL |
|    4 | Bob   |    90 |
|    5 | Alice |    99 |
+------+-------+-------+
5 rows in set (0.03 sec)
```

* Off Mode

```sql
mysql> set group_commit = off_mode;
```

### Stream Load

Assuming `data.csv` contains:

```
6,Amy,60
7,Ross,98
```

* Asynchronous Mode

```shell
# Add the header "group_commit:async_mode" to the Stream Load request
curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load
{
    ...
    "WriteDataTimeMs": 26
}

# "GroupCommit": true in the response indicates that the load went through the group commit path
# The returned Label is prefixed with group_commit; it is the label of the load job that actually carries the data
```

* Synchronous Mode

```shell
# Add the header "group_commit:sync_mode" to the Stream Load request
curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load
{
    ...
    "WriteDataTimeMs": 10038
}

# "GroupCommit": true in the response indicates that the load went through the group commit path
# The returned Label is prefixed with group_commit; it is the label of the load job that actually carries the data
# The request takes about 10 seconds because, in synchronous mode, it waits for the group commit transaction to finish
```

For more details on Stream Load syntax and usage, see [Stream Load](./import-way/stream-load-manual).
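Whether group commit applies to `INSERT INTO VALUES` is controlled by the `group_commit` session variable shown above. The snippet below is a small sketch of how to check and switch the mode of the current session; it assumes only the standard MySQL-compatible `SHOW VARIABLES` syntax.

```sql
# Check which group commit mode the current session is using
SHOW VARIABLES LIKE 'group_commit';

# Switch the session back to asynchronous mode for subsequent INSERT statements
SET group_commit = async_mode;
```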
## Group Commit Conditions

Data is automatically committed when either the time interval (default: 10 seconds) or the data volume (default: 64 MB) condition is met. The two parameters work together and should be tuned for the actual workload.

### Modifying the Commit Interval

The default commit interval is 10 seconds; it can be adjusted per table:

```sql
# Change the commit interval to 2 seconds
ALTER TABLE dt SET ("group_commit_interval_ms" = "2000");
```

**Parameter Adjustment Recommendations**:

- Shorter interval (e.g., 2 seconds):
  - Pros: lower data visibility latency, suitable for scenarios with strict freshness requirements
  - Cons: more commits and faster version growth, which increases background compaction pressure

- Longer interval (e.g., 30 seconds):
  - Pros: larger commit batches, slower version growth, and lower system overhead
  - Cons: higher data visibility latency

Choose the interval based on how much visibility delay the business can tolerate; if the system is under pressure, consider increasing it.

### Modifying the Commit Data Volume

The default commit data volume is 64 MB; it can be adjusted per table:

```sql
# Change the commit data volume to 128 MB
ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728");
```

**Parameter Adjustment Recommendations**:

- Smaller threshold (e.g., 32 MB):
  - Pros: lower memory usage, suitable for resource-constrained environments
  - Cons: smaller commit batches, which may limit throughput

- Larger threshold (e.g., 256 MB):
  - Pros: more efficient batch commits and higher system throughput
  - Cons: higher memory usage

Balance the threshold against available memory and data reliability requirements; if memory is sufficient and higher throughput is desired, consider raising it to 128 MB or more.
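For example, a log-ingestion table that can tolerate roughly 30 seconds of visibility delay and has memory to spare could raise both thresholds together, using the same `ALTER TABLE` statements shown above; the values below are illustrative only.

```sql
# Illustrative tuning for a latency-tolerant, high-throughput table:
# commit every 30 seconds or every 256 MB, whichever is reached first.
ALTER TABLE dt SET ("group_commit_interval_ms" = "30000");
ALTER TABLE dt SET ("group_commit_data_bytes" = "268435456");
```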
### BE Configuration

1. `group_commit_wal_path`

   * Description: the directories where group commit WAL files are stored.

   * Default: a `wal` directory is created under each configured `storage_root_path`. Example configuration:

     ```
     group_commit_wal_path=/data1/storage/wal;/data2/storage/wal;/data3/storage/wal
     ```

2. `group_commit_memory_rows_for_max_filter_ratio`

   * Description: `max_filter_ratio` works as usual when the total number of rows in a group commit load does not exceed this value; otherwise it does not take effect.

   * Default: 10000

## Usage Limitations

* **Group Commit Limitations**

  * `INSERT INTO VALUES` statements fall back to non-group-commit mode in the following cases:
    - Writes inside a transaction (`BEGIN; INSERT INTO VALUES; COMMIT`)
    - A label is specified (`INSERT INTO dt WITH LABEL {label} VALUES`)
    - The VALUES clause contains expressions (`INSERT INTO dt VALUES (1 + 100)`)
    - Partial column updates
    - The table does not support light schema change

  * Stream Load falls back to non-group-commit mode in the following cases:
    - Two-phase commit is used
    - A label is specified (`-H "label:my_label"`)
    - Partial column updates
    - The table does not support light schema change

* **Unique Key Model**
  - Group commit does not guarantee the commit order; use a sequence column to ensure data consistency.

* **max_filter_ratio Support**
  - In a normal load, `filter_ratio` is calculated from the number of failed rows and the total number of rows.
  - In group commit mode, `max_filter_ratio` takes effect only when the total number of rows in the load does not exceed `group_commit_memory_rows_for_max_filter_ratio`.

* **WAL Limitations**
  - In `async_mode`, data is first written to the WAL; the WAL is deleted after the load succeeds, and is replayed to recover the data if a failure occurs.
  - A WAL file is stored on only one BE; if that node's disk is damaged or the file is lost, the data may be lost.
  - When taking a BE node offline, use the `DECOMMISSION` command to prevent data loss.
  - `async_mode` switches to `sync_mode` in the following cases:
    - The load data volume is too large (more than 80% of a single WAL directory's space)
    - A chunked Stream Load whose data volume is unknown
    - Insufficient disk space
  - During a heavyweight schema change, group commit writes are rejected and the client needs to retry.

## Performance

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org