This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 605257ccb77 [Enhancement](group commit) Add regression case for wal limit (#27949)
605257ccb77 is described below

commit 605257ccb771d468c74d407369b572dbe2a04214
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Wed Dec 6 14:23:50 2023 +0800

    [Enhancement](group commit) Add regression case for wal limit (#27949)
---
 be/src/http/action/http_stream.cpp                 |  14 +-
 be/src/http/action/stream_load.cpp                 |  13 +-
 be/src/http/utils.cpp                              |   4 +-
 .../stream_load/test_group_commit_wal_limit.csv.gz | Bin 0 -> 32 bytes
 .../stream_load/test_group_commit_wal_limit.groovy | 148 +++++++++++++++++++++
 5 files changed, 172 insertions(+), 7 deletions(-)
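
For readers skimming the diff: the new gate reduces to "accept group commit only when the declared Content-Length is known, non-zero, and at most 80% of the WAL disk budget". A minimal standalone C++ sketch of that rule (illustrative only; wal_max_disk_size stands in for config::wal_max_disk_size, and its value here is a placeholder):

    #include <cstdint>
    #include <string>

    // Stand-in for config::wal_max_disk_size; the value is a placeholder.
    static const int64_t wal_max_disk_size = 10LL * 1024 * 1024 * 1024;

    // Mirrors the intent of load_size_smaller_than_wal_limit():
    // - empty header -> chunked load, size unknown, reject group commit
    // - zero length  -> treated the same as unknown (new in this patch)
    // - more than 80% of the WAL budget -> reject group commit
    bool load_fits_wal_limit(const std::string& content_length_header) {
        if (content_length_header.empty()) {
            return false;
        }
        int64_t body_bytes = std::stol(content_length_header);
        return body_bytes != 0 && body_bytes <= wal_max_disk_size * 0.8;
    }

When the check fails for a load that asked for group commit, the hunks below now return an InternalError instead of silently falling back to a normal load.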

diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index e99e6bcde9f..1026433bed4 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -167,17 +167,27 @@ int HttpStreamAction::on_header(HttpRequest* req) {
     ctx->load_type = TLoadType::MANUL_LOAD;
     ctx->load_src_type = TLoadSourceType::RAW;
 
+    Status st = Status::OK();
     if (iequal(req->header(HTTP_GROUP_COMMIT), "true") ||
         config::wait_internal_group_commit_finish) {
         ctx->group_commit = load_size_smaller_than_wal_limit(req);
+        if (!ctx->group_commit) {
+            LOG(WARNING) << "The data size for this http load("
+                         << std::stol(req->header(HttpHeaders::CONTENT_LENGTH))
+                         << " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
+                         << config::wal_max_disk_size * 0.8
+                         << " Bytes). Please set this load to \"group commit\"=false.";
+            st = Status::InternalError("Http load size too large.");
+        }
     }
 
     ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
 
     LOG(INFO) << "new income streaming load request." << ctx->brief()
               << " sql : " << req->header(HTTP_SQL);
-
-    auto st = _on_header(req, ctx);
+    if (st.ok()) {
+        st = _on_header(req, ctx);
+    }
     if (!st.ok()) {
         ctx->status = std::move(st);
         if (ctx->body_sink != nullptr) {
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index d93332b6dff..cac484a3cf7 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -194,11 +194,18 @@ int StreamLoadAction::on_header(HttpRequest* req) {
             st = Status::InternalError("label and group_commit can't be set at the same time");
         }
         ctx->group_commit = load_size_smaller_than_wal_limit(req);
-    } else {
-        if (ctx->label.empty()) {
-            ctx->label = generate_uuid_string();
+        if (!ctx->group_commit) {
+            LOG(WARNING) << "The data size for this stream load("
+                         << std::stol(req->header(HttpHeaders::CONTENT_LENGTH))
+                         << " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
+                         << config::wal_max_disk_size * 0.8
+                         << " Bytes). Please set this load to \"group commit\"=false.";
+            st = Status::InternalError("Stream load size too large.");
         }
     }
+    if (!ctx->group_commit && ctx->label.empty()) {
+        ctx->label = generate_uuid_string();
+    }
 
     ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
 
diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp
index bfef46036b7..0d66887663f 100644
--- a/be/src/http/utils.cpp
+++ b/be/src/http/utils.cpp
@@ -191,7 +191,7 @@ void do_dir_response(const std::string& dir_path, HttpRequest* req) {
 }
 
 bool load_size_smaller_than_wal_limit(HttpRequest* req) {
-    // 1. req->header(HttpHeaders::CONTENT_LENGTH) will return streamload content length. If it is empty, it means this streamload
+    // 1. req->header(HttpHeaders::CONTENT_LENGTH) will return streamload content length. If it is empty or equal to 0, it means this streamload
     // is a chunked streamload and we are not sure its size.
     // 2. if streamload content length is too large, like larger than 80% of the WAL constrain.
     //
@@ -200,7 +200,7 @@ bool load_size_smaller_than_wal_limit(HttpRequest* req) {
     if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
         size_t body_bytes = std::stol(req->header(HttpHeaders::CONTENT_LENGTH));
         // TODO(Yukang): change it to WalManager::wal_limit
-        return !(body_bytes > config::wal_max_disk_size * 0.8);
+        return (body_bytes <= config::wal_max_disk_size * 0.8) && (body_bytes != 0);
     } else {
         return false;
     }
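
Why the predicate rewrite matters: the old expression !(body_bytes > limit) evaluates to true for body_bytes == 0, so a request that explicitly declares "Content-Length: 0" (the chunked cases exercised by the new test below) would have passed the gate. A few boundary checks against both forms (illustrative C++; the limit value is hypothetical):

    #include <cassert>
    #include <cstdint>

    int main() {
        const int64_t limit = 1024; // stand-in for wal_max_disk_size * 0.8
        auto old_check = [&](int64_t b) { return !(b > limit); };
        auto new_check = [&](int64_t b) { return b <= limit && b != 0; };

        assert(old_check(0));                // old bug: zero-length load accepted
        assert(!new_check(0));               // fixed: rejected, error path taken
        assert(new_check(1024));             // exactly at the limit: accepted
        assert(!new_check(1099511627776LL)); // the 1 TB test value: rejected
        return 0;
    }
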
diff --git a/regression-test/data/load_p0/stream_load/test_group_commit_wal_limit.csv.gz b/regression-test/data/load_p0/stream_load/test_group_commit_wal_limit.csv.gz
new file mode 100644
index 00000000000..64a354ca764
Binary files /dev/null and b/regression-test/data/load_p0/stream_load/test_group_commit_wal_limit.csv.gz differ
diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_wal_limit.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_wal_limit.groovy
new file mode 100644
index 00000000000..68ccc3f8538
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_group_commit_wal_limit.groovy
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.nio.file.Paths
+
+suite("test_group_commit_wal_limit") {
+    def db = "regression_test_load_p0_stream_load"
+    def tableName = "test_group_commit_wal_limit"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+                k bigint,  
+                v string
+                )  
+                UNIQUE KEY(k)  
+                DISTRIBUTED BY HASH (k) BUCKETS 32  
+                PROPERTIES(  
+                "replication_num" = "1"
+            );
+    """
+    // streamload
+    // normal case
+    StringBuilder strBuilder = new StringBuilder()
+    strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
+    strBuilder.append(" -H \"group_commit:true\" -H \"column_separator:,\" " )
+    strBuilder.append(" -H \"compress_type:gz\" -H \"format:csv\" " )
+    strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
+    strBuilder.append(" http://" + context.config.feHttpAddress + "/api/${db}/${tableName}/_stream_load")
+
+    String command = strBuilder.toString()
+    logger.info("command is " + command)
+    def process = ['bash','-c',command].execute() 
+    def code = process.waitFor()
+    assertEquals(code, 0)
+    def out = process.text
+    logger.info("out is " + out )
+    assertTrue(out.contains('group_commit'))
+
+    // chunked data case
+    strBuilder = new StringBuilder()
+    strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
+    strBuilder.append(" -H \"group_commit:true\" -H \"column_separator:,\" " )
+    strBuilder.append(" -H \"compress_type:gz\" -H \"format:csv\" " )
+    strBuilder.append(" -H \"Content-Length:0\" " )
+    strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
+    strBuilder.append(" http://" + context.config.feHttpAddress + "/api/${db}/${tableName}/_stream_load")
+
+    command = strBuilder.toString()
+    logger.info("command is " + command)
+    process = ['bash','-c',command].execute() 
+    code = process.waitFor()
+    assertEquals(code, 0)
+    out = process.text
+    logger.info("out is " + out )
+    assertTrue(out.contains('[INTERNAL_ERROR]Stream load size too large'))
+
+    // too large data case 1TB
+    strBuilder = new StringBuilder()
+    strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
+    strBuilder.append(" -H \"group_commit:true\" -H \"column_separator:,\" " )
+    strBuilder.append(" -H \"compress_type:gz\" -H \"format:csv\" " )
+    strBuilder.append(" -H \"Content-Length:1099511627776\" " )
+    strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
+    strBuilder.append(" http://" + context.config.feHttpAddress + "/api/${db}/${tableName}/_stream_load")
+
+    command = strBuilder.toString()
+    logger.info("command is " + command)
+    process = ['bash','-c',command].execute() 
+    code = process.waitFor()
+    assertEquals(code, 0)
+    out = process.text
+    logger.info("out is " + out )
+    assertTrue(out.contains('[INTERNAL_ERROR]Stream load size too large'))
+
+    // httpload 
+    // normal case
+    strBuilder = new StringBuilder()
+    strBuilder.append("curl -v --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
+    String sql = " -H \"sql:insert into " + db + "." + tableName + " (k,v) select c1, c2 from http_stream(\\\"format\\\" = \\\"csv\\\", \\\"column_separator\\\" = \\\",\\\", \\\"compress_type\\\" = \\\"gz\\\" ) \" "
+    strBuilder.append(sql)
+    strBuilder.append(" -H \"group_commit:true\"") 
+    strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
+    strBuilder.append(" http://" + context.config.feHttpAddress + "/api/_http_stream")
+
+    command = strBuilder.toString()
+    logger.info("command is " + command)
+    process = ['bash','-c',command].execute() 
+    code = process.waitFor()
+    assertEquals(code, 0)
+    out = process.text
+    logger.info("out is " + out )
+    assertTrue(out.contains('group_commit'))
+
+    // chunked data case
+    strBuilder = new StringBuilder()
+    strBuilder.append("curl -v --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
+    sql = " -H \"sql:insert into " + db + "." + tableName + " (k,v) select c1, c2 from http_stream(\\\"format\\\" = \\\"csv\\\", \\\"column_separator\\\" = \\\",\\\", \\\"compress_type\\\" = \\\"gz\\\" ) \" "
+    strBuilder.append(sql)
+    strBuilder.append(" -H \"group_commit:true\" -H \"Content-Length:0\"") 
+    strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
+    strBuilder.append(" http://" + context.config.feHttpAddress + "/api/_http_stream")
+
+    command = strBuilder.toString()
+    logger.info("command is " + command)
+    process = ['bash','-c',command].execute() 
+    code = process.waitFor()
+    assertEquals(code, 0)
+    out = process.text
+    logger.info("out is " + out )
+    assertTrue(out.contains('[INTERNAL_ERROR]Http load size too large'))
+
+    // too large data case 1TB
+    strBuilder = new StringBuilder()
+    strBuilder.append("curl -v --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
+    sql = " -H \"sql:insert into " + db + "." + tableName + " (k,v) select c1, c2 from http_stream(\\\"format\\\" = \\\"csv\\\", \\\"column_separator\\\" = \\\",\\\", \\\"compress_type\\\" = \\\"gz\\\" ) \" "
+    strBuilder.append(sql)
+    strBuilder.append(" -H \"group_commit:true\" -H \"Content-Length:1099511627776\"")
+    strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
+    strBuilder.append(" http://" + context.config.feHttpAddress + "/api/_http_stream")
+
+    command = strBuilder.toString()
+    logger.info("command is " + command)
+    process = ['bash','-c',command].execute() 
+    code = process.waitFor()
+    assertEquals(code, 0)
+    out = process.text
+    logger.info("out is " + out )
+    assertTrue(out.contains('[INTERNAL_ERROR]Http load size too large'))
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
