This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 55d1090137 [feature](insert) Support group commit stream load (#24304)
55d1090137 is described below

commit 55d1090137179692e9b73be4407821e60e5f6891
Author: meiyi <[email protected]>
AuthorDate: Tue Sep 26 20:57:02 2023 +0800

    [feature](insert) Support group commit stream load (#24304)
---
 be/src/http/action/http_stream.cpp                 |  15 +
 be/src/http/action/stream_load.cpp                 |  38 ++-
 be/src/http/http_common.h                          |   1 +
 be/src/runtime/group_commit_mgr.cpp                | 128 +++++++-
 be/src/runtime/group_commit_mgr.h                  |   6 +
 be/src/runtime/stream_load/stream_load_context.cpp |  23 +-
 be/src/runtime/stream_load/stream_load_context.h   |   3 +
 be/src/vec/sink/group_commit_block_sink.cpp        | 105 +++++++
 be/src/vec/sink/group_commit_block_sink.h          |  59 ++++
 .../apache/doris/analysis/NativeInsertStmt.java    |  13 +-
 .../apache/doris/service/FrontendServiceImpl.java  |  31 +-
 gensrc/thrift/FrontendService.thrift               |   3 +
 .../data/load_p0/http_stream/test_compress.csv.bz2 | Bin 0 -> 65 bytes
 .../data/load_p0/http_stream/test_compress.csv.gz  | Bin 0 -> 67 bytes
 .../data/load_p0/http_stream/test_compress.csv.lz4 | Bin 0 -> 48 bytes
 .../http_stream/test_group_commit_http_stream.out  |  10 +
 .../data/load_p0/http_stream/test_stream_load1.csv |   2 +
 .../data/load_p0/http_stream/test_stream_load2.csv |   2 +
 .../data/load_p0/http_stream/test_stream_load3.csv |   6 +
 .../data/load_p0/stream_load/test_compress.csv.bz2 | Bin 0 -> 65 bytes
 .../data/load_p0/stream_load/test_compress.csv.gz  | Bin 0 -> 67 bytes
 .../data/load_p0/stream_load/test_compress.csv.lz4 | Bin 0 -> 48 bytes
 .../stream_load/test_group_commit_stream_load.out  |  27 ++
 .../data/load_p0/stream_load/test_stream_load1.csv |   2 +
 .../data/load_p0/stream_load/test_stream_load2.csv |   2 +
 .../data/load_p0/stream_load/test_stream_load3.csv |   6 +
 .../regression/action/StreamLoadAction.groovy      |   4 +
 .../test_group_commit_http_stream.groovy           | 328 +++++++++++++++++++++
 .../test_group_commit_stream_load.groovy           | 310 +++++++++++++++++++
 29 files changed, 1093 insertions(+), 31 deletions(-)
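
For illustration, the new path can be exercised with an ordinary stream load request plus the group_commit header added below; a minimal sketch, where host, port, credentials, database, and table are placeholders (note the label header must be left unset, since label and group_commit cannot be set at the same time):

    curl --location-trusted -u user:passwd \
        -H "group_commit:true" \
        -H "column_separator:," \
        -T data.csv \
        http://fe_host:8030/api/db1/tbl1/_stream_load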

diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index 11e69615d2..62153c8f8f 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -49,6 +49,7 @@
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
+#include "runtime/group_commit_mgr.h"
 #include "runtime/load_path_mgr.h"
 #include "runtime/plan_fragment_executor.h"
 #include "runtime/stream_load/new_load_stream_mgr.h"
@@ -139,6 +140,11 @@ Status HttpStreamAction::_handle(HttpRequest* http_req, std::shared_ptr<StreamLo
     // wait stream load finish
     RETURN_IF_ERROR(ctx->future.get());
 
+    if (ctx->group_commit) {
+        LOG(INFO) << "skip commit because this is group commit, pipe_id=" << 
ctx->id.to_string();
+        return Status::OK();
+    }
+
     int64_t commit_and_publish_start_time = MonotonicNanos();
     RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
     ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time;
@@ -158,6 +164,7 @@ int HttpStreamAction::on_header(HttpRequest* req) {
     if (ctx->label.empty()) {
         ctx->label = generate_uuid_string();
     }
+    ctx->group_commit = iequal(req->header(HTTP_GROUP_COMMIT), "true");
 
     LOG(INFO) << "new income streaming load request." << ctx->brief()
               << " sql : " << req->header(HTTP_SQL);
@@ -284,6 +291,7 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
     request.__set_load_sql(http_req->header(HTTP_SQL));
     request.__set_loadId(ctx->id.to_thrift());
     request.__set_label(ctx->label);
+    request.__set_group_commit(ctx->group_commit);
     if (_exec_env->master_info()->__isset.backend_id) {
         request.__set_backend_id(_exec_env->master_info()->backend_id);
     } else {
@@ -308,6 +316,13 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
     ctx->table = ctx->put_result.params.table_name;
     ctx->txn_id = ctx->put_result.params.txn_conf.txn_id;
     ctx->put_result.params.__set_wal_id(ctx->wal_id);
+
+    if (ctx->group_commit) {
+        ctx->db_id = ctx->put_result.db_id;
+        ctx->table_id = ctx->put_result.table_id;
+        ctx->schema_version = ctx->put_result.base_schema_version;
+        return _exec_env->group_commit_mgr()->group_commit_stream_load(ctx);
+    }
     return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
 }
 
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index b4d53b74c4..be4d870323 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -52,6 +52,7 @@
 #include "olap/storage_engine.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
+#include "runtime/group_commit_mgr.h"
 #include "runtime/load_path_mgr.h"
 #include "runtime/message_body_sink.h"
 #include "runtime/stream_load/new_load_stream_mgr.h"
@@ -153,6 +154,11 @@ Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
     // wait stream load finish
     RETURN_IF_ERROR(ctx->future.get());
 
+    if (ctx->group_commit) {
+        LOG(INFO) << "skip commit because this is group commit, pipe_id=" << 
ctx->id.to_string();
+        return Status::OK();
+    }
+
     if (ctx->two_phase_commit) {
         int64_t pre_commit_start_time = MonotonicNanos();
         RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
@@ -178,8 +184,16 @@ int StreamLoadAction::on_header(HttpRequest* req) {
     url_decode(req->param(HTTP_DB_KEY), &ctx->db);
     url_decode(req->param(HTTP_TABLE_KEY), &ctx->table);
     ctx->label = req->header(HTTP_LABEL_KEY);
-    if (ctx->label.empty()) {
-        ctx->label = generate_uuid_string();
+    Status st = Status::OK();
+    if (iequal(req->header(HTTP_GROUP_COMMIT), "true")) {
+        if (!ctx->label.empty()) {
+            st = Status::InternalError("label and group_commit can't be set at 
the same time");
+        }
+        ctx->group_commit = true;
+    } else {
+        if (ctx->label.empty()) {
+            ctx->label = generate_uuid_string();
+        }
     }
 
     ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true" ? true : false;
@@ -187,7 +201,9 @@ int StreamLoadAction::on_header(HttpRequest* req) {
     LOG(INFO) << "new income streaming load request." << ctx->brief() << ", 
db=" << ctx->db
               << ", tbl=" << ctx->table;
 
-    auto st = _on_header(req, ctx);
+    if (st.ok()) {
+        st = _on_header(req, ctx);
+    }
     if (!st.ok()) {
         ctx->status = std::move(st);
         if (ctx->need_rollback) {
@@ -287,9 +303,11 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
         ctx->load_comment = http_req->header(HTTP_COMMENT);
     }
     // begin transaction
-    int64_t begin_txn_start_time = MonotonicNanos();
-    RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx.get()));
-    ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time;
+    if (!ctx->group_commit) {
+        int64_t begin_txn_start_time = MonotonicNanos();
+        RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx.get()));
+        ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time;
+    }
 
     // process put file
     return _process_put(http_req, ctx);
@@ -555,6 +573,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
         bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true");
         request.__set_memtable_on_sink_node(value);
     }
+    request.__set_group_commit(ctx->group_commit);
 
 #ifndef BE_TEST
     // plan this load
@@ -582,6 +601,13 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
         return Status::OK();
     }
 
+    if (ctx->group_commit) {
+        ctx->db_id = ctx->put_result.db_id;
+        ctx->table_id = ctx->put_result.table_id;
+        ctx->schema_version = ctx->put_result.base_schema_version;
+        return _exec_env->group_commit_mgr()->group_commit_stream_load(ctx);
+    }
+
     return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
 }
 
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index 8e84140707..5a1550f48f 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -65,5 +65,6 @@ static const std::string HTTP_TXN_OPERATION_KEY = "txn_operation";
 static const std::string HTTP_MEMTABLE_ON_SINKNODE = "memtable_on_sink_node";
 static const std::string HTTP_WAL_ID_KY = "wal_id";
 static const std::string HTTP_AUTH_CODE = "auth_code";
+static const std::string HTTP_GROUP_COMMIT = "group_commit";
 
 } // namespace doris
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 45ccb8d6b4..f039cf34e8 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -35,11 +35,11 @@
 #include "runtime/stream_load/stream_load_context.h"
 #include "util/thrift_rpc_helper.h"
 #include "vec/exec/scan/new_file_scan_node.h"
+#include "vec/sink/group_commit_block_sink.h"
 
 namespace doris {
 
 class TPlan;
-class FragmentExecState;
 
 Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::FutureBlock> block) {
     DCHECK(block->get_schema_version() == schema_version);
@@ -259,7 +259,7 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
                                                    bool prepare_failed, RuntimeState* state) {
     {
         std::lock_guard<doris::Mutex> l(_lock);
-        if (prepare_failed) {
+        if (prepare_failed || !status.ok()) {
             auto it = _load_block_queues.find(instance_id);
             if (it != _load_block_queues.end()) {
                 it->second->cancel(status);
@@ -511,6 +511,130 @@ Status GroupCommitMgr::_append_row(std::shared_ptr<io::StreamLoadPipe> pipe,
     return Status::OK();
 }
 
+Status GroupCommitMgr::group_commit_stream_load(std::shared_ptr<StreamLoadContext> ctx) {
+    return _insert_into_thread_pool->submit_func([ctx, this] {
+        Status st = _group_commit_stream_load(ctx);
+        if (!st.ok()) {
+            ctx->promise.set_value(st);
+        }
+    });
+}
+
+Status GroupCommitMgr::_group_commit_stream_load(std::shared_ptr<StreamLoadContext> ctx) {
+    auto& fragment_params = ctx->put_result.params;
+    auto& tdesc_tbl = fragment_params.desc_tbl;
+    DCHECK(fragment_params.params.per_node_scan_ranges.size() == 1);
+    DCHECK(fragment_params.params.per_node_scan_ranges.begin()->second.size() == 1);
+    auto& tscan_range_params = fragment_params.params.per_node_scan_ranges.begin()->second.at(0);
+    auto& nodes = fragment_params.fragment.plan.nodes;
+    DCHECK(nodes.size() > 0);
+    auto& plan_node = nodes.at(0);
+
+    std::vector<std::shared_ptr<doris::vectorized::FutureBlock>> future_blocks;
+    {
+        std::shared_ptr<LoadBlockQueue> load_block_queue;
+        // 1. FileScanNode consumes data from the pipe.
+        std::unique_ptr<RuntimeState> runtime_state = RuntimeState::create_unique();
+        TUniqueId load_id;
+        load_id.hi = ctx->id.hi;
+        load_id.lo = ctx->id.lo;
+        TQueryOptions query_options;
+        query_options.query_type = TQueryType::LOAD;
+        TQueryGlobals query_globals;
+        runtime_state->init(load_id, query_options, query_globals, _exec_env);
+        runtime_state->set_query_mem_tracker(std::make_shared<MemTrackerLimiter>(
+                MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", ctx->id.to_string()), -1));
+        DescriptorTbl* desc_tbl = nullptr;
+        RETURN_IF_ERROR(DescriptorTbl::create(runtime_state->obj_pool(), tdesc_tbl, &desc_tbl));
+        runtime_state->set_desc_tbl(desc_tbl);
+        auto file_scan_node =
+                vectorized::NewFileScanNode(runtime_state->obj_pool(), plan_node, *desc_tbl);
+        Status status = Status::OK();
+        auto sink = stream_load::GroupCommitBlockSink(
+                runtime_state->obj_pool(), file_scan_node.row_desc(),
+                fragment_params.fragment.output_exprs, &status);
+        std::unique_ptr<int, std::function<void(int*)>> close_scan_node_func((int*)0x01, [&](int*) {
+            if (load_block_queue != nullptr) {
+                load_block_queue->remove_load_id(load_id);
+            }
+            file_scan_node.close(runtime_state.get());
+            sink.close(runtime_state.get(), status);
+        });
+        RETURN_IF_ERROR(file_scan_node.init(plan_node, runtime_state.get()));
+        RETURN_IF_ERROR(file_scan_node.prepare(runtime_state.get()));
+        std::vector<TScanRangeParams> params_vector;
+        params_vector.emplace_back(tscan_range_params);
+        file_scan_node.set_scan_ranges(params_vector);
+        RETURN_IF_ERROR(file_scan_node.open(runtime_state.get()));
+
+        RETURN_IF_ERROR(status);
+        RETURN_IF_ERROR(sink.init(fragment_params.fragment.output_sink));
+        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(sink.prepare(runtime_state.get()));
+        RETURN_IF_ERROR(sink.open(runtime_state.get()));
+
+        // 2. Put the block into block queue.
+        std::unique_ptr<doris::vectorized::Block> _block =
+                doris::vectorized::Block::create_unique();
+        bool first = true;
+        bool eof = false;
+        while (!eof) {
+            // TODO what to do if scan one block error
+            RETURN_IF_ERROR(file_scan_node.get_next(runtime_state.get(), _block.get(), &eof));
+            RETURN_IF_ERROR(sink.send(runtime_state.get(), _block.get()));
+            std::shared_ptr<doris::vectorized::FutureBlock> future_block =
+                    std::make_shared<doris::vectorized::FutureBlock>();
+            future_block->swap(*(_block.get()));
+            future_block->set_info(ctx->schema_version, load_id, first, eof);
+            // TODO what to do if add one block error
+            if (load_block_queue == nullptr) {
+                RETURN_IF_ERROR(_get_first_block_load_queue(ctx->db_id, ctx->table_id, future_block,
+                                                            load_block_queue));
+                ctx->label = load_block_queue->label;
+                ctx->txn_id = load_block_queue->txn_id;
+            }
+            RETURN_IF_ERROR(load_block_queue->add_block(future_block));
+            if (future_block->rows() > 0) {
+                future_blocks.emplace_back(future_block);
+            }
+            first = false;
+        }
+        ctx->number_unselected_rows = runtime_state->num_rows_load_unselected();
+        ctx->number_filtered_rows = runtime_state->num_rows_load_filtered();
+        ctx->error_url = runtime_state->get_error_log_file_path();
+        if (!runtime_state->get_error_log_file_path().empty()) {
+            LOG(INFO) << "id=" << print_id(load_id)
+                      << ", url=" << runtime_state->get_error_log_file_path()
+                      << ", load rows=" << runtime_state->num_rows_load_total()
+                      << ", filter rows=" << 
runtime_state->num_rows_load_filtered()
+                      << ", unselect rows=" << 
runtime_state->num_rows_load_unselected()
+                      << ", success rows=" << 
runtime_state->num_rows_load_success();
+        }
+    }
+
+    int64_t total_rows = 0;
+    int64_t loaded_rows = 0;
+    // 3. wait to wal
+    for (const auto& future_block : future_blocks) {
+        std::unique_lock<doris::Mutex> l(*(future_block->lock));
+        if (!future_block->is_handled()) {
+            future_block->cv->wait(l);
+        }
+        // future_block->get_status()
+        total_rows += future_block->get_total_rows();
+        loaded_rows += future_block->get_loaded_rows();
+    }
+    ctx->number_total_rows = total_rows + ctx->number_unselected_rows + ctx->number_filtered_rows;
+    ctx->number_loaded_rows = loaded_rows;
+    ctx->number_filtered_rows += total_rows - ctx->number_loaded_rows;
+    ctx->promise.set_value(Status::OK());
+    VLOG_DEBUG << "finish read all block of pipe=" << ctx->id.to_string()
+               << ", total rows=" << ctx->number_total_rows
+               << ", loaded rows=" << ctx->number_loaded_rows
+               << ", filtered rows=" << ctx->number_filtered_rows
+               << ", unselected rows=" << ctx->number_unselected_rows;
+    return Status::OK();
+}
+
 Status GroupCommitMgr::_get_first_block_load_queue(
         int64_t db_id, int64_t table_id, std::shared_ptr<vectorized::FutureBlock> block,
         std::shared_ptr<LoadBlockQueue>& load_block_queue) {
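
The close_scan_node_func above is a scope guard built from std::unique_ptr: the (int*)0x01 pointer is a dummy non-null value, and the custom deleter runs the cleanup lambda on every exit path, including each early RETURN_IF_ERROR return. A minimal self-contained sketch of the same idiom (names are illustrative, not from this commit):

    #include <functional>
    #include <memory>

    int main() {
        // Only the deleter matters; it fires when guard leaves scope,
        // on normal and early returns alike.
        std::unique_ptr<int, std::function<void(int*)>> guard(
                (int*)0x01, [](int*) { /* close scan node and sink here */ });
        return 0;
    }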
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index 1d124009d1..01a0905c40 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -35,6 +35,7 @@ class TUniqueId;
 class TExecPlanFragmentParams;
 class ObjectPool;
 class RuntimeState;
+class StreamLoadContext;
 class StreamLoadPipe;
 
 class LoadBlockQueue {
@@ -118,6 +119,9 @@ public:
                                const PGroupCommitInsertRequest* request,
                                PGroupCommitInsertResponse* response);
 
+    // stream load
+    Status group_commit_stream_load(std::shared_ptr<StreamLoadContext> ctx);
+
     // used when init group_commit_scan_node
     Status get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,
                                 std::shared_ptr<LoadBlockQueue>& load_block_queue);
@@ -126,6 +130,8 @@ private:
     // used by insert into
     Status _append_row(std::shared_ptr<io::StreamLoadPipe> pipe,
                        const PGroupCommitInsertRequest* request);
+    // used by stream load
+    Status _group_commit_stream_load(std::shared_ptr<StreamLoadContext> ctx);
     Status _get_first_block_load_queue(int64_t db_id, int64_t table_id,
                                        std::shared_ptr<vectorized::FutureBlock> block,
                                        std::shared_ptr<LoadBlockQueue>& load_block_queue);
diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp
index f381ba097d..0cd1f0e3d5 100644
--- a/be/src/runtime/stream_load/stream_load_context.cpp
+++ b/be/src/runtime/stream_load/stream_load_context.cpp
@@ -50,9 +50,14 @@ std::string StreamLoadContext::to_json() const {
     writer.Key("Comment");
     writer.String(load_comment.c_str());
 
-    writer.Key("TwoPhaseCommit");
-    std::string need_two_phase_commit = two_phase_commit ? "true" : "false";
-    writer.String(need_two_phase_commit.c_str());
+    if (!group_commit) {
+        writer.Key("TwoPhaseCommit");
+        std::string need_two_phase_commit = two_phase_commit ? "true" : "false";
+        writer.String(need_two_phase_commit.c_str());
+    } else {
+        writer.Key("GroupCommit");
+        writer.Bool(true);
+    }
 
     // status
     writer.Key("Status");
@@ -92,16 +97,20 @@ std::string StreamLoadContext::to_json() const {
     writer.Int64(receive_bytes);
     writer.Key("LoadTimeMs");
     writer.Int64(load_cost_millis);
-    writer.Key("BeginTxnTimeMs");
-    writer.Int64(begin_txn_cost_nanos / 1000000);
+    if (!group_commit) {
+        writer.Key("BeginTxnTimeMs");
+        writer.Int64(begin_txn_cost_nanos / 1000000);
+    }
     writer.Key("StreamLoadPutTimeMs");
     writer.Int64(stream_load_put_cost_nanos / 1000000);
     writer.Key("ReadDataTimeMs");
     writer.Int64(read_data_cost_nanos / 1000000);
     writer.Key("WriteDataTimeMs");
     writer.Int(write_data_cost_nanos / 1000000);
-    writer.Key("CommitAndPublishTimeMs");
-    writer.Int64(commit_and_publish_txn_cost_nanos / 1000000);
+    if (!group_commit) {
+        writer.Key("CommitAndPublishTimeMs");
+        writer.Int64(commit_and_publish_txn_cost_nanos / 1000000);
+    }
 
     if (!error_url.empty()) {
         writer.Key("ErrorURL");
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index 844caea126..ab8cc6be04 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -136,6 +136,8 @@ public:
     int64_t db_id = -1;
     int64_t wal_id = -1;
     std::string table;
+    int64_t table_id = -1;
+    int64_t schema_version = -1;
     std::string label;
     // optional
     std::string sub_label;
@@ -174,6 +176,7 @@ public:
     bool use_streaming = false;
     TFileFormatType::type format = TFileFormatType::FORMAT_CSV_PLAIN;
     TFileCompressType::type compress_type = TFileCompressType::UNKNOWN;
+    bool group_commit = false;
 
     std::shared_ptr<MessageBodySink> body_sink;
     std::shared_ptr<io::StreamLoadPipe> pipe;
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp
new file mode 100644
index 0000000000..08f6d87ade
--- /dev/null
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/group_commit_block_sink.h"
+
+#include "runtime/group_commit_mgr.h"
+#include "runtime/runtime_state.h"
+#include "util/doris_metrics.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/sink/vtablet_finder.h"
+#include "vec/sink/vtablet_sink.h"
+
+namespace doris {
+
+namespace stream_load {
+
+GroupCommitBlockSink::GroupCommitBlockSink(ObjectPool* pool, const RowDescriptor& row_desc,
+                                           const std::vector<TExpr>& texprs, Status* status)
+        : DataSink(row_desc) {
+    // From the thrift expressions create the real exprs.
+    *status = vectorized::VExpr::create_expr_trees(texprs, _output_vexpr_ctxs);
+    _name = "GroupCommitBlockSink";
+}
+
+GroupCommitBlockSink::~GroupCommitBlockSink() = default;
+
+Status GroupCommitBlockSink::init(const TDataSink& t_sink) {
+    DCHECK(t_sink.__isset.olap_table_sink);
+    auto& table_sink = t_sink.olap_table_sink;
+    _tuple_desc_id = table_sink.tuple_id;
+    _schema.reset(new OlapTableSchemaParam());
+    RETURN_IF_ERROR(_schema->init(table_sink.schema));
+    return Status::OK();
+}
+
+Status GroupCommitBlockSink::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSink::prepare(state));
+    _state = state;
+
+    // profile must add to state's object pool
+    _profile = state->obj_pool()->add(new RuntimeProfile("OlapTableSink"));
+    _mem_tracker =
+            std::make_shared<MemTracker>("OlapTableSink:" + 
std::to_string(state->load_job_id()));
+    SCOPED_TIMER(_profile->total_time_counter());
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+
+    // get table's tuple descriptor
+    _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id);
+    if (_output_tuple_desc == nullptr) {
+        LOG(WARNING) << "unknown destination tuple descriptor, id=" << 
_tuple_desc_id;
+        return Status::InternalError("unknown destination tuple descriptor");
+    }
+
+    _block_convertor = std::make_unique<vectorized::OlapTableBlockConvertor>(_output_tuple_desc);
+    _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(),
+                                        _state->batch_size());
+    // Prepare the exprs to run.
+    return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc);
+}
+
+Status GroupCommitBlockSink::open(RuntimeState* state) {
+    // Prepare the exprs to run.
+    return vectorized::VExpr::open(_output_vexpr_ctxs, state);
+}
+
+Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block* input_block, bool eos) {
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+    Status status = Status::OK();
+    auto rows = input_block->rows();
+    auto bytes = input_block->bytes();
+    if (UNLIKELY(rows == 0)) {
+        return status;
+    }
+    SCOPED_TIMER(_profile->total_time_counter());
+    // update incrementally so that FE can get the progress.
+    // the real 'num_rows_load_total' will be set when sink being closed.
+    state->update_num_rows_load_total(rows);
+    state->update_num_bytes_load_total(bytes);
+    DorisMetrics::instance()->load_rows->increment(rows);
+    DorisMetrics::instance()->load_bytes->increment(bytes);
+
+    std::shared_ptr<vectorized::Block> block;
+    bool has_filtered_rows = false;
+    RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
+            state, input_block, block, _output_vexpr_ctxs, rows, has_filtered_rows));
+    block->swap(*input_block);
+    return Status::OK();
+}
+
+} // namespace stream_load
+} // namespace doris
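
Note that this sink never writes data itself: send() only validates and converts the incoming block (swapping the converted columns back into input_block), and the caller queues the result. In GroupCommitMgr::_group_commit_stream_load above, the full lifecycle is driven as sink.init(output_sink), sink.prepare(state), sink.open(state), one sink.send(state, block) per scanned block, and finally sink.close(state, status) via the scope guard.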
diff --git a/be/src/vec/sink/group_commit_block_sink.h b/be/src/vec/sink/group_commit_block_sink.h
new file mode 100644
index 0000000000..a309413f5a
--- /dev/null
+++ b/be/src/vec/sink/group_commit_block_sink.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include "exec/data_sink.h"
+#include "vec/exprs/vexpr_fwd.h"
+#include "vec/sink/vtablet_sink.h"
+
+namespace doris {
+
+class OlapTableSchemaParam;
+class MemTracker;
+
+namespace stream_load {
+
+class GroupCommitBlockSink : public DataSink {
+public:
+    GroupCommitBlockSink(ObjectPool* pool, const RowDescriptor& row_desc,
+                         const std::vector<TExpr>& texprs, Status* status);
+
+    ~GroupCommitBlockSink() override;
+
+    Status init(const TDataSink& sink) override;
+
+    Status prepare(RuntimeState* state) override;
+
+    Status open(RuntimeState* state) override;
+
+    Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override;
+
+private:
+    vectorized::VExprContextSPtrs _output_vexpr_ctxs;
+
+    int _tuple_desc_id = -1;
+    std::shared_ptr<OlapTableSchemaParam> _schema;
+
+    RuntimeState* _state = nullptr;
+    std::shared_ptr<MemTracker> _mem_tracker;
+    // this is tuple descriptor of destination OLAP table
+    TupleDescriptor* _output_tuple_desc = nullptr;
+    std::unique_ptr<vectorized::OlapTableBlockConvertor> _block_convertor;
+};
+
+} // namespace stream_load
+} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index ad1a2f1e52..1fd75c304c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -164,7 +164,7 @@ public class NativeInsertStmt extends InsertStmt {
     private ByteString rangeBytes = null;
     private long tableId = -1;
     // true if be generates an insert from group commit tvf stmt and executes to load data
-    public boolean isInnerGroupCommit = false;
+    public boolean isGroupCommitTvf = false;
 
     private boolean isFromDeleteOrUpdateStmt = false;
 
@@ -895,7 +895,7 @@ public class NativeInsertStmt extends InsertStmt {
         }
         if (targetTable instanceof OlapTable) {
             checkInnerGroupCommit();
-            OlapTableSink sink = isInnerGroupCommit ? new GroupCommitOlapTableSink((OlapTable) targetTable, olapTuple,
+            OlapTableSink sink = isGroupCommitTvf ? new GroupCommitOlapTableSink((OlapTable) targetTable, olapTuple,
                     targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert())
                     : new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds,
                             analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
@@ -933,10 +933,11 @@ public class NativeInsertStmt extends InsertStmt {
     private void checkInnerGroupCommit() {
         List<TableRef> tableRefs = new ArrayList<>();
         queryStmt.collectTableRefs(tableRefs);
-        if (tableRefs.size() == 1 && tableRefs.get(0) instanceof TableValuedFunctionRef
-                && ((TableValuedFunctionRef) tableRefs.get(
-                0)).getTableFunction() instanceof GroupCommitTableValuedFunction) {
-            isInnerGroupCommit = true;
+        if (tableRefs.size() == 1 && tableRefs.get(0) instanceof TableValuedFunctionRef) {
+            TableValuedFunctionRef tvfRef = (TableValuedFunctionRef) tableRefs.get(0);
+            if (tvfRef.getTableFunction() instanceof GroupCommitTableValuedFunction) {
+                isGroupCommitTvf = true;
+            }
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 304cbe6ab5..08f624c99c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1902,7 +1902,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                 if (Config.enable_pipeline_load) {
                     result.setPipelineParams(pipelineStreamLoadPutImpl(request));
                 } else {
-                    result.setParams(streamLoadPutImpl(request));
+                    result.setParams(streamLoadPutImpl(request, result));
                 }
             }
         } catch (UserException e) {
@@ -2060,6 +2060,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             NativeInsertStmt parsedStmt = (NativeInsertStmt) SqlParserUtils.getFirstStmt(parser);
             parsedStmt.setOrigStmt(new OriginStatement(originStmt, 0));
             parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
+            if (request.isGroupCommit() && parsedStmt.getLabel() != null) {
+                throw new AnalysisException("label and group_commit can't be 
set at the same time");
+            }
             StmtExecutor executor = new StmtExecutor(ctx, parsedStmt);
             ctx.setExecutor(executor);
             TQueryOptions tQueryOptions = ctx.getSessionVariable().toThrift();
@@ -2077,10 +2080,12 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             result.getParams().setTableName(parsedStmt.getTbl());
             // The txn_id here is obtained from the NativeInsertStmt
             result.getParams().setTxnConf(new TTxnParams().setTxnId(txn_id));
-            if (parsedStmt.isInnerGroupCommit) {
-                result.setBaseSchemaVersion(((OlapTable) parsedStmt.getTargetTable()).getBaseSchemaVersion());
+            if (parsedStmt.isGroupCommitTvf) {
                 result.getParams().params.setGroupCommit(true);
             }
+            result.setDbId(parsedStmt.getTargetTable().getDatabase().getId());
+            result.setTableId(parsedStmt.getTargetTable().getId());
+            result.setBaseSchemaVersion(((OlapTable) parsedStmt.getTargetTable()).getBaseSchemaVersion());
         } catch (UserException e) {
             LOG.warn("exec sql error", e);
             throw new UserException("exec sql error" + e);
@@ -2090,7 +2095,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         }
     }
 
-    private TExecPlanFragmentParams streamLoadPutImpl(TStreamLoadPutRequest request) throws UserException {
+    private TExecPlanFragmentParams streamLoadPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResult result)
+            throws UserException {
         String cluster = request.getCluster();
         if (Strings.isNullOrEmpty(cluster)) {
             cluster = SystemInfoService.DEFAULT_CLUSTER;
@@ -2108,6 +2114,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         }
         long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() : 5000;
         Table table = db.getTableOrMetaException(request.getTbl(), TableType.OLAP);
+        result.setDbId(db.getId());
+        result.setTableId(table.getId());
+        result.setBaseSchemaVersion(((OlapTable) table).getBaseSchemaVersion());
         return generatePlanFragmentParams(request, db, fullDbName, (OlapTable) table, timeoutMs);
     }
 
@@ -2133,13 +2142,15 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             }
             StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask);
             TExecPlanFragmentParams plan = planner.plan(streamLoadTask.getId(), multiTableFragmentInstanceIdIndex);
-            // add table indexes to transaction state
-            TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
-                    .getTransactionState(db.getId(), request.getTxnId());
-            if (txnState == null) {
-                throw new UserException("txn does not exist: " + 
request.getTxnId());
+            if (!request.isGroupCommit()) {
+                // add table indexes to transaction state
+                TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
+                        .getTransactionState(db.getId(), request.getTxnId());
+                if (txnState == null) {
+                    throw new UserException("txn does not exist: " + 
request.getTxnId());
+                }
+                txnState.addTableIndexes(table);
             }
-            txnState.addTableIndexes(table);
             plan.setTableName(table.getName());
             plan.query_options.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID());
             return plan;
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index b1ccf7db08..901acca70d 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -639,6 +639,7 @@ struct TStreamLoadPutRequest {
     // only valid when file type is CSV
     52: optional i8 escape
     53: optional bool memtable_on_sink_node;
+    54: optional bool group_commit
 }
 
 struct TStreamLoadPutResult {
@@ -648,6 +649,8 @@ struct TStreamLoadPutResult {
     3: optional PaloInternalService.TPipelineFragmentParams pipeline_params
     // used for group commit
     4: optional i64 base_schema_version
+    5: optional i64 db_id
+    6: optional i64 table_id
 }
 
 struct TStreamLoadMultiTablePutResult {
diff --git a/regression-test/data/load_p0/http_stream/test_compress.csv.bz2 b/regression-test/data/load_p0/http_stream/test_compress.csv.bz2
new file mode 100644
index 0000000000..b2fd1fcfbe
Binary files /dev/null and b/regression-test/data/load_p0/http_stream/test_compress.csv.bz2 differ
diff --git a/regression-test/data/load_p0/http_stream/test_compress.csv.gz b/regression-test/data/load_p0/http_stream/test_compress.csv.gz
new file mode 100644
index 0000000000..a330a2b071
Binary files /dev/null and b/regression-test/data/load_p0/http_stream/test_compress.csv.gz differ
diff --git a/regression-test/data/load_p0/http_stream/test_compress.csv.lz4 b/regression-test/data/load_p0/http_stream/test_compress.csv.lz4
new file mode 100644
index 0000000000..76955306d8
Binary files /dev/null and b/regression-test/data/load_p0/http_stream/test_compress.csv.lz4 differ
diff --git a/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out b/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out
new file mode 100644
index 0000000000..b45fc6f714
--- /dev/null
+++ b/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+5      e       -1
+5      e       50
+6      f       -1
+6      f       -1
+6      f       60
+7      e       70
+8      f       80
+
diff --git a/regression-test/data/load_p0/http_stream/test_stream_load1.csv b/regression-test/data/load_p0/http_stream/test_stream_load1.csv
new file mode 100644
index 0000000000..b86bcfa176
--- /dev/null
+++ b/regression-test/data/load_p0/http_stream/test_stream_load1.csv
@@ -0,0 +1,2 @@
+5,e
+6,f
diff --git a/regression-test/data/load_p0/http_stream/test_stream_load2.csv b/regression-test/data/load_p0/http_stream/test_stream_load2.csv
new file mode 100644
index 0000000000..93157e3d6d
--- /dev/null
+++ b/regression-test/data/load_p0/http_stream/test_stream_load2.csv
@@ -0,0 +1,2 @@
+70|7|e
+80|8|f
diff --git a/regression-test/data/load_p0/http_stream/test_stream_load3.csv b/regression-test/data/load_p0/http_stream/test_stream_load3.csv
new file mode 100644
index 0000000000..f257be1566
--- /dev/null
+++ b/regression-test/data/load_p0/http_stream/test_stream_load3.csv
@@ -0,0 +1,6 @@
+10,a,10
+11,a,11
+1a,a,11
+12,a
+20,b,21
+101,a,101
\ No newline at end of file
diff --git a/regression-test/data/load_p0/stream_load/test_compress.csv.bz2 b/regression-test/data/load_p0/stream_load/test_compress.csv.bz2
new file mode 100644
index 0000000000..b2fd1fcfbe
Binary files /dev/null and b/regression-test/data/load_p0/stream_load/test_compress.csv.bz2 differ
diff --git a/regression-test/data/load_p0/stream_load/test_compress.csv.gz b/regression-test/data/load_p0/stream_load/test_compress.csv.gz
new file mode 100644
index 0000000000..a330a2b071
Binary files /dev/null and b/regression-test/data/load_p0/stream_load/test_compress.csv.gz differ
diff --git a/regression-test/data/load_p0/stream_load/test_compress.csv.lz4 b/regression-test/data/load_p0/stream_load/test_compress.csv.lz4
new file mode 100644
index 0000000000..76955306d8
Binary files /dev/null and b/regression-test/data/load_p0/stream_load/test_compress.csv.lz4 differ
diff --git a/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out b/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out
new file mode 100644
index 0000000000..246be06453
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out
@@ -0,0 +1,27 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1      a       10
+1      a       10
+1      a       10
+2      b       20
+2      b       20
+2      b       20
+3      c       30
+3      c       30
+3      c       30
+4      d       \N
+4      d       \N
+4      d       \N
+5      e       -1
+5      e       50
+6      f       -1
+6      f       -1
+6      f       60
+7      e       70
+8      f       80
+10     a       10
+11     a       11
+
+-- !sql --
+2402288
+
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load1.csv b/regression-test/data/load_p0/stream_load/test_stream_load1.csv
new file mode 100644
index 0000000000..b86bcfa176
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_stream_load1.csv
@@ -0,0 +1,2 @@
+5,e
+6,f
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load2.csv b/regression-test/data/load_p0/stream_load/test_stream_load2.csv
new file mode 100644
index 0000000000..93157e3d6d
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_stream_load2.csv
@@ -0,0 +1,2 @@
+70|7|e
+80|8|f
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load3.csv b/regression-test/data/load_p0/stream_load/test_stream_load3.csv
new file mode 100644
index 0000000000..f257be1566
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_stream_load3.csv
@@ -0,0 +1,6 @@
+10,a,10
+11,a,11
+1a,a,11
+12,a
+20,b,21
+101,a,101
\ No newline at end of file
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
index 2dc4a0c8fa..0fe263f291 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
@@ -139,6 +139,10 @@ class StreamLoadAction implements SuiteAction {
         headers.put(key, value)
     }
 
+    void unset(String key) {
+        headers.remove(key)
+    }
+
     @Override
     void run() {
         String responseText = null
diff --git a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
new file mode 100644
index 0000000000..a27963742d
--- /dev/null
+++ b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
@@ -0,0 +1,328 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_group_commit_http_stream") {
+    def db = "regression_test_load_p0_http_stream"
+    def tableName = "test_group_commit_http_stream"
+
+    def getRowCount = { expectedRowCount ->
+        def retry = 0
+        while (retry < 10) {
+            sleep(2000)
+            def rowCount = sql "select count(*) from ${tableName}"
+            logger.info("rowCount: " + rowCount + ", retry: " + retry)
+            if (rowCount[0][0] >= expectedRowCount) {
+                break
+            }
+            retry++
+        }
+    }
+
+    def getAlterTableState = {
+        def retry = 0
+        while (true) {
+            sleep(8000)
+            def state = sql "show alter table column where tablename = 
'${tableName}' order by CreateTime desc "
+            logger.info("alter table retry: ${retry},  state: ${state}")
+            if (state.size() > 0 && state[0][9] == "FINISHED") {
+                return true
+            }
+            retry++
+            if (retry >= 40) {
+                return false
+            }
+        }
+        return false
+    }
+
+    try {
+        // create table
+        sql """ drop table if exists ${tableName}; """
+
+        sql """
+        CREATE TABLE `${tableName}` (
+            `id` int(11) NOT NULL,
+            `name` varchar(1100) NULL,
+            `score` int(11) NULL default "-1"
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`, `name`)
+        PARTITION BY RANGE (id) (
+            PARTITION plessThan1 VALUES LESS THAN ("0"),
+            PARTITION plessThan2 VALUES LESS THAN ("100")
+        )
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        );
+        """
+
+        // stream load with compress file
+        String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/ 
"lz4"} //, "deflate"}
+        /*for (final def compressionType in compressionTypes) {
+            def fileName = "test_compress.csv." + compressionType
+            streamLoad {
+                set 'version', '1'
+                set 'sql', """
+                    insert into ${db}.${tableName} select * from http_stream
+                    ("format"="csv", "compress_type"="${compressionType}", 
"column_separator"=",")
+                """
+                set 'group_commit', 'true'
+                file "${fileName}"
+                unset 'label'
+
+                time 10000 // limit inflight 10s
+            }
+        }*/
+
+        // stream load with 2 columns
+        streamLoad {
+            set 'version', '1'
+            set 'sql', """
+                    insert into ${db}.${tableName}(id, name) select c1, c2 from http_stream
+                    ("format"="csv", "column_separator"=",")
+            """
+
+            set 'group_commit', 'true'
+            file "test_stream_load1.csv"
+            unset 'label'
+
+            time 10000 // limit inflight 10s
+        }
+
+        // stream load with different column order
+        streamLoad {
+            set 'version', '1'
+            set 'sql', """
+                    insert into ${db}.${tableName}(score, id, name) select c1, c2, c3 from http_stream
+                    ("format"="csv", "column_separator"="|")
+            """
+
+            set 'group_commit', 'true'
+            file "test_stream_load2.csv"
+            unset 'label'
+
+            time 10000 // limit inflight 10s
+        }
+
+        // stream load with where condition
+        streamLoad {
+            set 'version', '1'
+            set 'sql', """
+                    insert into ${db}.${tableName}(id, name) select c1, c2 from http_stream
+                    ("format"="csv", "column_separator"=",") where c1 > 5
+            """
+
+            set 'group_commit', 'true'
+            file "test_stream_load1.csv"
+            unset 'label'
+
+            time 10000 // limit inflight 10s
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertTrue(json.GroupCommit)
+                // assertEquals(2, json.NumberTotalRows)
+                assertEquals(1, json.NumberLoadedRows)
+                assertEquals(0, json.NumberFilteredRows)
+                // assertEquals(1, json.NumberUnselectedRows)
+            }
+        }
+
+        // stream load with mapping
+        streamLoad {
+            set 'version', '1'
+            set 'sql', """
+                    insert into ${db}.${tableName} select c1, c2, c1 * 10 from http_stream
+                    ("format"="csv", "column_separator"=",")
+            """
+
+            set 'group_commit', 'true'
+            file "test_stream_load1.csv"
+            unset 'label'
+
+            time 10000 // limit inflight 10s
+        }
+
+        // stream load with filtered rows
+        /*streamLoad {
+            set 'version', '1'
+            set 'sql', """
+                    insert into ${db}.${tableName} select c1, c2, c3 from http_stream where c2 = 'a'
+                    ("format"="csv", "column_separator"=",")
+            """
+
+            set 'group_commit', 'true'
+            file "test_stream_load3.csv"
+            set 'max_filter_ratio', '0.7'
+            unset 'label'
+
+            time 10000 // limit inflight 10s
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertTrue(json.GroupCommit)
+                assertEquals(6, json.NumberTotalRows)
+                assertEquals(2, json.NumberLoadedRows)
+                assertEquals(3, json.NumberFilteredRows)
+                assertEquals(1, json.NumberUnselectedRows)
+                assertFalse(json.ErrorURL.isEmpty())
+            }
+        }*/
+
+        // stream load with label
+        streamLoad {
+            set 'version', '1'
+            def label = 'l_' + System.currentTimeMillis()
+            set 'sql', """
+                    insert into ${db}.${tableName} with label ${label} select * from http_stream
+                    ("format"="csv", "column_separator"="|")
+            """
+
+            set 'group_commit', 'true'
+            file "test_stream_load2.csv"
+
+            time 10000 // limit inflight 10s
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("fail", json.Status.toLowerCase())
+            }
+        }
+
+        getRowCount(7)
+        qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; "
+    } finally {
+        // try_sql("DROP TABLE ${tableName}")
+    }
+
+    // stream load with large data and schema change
+    /*tableName = "test_stream_load_lineorder"
+    try {
+        sql """ DROP TABLE IF EXISTS `${tableName}` """
+        sql """
+            CREATE TABLE IF NOT EXISTS `${tableName}` (
+            `lo_orderkey` bigint(20) NOT NULL COMMENT "",
+            `lo_linenumber` bigint(20) NOT NULL COMMENT "",
+            `lo_custkey` int(11) NOT NULL COMMENT "",
+            `lo_partkey` int(11) NOT NULL COMMENT "",
+            `lo_suppkey` int(11) NOT NULL COMMENT "",
+            `lo_orderdate` int(11) NOT NULL COMMENT "",
+            `lo_orderpriority` varchar(16) NOT NULL COMMENT "",
+            `lo_shippriority` int(11) NOT NULL COMMENT "",
+            `lo_quantity` bigint(20) NOT NULL COMMENT "",
+            `lo_extendedprice` bigint(20) NOT NULL COMMENT "",
+            `lo_ordtotalprice` bigint(20) NOT NULL COMMENT "",
+            `lo_discount` bigint(20) NOT NULL COMMENT "",
+            `lo_revenue` bigint(20) NOT NULL COMMENT "",
+            `lo_supplycost` bigint(20) NOT NULL COMMENT "",
+            `lo_tax` bigint(20) NOT NULL COMMENT "",
+            `lo_commitdate` bigint(20) NOT NULL COMMENT "",
+            `lo_shipmode` varchar(11) NOT NULL COMMENT ""
+            )
+            PARTITION BY RANGE(`lo_orderdate`)
+            (PARTITION p1992 VALUES [("-2147483648"), ("19930101")),
+            PARTITION p1993 VALUES [("19930101"), ("19940101")),
+            PARTITION p1994 VALUES [("19940101"), ("19950101")),
+            PARTITION p1995 VALUES [("19950101"), ("19960101")),
+            PARTITION p1996 VALUES [("19960101"), ("19970101")),
+            PARTITION p1997 VALUES [("19970101"), ("19980101")),
+            PARTITION p1998 VALUES [("19980101"), ("19990101")))
+            DISTRIBUTED BY HASH(`lo_orderkey`) BUCKETS 4
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+        // load data
+        def columns = """lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,
+            lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,
+            lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode"""
+
+        new Thread(() -> {
+            Thread.sleep(3000)
+            // do light weight schema change
+            sql """ alter table ${tableName} ADD column sc_tmp varchar(100) 
after lo_revenue; """
+
+            assertTrue(getAlterTableState())
+
+            // do hard weight schema change
+            def new_columns = """lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,
+            lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,
+            lo_revenue,lo_supplycost,lo_tax,lo_shipmode,lo_commitdate"""
+            sql """ alter table ${tableName} order by (${new_columns}); """
+        }).start();
+
+        for (int i = 0; i < 4; i++) {
+
+            streamLoad {
+                set 'version', '1'
+                sql """
+                    insert into ${db}.${table} ($columns)
+                    select c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17 from http_stream
+                    ("format"="csv", "compress_type"="GZ", 
"column_separator"="|")
+                """
+                table tableName
+
+                // set 'column_separator', '|'
+                // set 'compress_type', 'GZ'
+                set 'columns', columns + ",lo_dummy"
+                set 'group_commit', 'true'
+                unset 'label'
+
+                file """${getS3Url()}/regression/ssb/sf0.1/lineorder.tbl.gz"""
+
+                time 10000 // limit inflight 10s
+
+                // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows
+
+                // if declared a check callback, the default check condition will ignore.
+                // So you must check all condition
+                check { result, exception, startTime, endTime ->
+                    if (exception != null) {
+                        throw exception
+                    }
+                    log.info("Stream load ${i}, result: ${result}")
+                    def json = parseJson(result)
+                    assertEquals("success", json.Status.toLowerCase())
+                    assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+                    assertEquals(json.NumberLoadedRows, 600572)
+                    assertTrue(json.LoadBytes > 0)
+                    assertTrue(json.GroupCommit)
+                }
+            }
+        }
+
+        getRowCount(2402288)
+        qt_sql """ select count(*) from ${tableName} """
+
+        assertTrue(getAlterTableState())
+    } finally {
+        // try_sql("DROP TABLE ${tableName}")
+    }*/
+}
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
new file mode 100644
index 0000000000..9e50aebf64
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
@@ -0,0 +1,310 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_group_commit_stream_load") {
+    def tableName = "test_group_commit_stream_load"
+
+    def getRowCount = { expectedRowCount ->
+        def retry = 0
+        while (retry < 10) {
+            sleep(2000)
+            def rowCount = sql "select count(*) from ${tableName}"
+            logger.info("rowCount: " + rowCount + ", retry: " + retry)
+            if (rowCount[0][0] >= expectedRowCount) {
+                break
+            }
+            retry++
+        }
+    }
+
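+    // poll SHOW ALTER TABLE COLUMN until the most recent schema change job
+    // reaches FINISHED, giving up after 40 retries (~320s)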
+    def getAlterTableState = {
+        def retry = 0
+        while (true) {
+            sleep(8000)
+            def state = sql "show alter table column where tablename = '${tableName}' order by CreateTime desc "
+            logger.info("alter table retry: ${retry},  state: ${state}")
+            if (state.size() > 0 && state[0][9] == "FINISHED") {
+                return true
+            }
+            retry++
+            if (retry >= 40) {
+                return false
+            }
+        }
+        return false
+    }
+
+    try {
+        // create table
+        sql """ drop table if exists ${tableName}; """
+
+        sql """
+        CREATE TABLE `${tableName}` (
+            `id` int(11) NOT NULL,
+            `name` varchar(1100) NULL,
+            `score` int(11) NULL default "-1"
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`, `name`)
+        PARTITION BY RANGE (id) (
+            PARTITION plessThan1 VALUES LESS THAN ("0"),
+            PARTITION plessThan2 VALUES LESS THAN ("100")
+        )
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        );
+        """
+
+        // stream load with compress file
+        String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/ "lz4"} //, "deflate"}
+        for (final def compressionType in compressionTypes) {
+            def fileName = "test_compress.csv." + compressionType
+            streamLoad {
+                table "${tableName}"
+
+                set 'column_separator', ','
+                set 'group_commit', 'true'
+                set 'compress_type', "${compressionType}"
+                // set 'columns', 'id, name, score'
+                file "${fileName}"
+                unset 'label'
+
+                time 10000 // limit inflight 10s
+            }
+        }
+
+        // stream load with 2 columns
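+        // the missing 'score' column should fall back to its default value -1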
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'group_commit', 'true'
+            set 'columns', 'id, name'
+            file "test_stream_load1.csv"
+            unset 'label'
+
+            time 10000 // limit inflight 10s
+        }
+
+        // stream load with different column order
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', '|'
+            set 'group_commit', 'true'
+            set 'columns', 'score, id, name'
+            file "test_stream_load2.csv"
+            unset 'label'
+
+            time 10000 // limit inflight 10s
+        }
+
+        // stream load with where condition
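+        // of the 2 rows in test_stream_load1.csv, the row not matching 'id > 5'
+        // is counted as unselected rather than filtered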
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'group_commit', 'true'
+            set 'columns', 'id, name'
+            file "test_stream_load1.csv"
+            set 'where', 'id > 5'
+            unset 'label'
+
+            time 10000 // limit inflight 10s
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertTrue(json.GroupCommit)
+                assertEquals(2, json.NumberTotalRows)
+                assertEquals(1, json.NumberLoadedRows)
+                assertEquals(0, json.NumberFilteredRows)
+                assertEquals(1, json.NumberUnselectedRows)
+            }
+        }
+
+        // stream load with mapping
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'group_commit', 'true'
+            set 'columns', 'id, name, score = id * 10'
+            file "test_stream_load1.csv"
+            unset 'label'
+
+            time 10000 // limit inflight 10s
+        }
+
+        // stream load with filtered rows
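+        // of the 6 rows, 3 are filtered as erroneous and 1 is unselected by the
+        // where clause; the resulting filter ratio is within max_filter_ratio 0.7,
+        // so the load still succeeds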
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'group_commit', 'true'
+            file "test_stream_load3.csv"
+            set 'where', "name = 'a'"
+            set 'max_filter_ratio', '0.7'
+            unset 'label'
+
+            time 10000 // limit inflight 10s
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertTrue(json.GroupCommit)
+                assertEquals(6, json.NumberTotalRows)
+                assertEquals(2, json.NumberLoadedRows)
+                assertEquals(3, json.NumberFilteredRows)
+                assertEquals(1, json.NumberUnselectedRows)
+                assertFalse(json.ErrorURL.isEmpty())
+            }
+        }
+
+        // stream load with label
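+        // group commit does not allow a user-specified label; 'label' is not
+        // unset here, so the default label sent by the streamLoad action makes
+        // the load fail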
+        streamLoad {
+            table "${tableName}"
+
+            // set 'label', 'test_stream_load'
+            set 'column_separator', '|'
+            set 'group_commit', 'true'
+            // set 'label', 'l_' + System.currentTimeMillis()
+            file "test_stream_load2.csv"
+
+            time 10000 // limit inflight 10s
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("fail", json.Status.toLowerCase())
+            }
+        }
+
+        getRowCount(21)
+        qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; "
+    } finally {
+        // try_sql("DROP TABLE ${tableName}")
+    }
+
+    // stream load with large data and schema change
+    tableName = "test_stream_load_lineorder"
+    try {
+        sql """ DROP TABLE IF EXISTS `${tableName}` """
+        sql """
+            CREATE TABLE IF NOT EXISTS `${tableName}` (
+            `lo_orderkey` bigint(20) NOT NULL COMMENT "",
+            `lo_linenumber` bigint(20) NOT NULL COMMENT "",
+            `lo_custkey` int(11) NOT NULL COMMENT "",
+            `lo_partkey` int(11) NOT NULL COMMENT "",
+            `lo_suppkey` int(11) NOT NULL COMMENT "",
+            `lo_orderdate` int(11) NOT NULL COMMENT "",
+            `lo_orderpriority` varchar(16) NOT NULL COMMENT "",
+            `lo_shippriority` int(11) NOT NULL COMMENT "",
+            `lo_quantity` bigint(20) NOT NULL COMMENT "",
+            `lo_extendedprice` bigint(20) NOT NULL COMMENT "",
+            `lo_ordtotalprice` bigint(20) NOT NULL COMMENT "",
+            `lo_discount` bigint(20) NOT NULL COMMENT "",
+            `lo_revenue` bigint(20) NOT NULL COMMENT "",
+            `lo_supplycost` bigint(20) NOT NULL COMMENT "",
+            `lo_tax` bigint(20) NOT NULL COMMENT "",
+            `lo_commitdate` bigint(20) NOT NULL COMMENT "",
+            `lo_shipmode` varchar(11) NOT NULL COMMENT ""
+            )
+            PARTITION BY RANGE(`lo_orderdate`)
+            (PARTITION p1992 VALUES [("-2147483648"), ("19930101")),
+            PARTITION p1993 VALUES [("19930101"), ("19940101")),
+            PARTITION p1994 VALUES [("19940101"), ("19950101")),
+            PARTITION p1995 VALUES [("19950101"), ("19960101")),
+            PARTITION p1996 VALUES [("19960101"), ("19970101")),
+            PARTITION p1997 VALUES [("19970101"), ("19980101")),
+            PARTITION p1998 VALUES [("19980101"), ("19990101")))
+            DISTRIBUTED BY HASH(`lo_orderkey`) BUCKETS 4
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+        // load data
+        def columns = """lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,
+            lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,
+            lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode"""
+
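+        // run a light weight and then a heavy weight schema change in the
+        // background while four group commit loads are in flight, to verify
+        // group commit works with concurrent schema changes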
+        new Thread(() -> {
+            Thread.sleep(3000)
+            // do light weight schema change
+            sql """ alter table ${tableName} ADD column sc_tmp varchar(100) 
after lo_revenue; """
+
+            assertTrue(getAlterTableState())
+
+            // do heavy weight schema change
+            def new_columns = """lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,
+            lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,
+            lo_revenue,lo_supplycost,lo_tax,lo_shipmode,lo_commitdate"""
+            sql """ alter table ${tableName} order by (${new_columns}); """
+        }).start();
+
+        for (int i = 0; i < 4; i++) {
+
+            streamLoad {
+                table tableName
+
+                set 'column_separator', '|'
+                set 'compress_type', 'GZ'
+                set 'columns', columns + ",lo_dummy"
+                set 'group_commit', 'true'
+                unset 'label'
+
+                file """${getS3Url()}/regression/ssb/sf0.1/lineorder.tbl.gz"""
+
+                time 10000 // limit inflight 10s
+
+                // the stream load action checks the result by default, including
+                // the Success status and NumberTotalRows == NumberLoadedRows
+
+                // if a check callback is declared, the default checks are skipped,
+                // so the callback must verify all conditions itself
+                check { result, exception, startTime, endTime ->
+                    if (exception != null) {
+                        throw exception
+                    }
+                    log.info("Stream load ${i}, result: ${result}")
+                    def json = parseJson(result)
+                    assertEquals("success", json.Status.toLowerCase())
+                    assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+                    assertEquals(json.NumberLoadedRows, 600572)
+                    assertTrue(json.LoadBytes > 0)
+                    assertTrue(json.GroupCommit)
+                }
+            }
+        }
+
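+        // 4 loads x 600572 rows each = 2402288 rows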
+        getRowCount(2402288)
+        qt_sql """ select count(*) from ${tableName} """
+
+        assertTrue(getAlterTableState())
+    } finally {
+        // try_sql("DROP TABLE ${tableName}")
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
