This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new 605257ccb77 [Enhancement](group commit) Add regression case for wal limit (#27949)
605257ccb77 is described below

commit 605257ccb771d468c74d407369b572dbe2a04214
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Wed Dec 6 14:23:50 2023 +0800

    [Enhancement](group commit) Add regression case for wal limit (#27949)
---
 be/src/http/action/http_stream.cpp                 |  14 +-
 be/src/http/action/stream_load.cpp                 |  13 +-
 be/src/http/utils.cpp                              |   4 +-
 .../stream_load/test_group_commit_wal_limit.csv.gz | Bin 0 -> 32 bytes
 .../stream_load/test_group_commit_wal_limit.groovy | 148 +++++++++++++++++++++
 5 files changed, 172 insertions(+), 7 deletions(-)

diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index e99e6bcde9f..1026433bed4 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -167,17 +167,27 @@ int HttpStreamAction::on_header(HttpRequest* req) {
     ctx->load_type = TLoadType::MANUL_LOAD;
     ctx->load_src_type = TLoadSourceType::RAW;
 
+    Status st = Status::OK();
     if (iequal(req->header(HTTP_GROUP_COMMIT), "true") ||
         config::wait_internal_group_commit_finish) {
         ctx->group_commit = load_size_smaller_than_wal_limit(req);
+        if (!ctx->group_commit) {
+            LOG(WARNING) << "The data size for this http load("
+                         << std::stol(req->header(HttpHeaders::CONTENT_LENGTH))
+                         << " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
+                         << config::wal_max_disk_size * 0.8
+                         << " Bytes). Please set this load to \"group commit\"=false.";
+            st = Status::InternalError("Http load size too large.");
+        }
     }
     ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
 
     LOG(INFO) << "new income streaming load request." << ctx->brief()
               << " sql : " << req->header(HTTP_SQL);
-
-    auto st = _on_header(req, ctx);
+    if (st.ok()) {
+        st = _on_header(req, ctx);
+    }
     if (!st.ok()) {
         ctx->status = std::move(st);
         if (ctx->body_sink != nullptr) {
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index d93332b6dff..cac484a3cf7 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -194,11 +194,18 @@ int StreamLoadAction::on_header(HttpRequest* req) {
             st = Status::InternalError("label and group_commit can't be set at the same time");
         }
         ctx->group_commit = load_size_smaller_than_wal_limit(req);
-    } else {
-        if (ctx->label.empty()) {
-            ctx->label = generate_uuid_string();
+        if (!ctx->group_commit) {
+            LOG(WARNING) << "The data size for this stream load("
+                         << std::stol(req->header(HttpHeaders::CONTENT_LENGTH))
+                         << " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
+                         << config::wal_max_disk_size * 0.8
+                         << " Bytes). Please set this load to \"group commit\"=false.";
+            st = Status::InternalError("Stream load size too large.");
         }
     }
+    if (!ctx->group_commit && ctx->label.empty()) {
+        ctx->label = generate_uuid_string();
+    }
 
     ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
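
Both handler diffs above share the same shape: when group commit is requested, the WAL size
check runs during header parsing; if it fails, the handler logs a warning, records an
InternalError status, and never reaches _on_header(). A minimal self-contained sketch of that
control flow, with simplified stand-ins for the Doris Status and load-context types (not the
actual classes):

#include <iostream>
#include <string>

// Simplified stand-ins for doris::Status and StreamLoadContext.
struct Status {
    std::string msg;                       // empty == OK
    bool ok() const { return msg.empty(); }
};

struct LoadContext {
    bool group_commit = false;
    std::string label;
};

// size_check_passed models the result of load_size_smaller_than_wal_limit(req).
Status on_header(bool group_commit_requested, bool size_check_passed, LoadContext& ctx) {
    Status st;
    if (group_commit_requested) {
        ctx.group_commit = size_check_passed;
        if (!ctx.group_commit) {
            st.msg = "load size too large.";  // surfaces as [INTERNAL_ERROR] in the HTTP reply
        }
    }
    if (!ctx.group_commit && ctx.label.empty()) {
        ctx.label = "generated-uuid";  // stream_load.cpp only: labels are auto-generated outside group commit
    }
    if (st.ok()) {
        // the real handlers call _on_header(req, ctx) here; it is skipped when the check failed
    }
    return st;
}

int main() {
    LoadContext ctx;
    std::cout << on_header(true, false, ctx).msg << std::endl;  // prints "load size too large."
}
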
diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp
index bfef46036b7..0d66887663f 100644
--- a/be/src/http/utils.cpp
+++ b/be/src/http/utils.cpp
@@ -191,7 +191,7 @@ void do_dir_response(const std::string& dir_path, HttpRequest* req) {
 }
 
 bool load_size_smaller_than_wal_limit(HttpRequest* req) {
-    // 1. req->header(HttpHeaders::CONTENT_LENGTH) will return streamload content length. If it is empty, it means this streamload
+    // 1. req->header(HttpHeaders::CONTENT_LENGTH) will return streamload content length. If it is empty or equal to 0, it means this streamload
     // is a chunked streamload and we are not sure its size.
     // 2. if streamload content length is too large, like larger than 80% of the WAL constrain.
     //
@@ -200,7 +200,7 @@ bool load_size_smaller_than_wal_limit(HttpRequest* req) {
     if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
         size_t body_bytes = std::stol(req->header(HttpHeaders::CONTENT_LENGTH));
         // TODO(Yukang): change it to WalManager::wal_limit
-        return !(body_bytes > config::wal_max_disk_size * 0.8);
+        return (body_bytes <= config::wal_max_disk_size * 0.8) && (body_bytes != 0);
     } else {
         return false;
     }
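
The comment above states the policy; the following standalone sketch shows the same check in
isolation (not the Doris implementation: wal_max_disk_size is a placeholder constant here, and
the real code reads Content-Length via req->header() and stores it in a size_t):

#include <cassert>
#include <cstdint>
#include <string>

// Placeholder WAL budget; Doris takes this from config::wal_max_disk_size.
constexpr int64_t wal_max_disk_size = 10LL * 1024 * 1024 * 1024;  // assume 10 GiB

// Group commit is only allowed when a positive Content-Length is declared and it
// stays within 80% of the WAL budget; an empty or zero value means a chunked load
// of unknown size, which cannot be bounded and is therefore rejected.
bool load_size_smaller_than_wal_limit(const std::string& content_length) {
    if (content_length.empty()) {
        return false;
    }
    const int64_t body_bytes = std::stoll(content_length);
    return body_bytes > 0 && body_bytes <= wal_max_disk_size * 0.8;
}

int main() {
    assert(!load_size_smaller_than_wal_limit(""));               // chunked: rejected
    assert(!load_size_smaller_than_wal_limit("0"));              // zero length: rejected
    assert(load_size_smaller_than_wal_limit("32"));              // small gzip payload: accepted
    assert(!load_size_smaller_than_wal_limit("1099511627776"));  // the test's 1 TB case: rejected
    return 0;
}

The regression test below drives exactly these boundaries through curl by overriding the
Content-Length header on an otherwise tiny gzip payload.
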
diff --git a/regression-test/data/load_p0/stream_load/test_group_commit_wal_limit.csv.gz b/regression-test/data/load_p0/stream_load/test_group_commit_wal_limit.csv.gz
new file mode 100644
index 00000000000..64a354ca764
Binary files /dev/null and b/regression-test/data/load_p0/stream_load/test_group_commit_wal_limit.csv.gz differ
diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_wal_limit.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_wal_limit.groovy
new file mode 100644
index 00000000000..68ccc3f8538
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_group_commit_wal_limit.groovy
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.nio.file.Paths
+
+suite("test_group_commit_wal_limit") {
+    def db = "regression_test_load_p0_stream_load"
+    def tableName = "test_group_commit_wal_limit"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE ${tableName} (
+            k bigint,
+            v string
+        )
+        UNIQUE KEY(k)
+        DISTRIBUTED BY HASH (k) BUCKETS 32
+        PROPERTIES(
+            "replication_num" = "1"
+        );
+    """
+    // streamload
+    // normal case
+    StringBuilder strBuilder = new StringBuilder()
+    strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
+    strBuilder.append(" -H \"group_commit:true\" -H \"column_separator:,\" ")
+    strBuilder.append(" -H \"compress_type:gz\" -H \"format:csv\" ")
+    strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
+    strBuilder.append(" http://" + context.config.feHttpAddress + "/api/${db}/${tableName}/_stream_load")
+
+    String command = strBuilder.toString()
+    logger.info("command is " + command)
+    def process = ['bash','-c',command].execute()
+    def code = process.waitFor()
+    assertEquals(code, 0)
+    def out = process.text
+    logger.info("out is " + out)
+    assertTrue(out.contains('group_commit'))
+
+    // chunked data case
+    strBuilder = new StringBuilder()
+    strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
+    strBuilder.append(" -H \"group_commit:true\" -H \"column_separator:,\" ")
+    strBuilder.append(" -H \"compress_type:gz\" -H \"format:csv\" ")
+    strBuilder.append(" -H \"Content-Length:0\" ")
+    strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
+    strBuilder.append(" http://" + context.config.feHttpAddress + "/api/${db}/${tableName}/_stream_load")
+
+    command = strBuilder.toString()
+    logger.info("command is " + command)
+    process = ['bash','-c',command].execute()
+    code = process.waitFor()
+    assertEquals(code, 0)
+    out = process.text
+    logger.info("out is " + out)
+    assertTrue(out.contains('[INTERNAL_ERROR]Stream load size too large'))
+
+    // too large data case 1TB
+    strBuilder = new StringBuilder()
+    strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
+    strBuilder.append(" -H \"group_commit:true\" -H \"column_separator:,\" ")
+    strBuilder.append(" -H \"compress_type:gz\" -H \"format:csv\" ")
+    strBuilder.append(" -H \"Content-Length:1099511627776\" ")
+    strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
+    strBuilder.append(" http://" + context.config.feHttpAddress + "/api/${db}/${tableName}/_stream_load")
+
+    command = strBuilder.toString()
+    logger.info("command is " + command)
+    process = ['bash','-c',command].execute()
+    code = process.waitFor()
+    assertEquals(code, 0)
+    out = process.text
+    logger.info("out is " + out)
+    assertTrue(out.contains('[INTERNAL_ERROR]Stream load size too large'))
+
+    // httpload
+    // normal case
+    strBuilder = new StringBuilder()
+    strBuilder.append("curl -v --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
+    String sql = " -H \"sql:insert into " + db + "." + tableName + " (k,v) select c1, c2 from http_stream(\\\"format\\\" = \\\"csv\\\", \\\"column_separator\\\" = \\\",\\\", \\\"compress_type\\\" = \\\"gz\\\" ) \" "
+    strBuilder.append(sql)
+    strBuilder.append(" -H \"group_commit:true\"")
+    strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
+    strBuilder.append(" http://" + context.config.feHttpAddress + "/api/_http_stream")
+
+    command = strBuilder.toString()
+    logger.info("command is " + command)
+    process = ['bash','-c',command].execute()
+    code = process.waitFor()
+    assertEquals(code, 0)
+    out = process.text
+    logger.info("out is " + out)
+    assertTrue(out.contains('group_commit'))
+
+    // chunked data case
+    strBuilder = new StringBuilder()
+    strBuilder.append("curl -v --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
+    sql = " -H \"sql:insert into " + db + "." + tableName + " (k,v) select c1, c2 from http_stream(\\\"format\\\" = \\\"csv\\\", \\\"column_separator\\\" = \\\",\\\", \\\"compress_type\\\" = \\\"gz\\\" ) \" "
+    strBuilder.append(sql)
+    strBuilder.append(" -H \"group_commit:true\" -H \"Content-Length:0\"")
+    strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
+    strBuilder.append(" http://" + context.config.feHttpAddress + "/api/_http_stream")
+
+    command = strBuilder.toString()
+    logger.info("command is " + command)
+    process = ['bash','-c',command].execute()
+    code = process.waitFor()
+    assertEquals(code, 0)
+    out = process.text
+    logger.info("out is " + out)
+    assertTrue(out.contains('[INTERNAL_ERROR]Http load size too large'))
+
+    // too large data case 1TB
+    strBuilder = new StringBuilder()
+    strBuilder.append("curl -v --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
+    sql = " -H \"sql:insert into " + db + "." + tableName + " (k,v) select c1, c2 from http_stream(\\\"format\\\" = \\\"csv\\\", \\\"column_separator\\\" = \\\",\\\", \\\"compress_type\\\" = \\\"gz\\\" ) \" "
+    strBuilder.append(sql)
+    strBuilder.append(" -H \"group_commit:true\" -H \"Content-Length:1099511627776\"")
+    strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
+    strBuilder.append(" http://" + context.config.feHttpAddress + "/api/_http_stream")
+
+    command = strBuilder.toString()
+    logger.info("command is " + command)
+    process = ['bash','-c',command].execute()
+    code = process.waitFor()
+    assertEquals(code, 0)
+    out = process.text
+    logger.info("out is " + out)
+    assertTrue(out.contains('[INTERNAL_ERROR]Http load size too large'))
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org