This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 55d1090137 [feature](insert) Support group commit stream load (#24304)
55d1090137 is described below
commit 55d1090137179692e9b73be4407821e60e5f6891
Author: meiyi <[email protected]>
AuthorDate: Tue Sep 26 20:57:02 2023 +0800
[feature](insert) Support group commit stream load (#24304)
---
be/src/http/action/http_stream.cpp | 15 +
be/src/http/action/stream_load.cpp | 38 ++-
be/src/http/http_common.h | 1 +
be/src/runtime/group_commit_mgr.cpp | 128 +++++++-
be/src/runtime/group_commit_mgr.h | 6 +
be/src/runtime/stream_load/stream_load_context.cpp | 23 +-
be/src/runtime/stream_load/stream_load_context.h | 3 +
be/src/vec/sink/group_commit_block_sink.cpp | 105 +++++++
be/src/vec/sink/group_commit_block_sink.h | 59 ++++
.../apache/doris/analysis/NativeInsertStmt.java | 13 +-
.../apache/doris/service/FrontendServiceImpl.java | 31 +-
gensrc/thrift/FrontendService.thrift | 3 +
.../data/load_p0/http_stream/test_compress.csv.bz2 | Bin 0 -> 65 bytes
.../data/load_p0/http_stream/test_compress.csv.gz | Bin 0 -> 67 bytes
.../data/load_p0/http_stream/test_compress.csv.lz4 | Bin 0 -> 48 bytes
.../http_stream/test_group_commit_http_stream.out | 10 +
.../data/load_p0/http_stream/test_stream_load1.csv | 2 +
.../data/load_p0/http_stream/test_stream_load2.csv | 2 +
.../data/load_p0/http_stream/test_stream_load3.csv | 6 +
.../data/load_p0/stream_load/test_compress.csv.bz2 | Bin 0 -> 65 bytes
.../data/load_p0/stream_load/test_compress.csv.gz | Bin 0 -> 67 bytes
.../data/load_p0/stream_load/test_compress.csv.lz4 | Bin 0 -> 48 bytes
.../stream_load/test_group_commit_stream_load.out | 27 ++
.../data/load_p0/stream_load/test_stream_load1.csv | 2 +
.../data/load_p0/stream_load/test_stream_load2.csv | 2 +
.../data/load_p0/stream_load/test_stream_load3.csv | 6 +
.../regression/action/StreamLoadAction.groovy | 4 +
.../test_group_commit_http_stream.groovy | 328 +++++++++++++++++++++
.../test_group_commit_stream_load.groovy | 310 +++++++++++++++++++
29 files changed, 1093 insertions(+), 31 deletions(-)
diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index 11e69615d2..62153c8f8f 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -49,6 +49,7 @@
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
+#include "runtime/group_commit_mgr.h"
#include "runtime/load_path_mgr.h"
#include "runtime/plan_fragment_executor.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
@@ -139,6 +140,11 @@ Status HttpStreamAction::_handle(HttpRequest* http_req, std::shared_ptr<StreamLo
// wait stream load finish
RETURN_IF_ERROR(ctx->future.get());
+ if (ctx->group_commit) {
+ LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string();
+ return Status::OK();
+ }
+
int64_t commit_and_publish_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time;
@@ -158,6 +164,7 @@ int HttpStreamAction::on_header(HttpRequest* req) {
if (ctx->label.empty()) {
ctx->label = generate_uuid_string();
}
+ ctx->group_commit = iequal(req->header(HTTP_GROUP_COMMIT), "true");
LOG(INFO) << "new income streaming load request." << ctx->brief()
<< " sql : " << req->header(HTTP_SQL);
@@ -284,6 +291,7 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
request.__set_load_sql(http_req->header(HTTP_SQL));
request.__set_loadId(ctx->id.to_thrift());
request.__set_label(ctx->label);
+ request.__set_group_commit(ctx->group_commit);
if (_exec_env->master_info()->__isset.backend_id) {
request.__set_backend_id(_exec_env->master_info()->backend_id);
} else {
@@ -308,6 +316,13 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
ctx->table = ctx->put_result.params.table_name;
ctx->txn_id = ctx->put_result.params.txn_conf.txn_id;
ctx->put_result.params.__set_wal_id(ctx->wal_id);
+
+ if (ctx->group_commit) {
+ ctx->db_id = ctx->put_result.db_id;
+ ctx->table_id = ctx->put_result.table_id;
+ ctx->schema_version = ctx->put_result.base_schema_version;
+ return _exec_env->group_commit_mgr()->group_commit_stream_load(ctx);
+ }
return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
}
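As a usage reference, the http_stream path above is exercised from the regression framework roughly as in the following sketch; it is distilled from the test_group_commit_http_stream suite added later in this commit, and the db, tableName and file names come from that suite and are shown only for illustration:

    streamLoad {
        set 'version', '1'
        // group_commit is an HTTP header; the load itself is expressed as SQL over http_stream
        set 'sql', """
                insert into ${db}.${tableName}(id, name) select c1, c2 from http_stream
                ("format"="csv", "column_separator"=",")
                """
        set 'group_commit', 'true'    // read by HttpStreamAction::on_header via HTTP_GROUP_COMMIT
        file "test_stream_load1.csv"
        unset 'label'                 // the suites drop the default label when group_commit is set
        time 10000                    // limit inflight 10s
    }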
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index b4d53b74c4..be4d870323 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -52,6 +52,7 @@
#include "olap/storage_engine.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
+#include "runtime/group_commit_mgr.h"
#include "runtime/load_path_mgr.h"
#include "runtime/message_body_sink.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
@@ -153,6 +154,11 @@ Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
// wait stream load finish
RETURN_IF_ERROR(ctx->future.get());
+ if (ctx->group_commit) {
+ LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string();
+ return Status::OK();
+ }
+
if (ctx->two_phase_commit) {
int64_t pre_commit_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
@@ -178,8 +184,16 @@ int StreamLoadAction::on_header(HttpRequest* req) {
url_decode(req->param(HTTP_DB_KEY), &ctx->db);
url_decode(req->param(HTTP_TABLE_KEY), &ctx->table);
ctx->label = req->header(HTTP_LABEL_KEY);
- if (ctx->label.empty()) {
- ctx->label = generate_uuid_string();
+ Status st = Status::OK();
+ if (iequal(req->header(HTTP_GROUP_COMMIT), "true")) {
+ if (!ctx->label.empty()) {
+ st = Status::InternalError("label and group_commit can't be set at the same time");
+ }
+ ctx->group_commit = true;
+ } else {
+ if (ctx->label.empty()) {
+ ctx->label = generate_uuid_string();
+ }
}
ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true" ? true : false;
@@ -187,7 +201,9 @@ int StreamLoadAction::on_header(HttpRequest* req) {
LOG(INFO) << "new income streaming load request." << ctx->brief() << ",
db=" << ctx->db
<< ", tbl=" << ctx->table;
- auto st = _on_header(req, ctx);
+ if (st.ok()) {
+ st = _on_header(req, ctx);
+ }
if (!st.ok()) {
ctx->status = std::move(st);
if (ctx->need_rollback) {
@@ -287,9 +303,11 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
ctx->load_comment = http_req->header(HTTP_COMMENT);
}
// begin transaction
- int64_t begin_txn_start_time = MonotonicNanos();
- RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx.get()));
- ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time;
+ if (!ctx->group_commit) {
+ int64_t begin_txn_start_time = MonotonicNanos();
+ RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx.get()));
+ ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time;
+ }
// process put file
return _process_put(http_req, ctx);
@@ -555,6 +573,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true");
request.__set_memtable_on_sink_node(value);
}
+ request.__set_group_commit(ctx->group_commit);
#ifndef BE_TEST
// plan this load
@@ -582,6 +601,13 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
return Status::OK();
}
+ if (ctx->group_commit) {
+ ctx->db_id = ctx->put_result.db_id;
+ ctx->table_id = ctx->put_result.table_id;
+ ctx->schema_version = ctx->put_result.base_schema_version;
+ return _exec_env->group_commit_mgr()->group_commit_stream_load(ctx);
+ }
+
return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
}
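A corresponding sketch for the plain stream load path, based on the test_group_commit_stream_load suite further below (table and file names are the suite's, shown only for illustration):

    streamLoad {
        table "${tableName}"
        set 'column_separator', ','
        set 'group_commit', 'true'    // on_header() rejects the request if a label is also set
        file "test_stream_load1.csv"
        unset 'label'
        time 10000                    // limit inflight 10s
        check { result, exception, startTime, endTime ->
            if (exception != null) {
                throw exception
            }
            def json = parseJson(result)
            assertEquals("success", json.Status.toLowerCase())
            assertTrue(json.GroupCommit)  // new field emitted by StreamLoadContext::to_json()
        }
    }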
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index 8e84140707..5a1550f48f 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -65,5 +65,6 @@ static const std::string HTTP_TXN_OPERATION_KEY = "txn_operation";
static const std::string HTTP_MEMTABLE_ON_SINKNODE = "memtable_on_sink_node";
static const std::string HTTP_WAL_ID_KY = "wal_id";
static const std::string HTTP_AUTH_CODE = "auth_code";
+static const std::string HTTP_GROUP_COMMIT = "group_commit";
} // namespace doris
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 45ccb8d6b4..f039cf34e8 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -35,11 +35,11 @@
#include "runtime/stream_load/stream_load_context.h"
#include "util/thrift_rpc_helper.h"
#include "vec/exec/scan/new_file_scan_node.h"
+#include "vec/sink/group_commit_block_sink.h"
namespace doris {
class TPlan;
-class FragmentExecState;
Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::FutureBlock> block) {
DCHECK(block->get_schema_version() == schema_version);
@@ -259,7 +259,7 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
bool prepare_failed, RuntimeState* state) {
{
std::lock_guard<doris::Mutex> l(_lock);
- if (prepare_failed) {
+ if (prepare_failed || !status.ok()) {
auto it = _load_block_queues.find(instance_id);
if (it != _load_block_queues.end()) {
it->second->cancel(status);
@@ -511,6 +511,130 @@ Status GroupCommitMgr::_append_row(std::shared_ptr<io::StreamLoadPipe> pipe,
return Status::OK();
}
+Status GroupCommitMgr::group_commit_stream_load(std::shared_ptr<StreamLoadContext> ctx) {
+ return _insert_into_thread_pool->submit_func([ctx, this] {
+ Status st = _group_commit_stream_load(ctx);
+ if (!st.ok()) {
+ ctx->promise.set_value(st);
+ }
+ });
+}
+
+Status GroupCommitMgr::_group_commit_stream_load(std::shared_ptr<StreamLoadContext> ctx) {
+ auto& fragment_params = ctx->put_result.params;
+ auto& tdesc_tbl = fragment_params.desc_tbl;
+ DCHECK(fragment_params.params.per_node_scan_ranges.size() == 1);
+ DCHECK(fragment_params.params.per_node_scan_ranges.begin()->second.size() == 1);
+ auto& tscan_range_params = fragment_params.params.per_node_scan_ranges.begin()->second.at(0);
+ auto& nodes = fragment_params.fragment.plan.nodes;
+ DCHECK(nodes.size() > 0);
+ auto& plan_node = nodes.at(0);
+
+ std::vector<std::shared_ptr<doris::vectorized::FutureBlock>> future_blocks;
+ {
+ std::shared_ptr<LoadBlockQueue> load_block_queue;
+ // 1. FileScanNode consumes data from the pipe.
+ std::unique_ptr<RuntimeState> runtime_state = RuntimeState::create_unique();
+ TUniqueId load_id;
+ load_id.hi = ctx->id.hi;
+ load_id.lo = ctx->id.lo;
+ TQueryOptions query_options;
+ query_options.query_type = TQueryType::LOAD;
+ TQueryGlobals query_globals;
+ runtime_state->init(load_id, query_options, query_globals, _exec_env);
+ runtime_state->set_query_mem_tracker(std::make_shared<MemTrackerLimiter>(
+ MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", ctx->id.to_string()), -1));
+ DescriptorTbl* desc_tbl = nullptr;
+ RETURN_IF_ERROR(DescriptorTbl::create(runtime_state->obj_pool(), tdesc_tbl, &desc_tbl));
+ runtime_state->set_desc_tbl(desc_tbl);
+ auto file_scan_node =
+ vectorized::NewFileScanNode(runtime_state->obj_pool(), plan_node, *desc_tbl);
+ Status status = Status::OK();
+ auto sink = stream_load::GroupCommitBlockSink(
+ runtime_state->obj_pool(), file_scan_node.row_desc(),
+ fragment_params.fragment.output_exprs, &status);
+ std::unique_ptr<int, std::function<void(int*)>> close_scan_node_func((int*)0x01, [&](int*) {
+ if (load_block_queue != nullptr) {
+ load_block_queue->remove_load_id(load_id);
+ }
+ file_scan_node.close(runtime_state.get());
+ sink.close(runtime_state.get(), status);
+ });
+ RETURN_IF_ERROR(file_scan_node.init(plan_node, runtime_state.get()));
+ RETURN_IF_ERROR(file_scan_node.prepare(runtime_state.get()));
+ std::vector<TScanRangeParams> params_vector;
+ params_vector.emplace_back(tscan_range_params);
+ file_scan_node.set_scan_ranges(params_vector);
+ RETURN_IF_ERROR(file_scan_node.open(runtime_state.get()));
+
+ RETURN_IF_ERROR(status);
+ RETURN_IF_ERROR(sink.init(fragment_params.fragment.output_sink));
+ RETURN_IF_ERROR_OR_CATCH_EXCEPTION(sink.prepare(runtime_state.get()));
+ RETURN_IF_ERROR(sink.open(runtime_state.get()));
+
+ // 2. Put the block into block queue.
+ std::unique_ptr<doris::vectorized::Block> _block =
+ doris::vectorized::Block::create_unique();
+ bool first = true;
+ bool eof = false;
+ while (!eof) {
+ // TODO what to do if scan one block error
+ RETURN_IF_ERROR(file_scan_node.get_next(runtime_state.get(), _block.get(), &eof));
+ RETURN_IF_ERROR(sink.send(runtime_state.get(), _block.get()));
+ std::shared_ptr<doris::vectorized::FutureBlock> future_block =
+ std::make_shared<doris::vectorized::FutureBlock>();
+ future_block->swap(*(_block.get()));
+ future_block->set_info(ctx->schema_version, load_id, first, eof);
+ // TODO what to do if add one block error
+ if (load_block_queue == nullptr) {
+ RETURN_IF_ERROR(_get_first_block_load_queue(ctx->db_id, ctx->table_id, future_block,
+ load_block_queue));
+ ctx->label = load_block_queue->label;
+ ctx->txn_id = load_block_queue->txn_id;
+ }
+ RETURN_IF_ERROR(load_block_queue->add_block(future_block));
+ if (future_block->rows() > 0) {
+ future_blocks.emplace_back(future_block);
+ }
+ first = false;
+ }
+ ctx->number_unselected_rows = runtime_state->num_rows_load_unselected();
+ ctx->number_filtered_rows = runtime_state->num_rows_load_filtered();
+ ctx->error_url = runtime_state->get_error_log_file_path();
+ if (!runtime_state->get_error_log_file_path().empty()) {
+ LOG(INFO) << "id=" << print_id(load_id)
+ << ", url=" << runtime_state->get_error_log_file_path()
+ << ", load rows=" << runtime_state->num_rows_load_total()
+ << ", filter rows=" <<
runtime_state->num_rows_load_filtered()
+ << ", unselect rows=" <<
runtime_state->num_rows_load_unselected()
+ << ", success rows=" <<
runtime_state->num_rows_load_success();
+ }
+ }
+
+ int64_t total_rows = 0;
+ int64_t loaded_rows = 0;
+ // 3. wait for the blocks to be written to the WAL
+ for (const auto& future_block : future_blocks) {
+ std::unique_lock<doris::Mutex> l(*(future_block->lock));
+ if (!future_block->is_handled()) {
+ future_block->cv->wait(l);
+ }
+ // future_block->get_status()
+ total_rows += future_block->get_total_rows();
+ loaded_rows += future_block->get_loaded_rows();
+ }
+ ctx->number_total_rows = total_rows + ctx->number_unselected_rows + ctx->number_filtered_rows;
+ ctx->number_loaded_rows = loaded_rows;
+ ctx->number_filtered_rows += total_rows - ctx->number_loaded_rows;
+ ctx->promise.set_value(Status::OK());
+ VLOG_DEBUG << "finish read all block of pipe=" << ctx->id.to_string()
+ << ", total rows=" << ctx->number_total_rows
+ << ", loaded rows=" << ctx->number_loaded_rows
+ << ", filtered rows=" << ctx->number_filtered_rows
+ << ", unselected rows=" << ctx->number_unselected_rows;
+ return Status::OK();
+}
+
Status GroupCommitMgr::_get_first_block_load_queue(
int64_t db_id, int64_t table_id,
std::shared_ptr<vectorized::FutureBlock> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index 1d124009d1..01a0905c40 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -35,6 +35,7 @@ class TUniqueId;
class TExecPlanFragmentParams;
class ObjectPool;
class RuntimeState;
+class StreamLoadContext;
class StreamLoadPipe;
class LoadBlockQueue {
@@ -118,6 +119,9 @@ public:
const PGroupCommitInsertRequest* request,
PGroupCommitInsertResponse* response);
+ // stream load
+ Status group_commit_stream_load(std::shared_ptr<StreamLoadContext> ctx);
+
// used when init group_commit_scan_node
Status get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue);
@@ -126,6 +130,8 @@ private:
// used by insert into
Status _append_row(std::shared_ptr<io::StreamLoadPipe> pipe,
const PGroupCommitInsertRequest* request);
+ // used by stream load
+ Status _group_commit_stream_load(std::shared_ptr<StreamLoadContext> ctx);
Status _get_first_block_load_queue(int64_t db_id, int64_t table_id,
std::shared_ptr<vectorized::FutureBlock> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue);
diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp
index f381ba097d..0cd1f0e3d5 100644
--- a/be/src/runtime/stream_load/stream_load_context.cpp
+++ b/be/src/runtime/stream_load/stream_load_context.cpp
@@ -50,9 +50,14 @@ std::string StreamLoadContext::to_json() const {
writer.Key("Comment");
writer.String(load_comment.c_str());
- writer.Key("TwoPhaseCommit");
- std::string need_two_phase_commit = two_phase_commit ? "true" : "false";
- writer.String(need_two_phase_commit.c_str());
+ if (!group_commit) {
+ writer.Key("TwoPhaseCommit");
+ std::string need_two_phase_commit = two_phase_commit ? "true" :
"false";
+ writer.String(need_two_phase_commit.c_str());
+ } else {
+ writer.Key("GroupCommit");
+ writer.Bool(true);
+ }
// status
writer.Key("Status");
@@ -92,16 +97,20 @@ std::string StreamLoadContext::to_json() const {
writer.Int64(receive_bytes);
writer.Key("LoadTimeMs");
writer.Int64(load_cost_millis);
- writer.Key("BeginTxnTimeMs");
- writer.Int64(begin_txn_cost_nanos / 1000000);
+ if (!group_commit) {
+ writer.Key("BeginTxnTimeMs");
+ writer.Int64(begin_txn_cost_nanos / 1000000);
+ }
writer.Key("StreamLoadPutTimeMs");
writer.Int64(stream_load_put_cost_nanos / 1000000);
writer.Key("ReadDataTimeMs");
writer.Int64(read_data_cost_nanos / 1000000);
writer.Key("WriteDataTimeMs");
writer.Int(write_data_cost_nanos / 1000000);
- writer.Key("CommitAndPublishTimeMs");
- writer.Int64(commit_and_publish_txn_cost_nanos / 1000000);
+ if (!group_commit) {
+ writer.Key("CommitAndPublishTimeMs");
+ writer.Int64(commit_and_publish_txn_cost_nanos / 1000000);
+ }
if (!error_url.empty()) {
writer.Key("ErrorURL");
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index 844caea126..ab8cc6be04 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -136,6 +136,8 @@ public:
int64_t db_id = -1;
int64_t wal_id = -1;
std::string table;
+ int64_t table_id = -1;
+ int64_t schema_version = -1;
std::string label;
// optional
std::string sub_label;
@@ -174,6 +176,7 @@ public:
bool use_streaming = false;
TFileFormatType::type format = TFileFormatType::FORMAT_CSV_PLAIN;
TFileCompressType::type compress_type = TFileCompressType::UNKNOWN;
+ bool group_commit = false;
std::shared_ptr<MessageBodySink> body_sink;
std::shared_ptr<io::StreamLoadPipe> pipe;
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp
new file mode 100644
index 0000000000..08f6d87ade
--- /dev/null
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/group_commit_block_sink.h"
+
+#include "runtime/group_commit_mgr.h"
+#include "runtime/runtime_state.h"
+#include "util/doris_metrics.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/sink/vtablet_finder.h"
+#include "vec/sink/vtablet_sink.h"
+
+namespace doris {
+
+namespace stream_load {
+
+GroupCommitBlockSink::GroupCommitBlockSink(ObjectPool* pool, const RowDescriptor& row_desc,
+ const std::vector<TExpr>& texprs, Status* status)
+ : DataSink(row_desc) {
+ // From the thrift expressions create the real exprs.
+ *status = vectorized::VExpr::create_expr_trees(texprs, _output_vexpr_ctxs);
+ _name = "GroupCommitBlockSink";
+}
+
+GroupCommitBlockSink::~GroupCommitBlockSink() = default;
+
+Status GroupCommitBlockSink::init(const TDataSink& t_sink) {
+ DCHECK(t_sink.__isset.olap_table_sink);
+ auto& table_sink = t_sink.olap_table_sink;
+ _tuple_desc_id = table_sink.tuple_id;
+ _schema.reset(new OlapTableSchemaParam());
+ RETURN_IF_ERROR(_schema->init(table_sink.schema));
+ return Status::OK();
+}
+
+Status GroupCommitBlockSink::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(DataSink::prepare(state));
+ _state = state;
+
+ // profile must add to state's object pool
+ _profile = state->obj_pool()->add(new RuntimeProfile("OlapTableSink"));
+ _mem_tracker =
+ std::make_shared<MemTracker>("OlapTableSink:" +
std::to_string(state->load_job_id()));
+ SCOPED_TIMER(_profile->total_time_counter());
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+
+ // get table's tuple descriptor
+ _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id);
+ if (_output_tuple_desc == nullptr) {
+ LOG(WARNING) << "unknown destination tuple descriptor, id=" <<
_tuple_desc_id;
+ return Status::InternalError("unknown destination tuple descriptor");
+ }
+
+ _block_convertor = std::make_unique<vectorized::OlapTableBlockConvertor>(_output_tuple_desc);
+ _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(),
+ _state->batch_size());
+ // Prepare the exprs to run.
+ return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc);
+}
+
+Status GroupCommitBlockSink::open(RuntimeState* state) {
+ // Prepare the exprs to run.
+ return vectorized::VExpr::open(_output_vexpr_ctxs, state);
+}
+
+Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block* input_block, bool eos) {
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+ Status status = Status::OK();
+ auto rows = input_block->rows();
+ auto bytes = input_block->bytes();
+ if (UNLIKELY(rows == 0)) {
+ return status;
+ }
+ SCOPED_TIMER(_profile->total_time_counter());
+ // update incrementally so that FE can get the progress.
+ // the real 'num_rows_load_total' will be set when sink being closed.
+ state->update_num_rows_load_total(rows);
+ state->update_num_bytes_load_total(bytes);
+ DorisMetrics::instance()->load_rows->increment(rows);
+ DorisMetrics::instance()->load_bytes->increment(bytes);
+
+ std::shared_ptr<vectorized::Block> block;
+ bool has_filtered_rows = false;
+ RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
+ state, input_block, block, _output_vexpr_ctxs, rows, has_filtered_rows));
+ block->swap(*input_block);
+ return Status::OK();
+}
+
+} // namespace stream_load
+} // namespace doris
diff --git a/be/src/vec/sink/group_commit_block_sink.h b/be/src/vec/sink/group_commit_block_sink.h
new file mode 100644
index 0000000000..a309413f5a
--- /dev/null
+++ b/be/src/vec/sink/group_commit_block_sink.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include "exec/data_sink.h"
+#include "vec/exprs/vexpr_fwd.h"
+#include "vec/sink/vtablet_sink.h"
+
+namespace doris {
+
+class OlapTableSchemaParam;
+class MemTracker;
+
+namespace stream_load {
+
+class GroupCommitBlockSink : public DataSink {
+public:
+ GroupCommitBlockSink(ObjectPool* pool, const RowDescriptor& row_desc,
+ const std::vector<TExpr>& texprs, Status* status);
+
+ ~GroupCommitBlockSink() override;
+
+ Status init(const TDataSink& sink) override;
+
+ Status prepare(RuntimeState* state) override;
+
+ Status open(RuntimeState* state) override;
+
+ Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override;
+
+private:
+ vectorized::VExprContextSPtrs _output_vexpr_ctxs;
+
+ int _tuple_desc_id = -1;
+ std::shared_ptr<OlapTableSchemaParam> _schema;
+
+ RuntimeState* _state = nullptr;
+ std::shared_ptr<MemTracker> _mem_tracker;
+ // this is tuple descriptor of destination OLAP table
+ TupleDescriptor* _output_tuple_desc = nullptr;
+ std::unique_ptr<vectorized::OlapTableBlockConvertor> _block_convertor;
+};
+
+} // namespace stream_load
+} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index ad1a2f1e52..1fd75c304c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -164,7 +164,7 @@ public class NativeInsertStmt extends InsertStmt {
private ByteString rangeBytes = null;
private long tableId = -1;
// true if be generates an insert from group commit tvf stmt and executes to load data
- public boolean isInnerGroupCommit = false;
+ public boolean isGroupCommitTvf = false;
private boolean isFromDeleteOrUpdateStmt = false;
@@ -895,7 +895,7 @@ public class NativeInsertStmt extends InsertStmt {
}
if (targetTable instanceof OlapTable) {
checkInnerGroupCommit();
- OlapTableSink sink = isInnerGroupCommit ? new GroupCommitOlapTableSink((OlapTable) targetTable, olapTuple,
+ OlapTableSink sink = isGroupCommitTvf ? new GroupCommitOlapTableSink((OlapTable) targetTable, olapTuple,
targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert())
: new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds,
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
@@ -933,10 +933,11 @@ public class NativeInsertStmt extends InsertStmt {
private void checkInnerGroupCommit() {
List<TableRef> tableRefs = new ArrayList<>();
queryStmt.collectTableRefs(tableRefs);
- if (tableRefs.size() == 1 && tableRefs.get(0) instanceof TableValuedFunctionRef
- && ((TableValuedFunctionRef) tableRefs.get(
- 0)).getTableFunction() instanceof GroupCommitTableValuedFunction) {
- isInnerGroupCommit = true;
+ if (tableRefs.size() == 1 && tableRefs.get(0) instanceof TableValuedFunctionRef) {
+ TableValuedFunctionRef tvfRef = (TableValuedFunctionRef) tableRefs.get(0);
+ if (tvfRef.getTableFunction() instanceof GroupCommitTableValuedFunction) {
+ isGroupCommitTvf = true;
+ }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 304cbe6ab5..08f624c99c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1902,7 +1902,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
if (Config.enable_pipeline_load) {
result.setPipelineParams(pipelineStreamLoadPutImpl(request));
} else {
- result.setParams(streamLoadPutImpl(request));
+ result.setParams(streamLoadPutImpl(request, result));
}
}
} catch (UserException e) {
@@ -2060,6 +2060,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
NativeInsertStmt parsedStmt = (NativeInsertStmt) SqlParserUtils.getFirstStmt(parser);
parsedStmt.setOrigStmt(new OriginStatement(originStmt, 0));
parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
+ if (request.isGroupCommit() && parsedStmt.getLabel() != null) {
+ throw new AnalysisException("label and group_commit can't be set at the same time");
+ }
StmtExecutor executor = new StmtExecutor(ctx, parsedStmt);
ctx.setExecutor(executor);
TQueryOptions tQueryOptions = ctx.getSessionVariable().toThrift();
@@ -2077,10 +2080,12 @@ public class FrontendServiceImpl implements FrontendService.Iface {
result.getParams().setTableName(parsedStmt.getTbl());
// The txn_id here is obtained from the NativeInsertStmt
result.getParams().setTxnConf(new TTxnParams().setTxnId(txn_id));
- if (parsedStmt.isInnerGroupCommit) {
- result.setBaseSchemaVersion(((OlapTable) parsedStmt.getTargetTable()).getBaseSchemaVersion());
+ if (parsedStmt.isGroupCommitTvf) {
result.getParams().params.setGroupCommit(true);
}
+ result.setDbId(parsedStmt.getTargetTable().getDatabase().getId());
+ result.setTableId(parsedStmt.getTargetTable().getId());
+ result.setBaseSchemaVersion(((OlapTable) parsedStmt.getTargetTable()).getBaseSchemaVersion());
} catch (UserException e) {
LOG.warn("exec sql error", e);
throw new UserException("exec sql error" + e);
@@ -2090,7 +2095,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
}
}
- private TExecPlanFragmentParams streamLoadPutImpl(TStreamLoadPutRequest request) throws UserException {
+ private TExecPlanFragmentParams streamLoadPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResult result)
+ throws UserException {
String cluster = request.getCluster();
if (Strings.isNullOrEmpty(cluster)) {
cluster = SystemInfoService.DEFAULT_CLUSTER;
@@ -2108,6 +2114,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
}
long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() : 5000;
Table table = db.getTableOrMetaException(request.getTbl(), TableType.OLAP);
+ result.setDbId(db.getId());
+ result.setTableId(table.getId());
+ result.setBaseSchemaVersion(((OlapTable) table).getBaseSchemaVersion());
return generatePlanFragmentParams(request, db, fullDbName, (OlapTable) table, timeoutMs);
}
@@ -2133,13 +2142,15 @@ public class FrontendServiceImpl implements FrontendService.Iface {
}
StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask);
TExecPlanFragmentParams plan = planner.plan(streamLoadTask.getId(), multiTableFragmentInstanceIdIndex);
- // add table indexes to transaction state
- TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
- .getTransactionState(db.getId(), request.getTxnId());
- if (txnState == null) {
- throw new UserException("txn does not exist: " +
request.getTxnId());
+ if (!request.isGroupCommit()) {
+ // add table indexes to transaction state
+ TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
+ .getTransactionState(db.getId(), request.getTxnId());
+ if (txnState == null) {
+ throw new UserException("txn does not exist: " +
request.getTxnId());
+ }
+ txnState.addTableIndexes(table);
}
- txnState.addTableIndexes(table);
plan.setTableName(table.getName());
plan.query_options.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID());
return plan;
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index b1ccf7db08..901acca70d 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -639,6 +639,7 @@ struct TStreamLoadPutRequest {
// only valid when file type is CSV
52: optional i8 escape
53: optional bool memtable_on_sink_node;
+ 54: optional bool group_commit
}
struct TStreamLoadPutResult {
@@ -648,6 +649,8 @@ struct TStreamLoadPutResult {
3: optional PaloInternalService.TPipelineFragmentParams pipeline_params
// used for group commit
4: optional i64 base_schema_version
+ 5: optional i64 db_id
+ 6: optional i64 table_id
}
struct TStreamLoadMultiTablePutResult {
diff --git a/regression-test/data/load_p0/http_stream/test_compress.csv.bz2 b/regression-test/data/load_p0/http_stream/test_compress.csv.bz2
new file mode 100644
index 0000000000..b2fd1fcfbe
Binary files /dev/null and b/regression-test/data/load_p0/http_stream/test_compress.csv.bz2 differ
diff --git a/regression-test/data/load_p0/http_stream/test_compress.csv.gz b/regression-test/data/load_p0/http_stream/test_compress.csv.gz
new file mode 100644
index 0000000000..a330a2b071
Binary files /dev/null and b/regression-test/data/load_p0/http_stream/test_compress.csv.gz differ
diff --git a/regression-test/data/load_p0/http_stream/test_compress.csv.lz4 b/regression-test/data/load_p0/http_stream/test_compress.csv.lz4
new file mode 100644
index 0000000000..76955306d8
Binary files /dev/null and b/regression-test/data/load_p0/http_stream/test_compress.csv.lz4 differ
diff --git a/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out b/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out
new file mode 100644
index 0000000000..b45fc6f714
--- /dev/null
+++ b/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+5 e -1
+5 e 50
+6 f -1
+6 f -1
+6 f 60
+7 e 70
+8 f 80
+
diff --git a/regression-test/data/load_p0/http_stream/test_stream_load1.csv b/regression-test/data/load_p0/http_stream/test_stream_load1.csv
new file mode 100644
index 0000000000..b86bcfa176
--- /dev/null
+++ b/regression-test/data/load_p0/http_stream/test_stream_load1.csv
@@ -0,0 +1,2 @@
+5,e
+6,f
diff --git a/regression-test/data/load_p0/http_stream/test_stream_load2.csv b/regression-test/data/load_p0/http_stream/test_stream_load2.csv
new file mode 100644
index 0000000000..93157e3d6d
--- /dev/null
+++ b/regression-test/data/load_p0/http_stream/test_stream_load2.csv
@@ -0,0 +1,2 @@
+70|7|e
+80|8|f
diff --git a/regression-test/data/load_p0/http_stream/test_stream_load3.csv b/regression-test/data/load_p0/http_stream/test_stream_load3.csv
new file mode 100644
index 0000000000..f257be1566
--- /dev/null
+++ b/regression-test/data/load_p0/http_stream/test_stream_load3.csv
@@ -0,0 +1,6 @@
+10,a,10
+11,a,11
+1a,a,11
+12,a
+20,b,21
+101,a,101
\ No newline at end of file
diff --git a/regression-test/data/load_p0/stream_load/test_compress.csv.bz2 b/regression-test/data/load_p0/stream_load/test_compress.csv.bz2
new file mode 100644
index 0000000000..b2fd1fcfbe
Binary files /dev/null and b/regression-test/data/load_p0/stream_load/test_compress.csv.bz2 differ
diff --git a/regression-test/data/load_p0/stream_load/test_compress.csv.gz b/regression-test/data/load_p0/stream_load/test_compress.csv.gz
new file mode 100644
index 0000000000..a330a2b071
Binary files /dev/null and b/regression-test/data/load_p0/stream_load/test_compress.csv.gz differ
diff --git a/regression-test/data/load_p0/stream_load/test_compress.csv.lz4 b/regression-test/data/load_p0/stream_load/test_compress.csv.lz4
new file mode 100644
index 0000000000..76955306d8
Binary files /dev/null and b/regression-test/data/load_p0/stream_load/test_compress.csv.lz4 differ
diff --git a/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out b/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out
new file mode 100644
index 0000000000..246be06453
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out
@@ -0,0 +1,27 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1 a 10
+1 a 10
+1 a 10
+2 b 20
+2 b 20
+2 b 20
+3 c 30
+3 c 30
+3 c 30
+4 d \N
+4 d \N
+4 d \N
+5 e -1
+5 e 50
+6 f -1
+6 f -1
+6 f 60
+7 e 70
+8 f 80
+10 a 10
+11 a 11
+
+-- !sql --
+2402288
+
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load1.csv b/regression-test/data/load_p0/stream_load/test_stream_load1.csv
new file mode 100644
index 0000000000..b86bcfa176
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_stream_load1.csv
@@ -0,0 +1,2 @@
+5,e
+6,f
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load2.csv b/regression-test/data/load_p0/stream_load/test_stream_load2.csv
new file mode 100644
index 0000000000..93157e3d6d
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_stream_load2.csv
@@ -0,0 +1,2 @@
+70|7|e
+80|8|f
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load3.csv b/regression-test/data/load_p0/stream_load/test_stream_load3.csv
new file mode 100644
index 0000000000..f257be1566
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_stream_load3.csv
@@ -0,0 +1,6 @@
+10,a,10
+11,a,11
+1a,a,11
+12,a
+20,b,21
+101,a,101
\ No newline at end of file
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
index 2dc4a0c8fa..0fe263f291 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
@@ -139,6 +139,10 @@ class StreamLoadAction implements SuiteAction {
headers.put(key, value)
}
+ void unset(String key) {
+ headers.remove(key)
+ }
+
@Override
void run() {
String responseText = null
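The new unset() helper removes a header that the framework (or an earlier set call) already put on the action, which the group commit suites use to drop the default label. Conversely, when a label is left in place together with group_commit, the suites expect the load to fail; a minimal sketch with illustrative names:

    streamLoad {
        table "${tableName}"
        set 'group_commit', 'true'
        // 'label' is intentionally not unset here, so label and group_commit conflict
        file "test_stream_load2.csv"
        time 10000
        check { result, exception, startTime, endTime ->
            if (exception != null) {
                throw exception
            }
            def json = parseJson(result)
            assertEquals("fail", json.Status.toLowerCase())
        }
    }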
diff --git a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
new file mode 100644
index 0000000000..a27963742d
--- /dev/null
+++ b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
@@ -0,0 +1,328 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_group_commit_http_stream") {
+ def db = "regression_test_load_p0_http_stream"
+ def tableName = "test_group_commit_http_stream"
+
+ def getRowCount = { expectedRowCount ->
+ def retry = 0
+ while (retry < 10) {
+ sleep(2000)
+ def rowCount = sql "select count(*) from ${tableName}"
+ logger.info("rowCount: " + rowCount + ", retry: " + retry)
+ if (rowCount[0][0] >= expectedRowCount) {
+ break
+ }
+ retry++
+ }
+ }
+
+ def getAlterTableState = {
+ def retry = 0
+ while (true) {
+ sleep(8000)
+ def state = sql "show alter table column where tablename = '${tableName}' order by CreateTime desc "
+ logger.info("alter table retry: ${retry}, state: ${state}")
+ if (state.size() > 0 && state[0][9] == "FINISHED") {
+ return true
+ }
+ retry++
+ if (retry >= 40) {
+ return false
+ }
+ }
+ return false
+ }
+
+ try {
+ // create table
+ sql """ drop table if exists ${tableName}; """
+
+ sql """
+ CREATE TABLE `${tableName}` (
+ `id` int(11) NOT NULL,
+ `name` varchar(1100) NULL,
+ `score` int(11) NULL default "-1"
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`, `name`)
+ PARTITION BY RANGE (id) (
+ PARTITION plessThan1 VALUES LESS THAN ("0"),
+ PARTITION plessThan2 VALUES LESS THAN ("100")
+ )
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ // stream load with compress file
+ String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/ "lz4"} //, "deflate"}
+ /*for (final def compressionType in compressionTypes) {
+ def fileName = "test_compress.csv." + compressionType
+ streamLoad {
+ set 'version', '1'
+ set 'sql', """
+ insert into ${db}.${tableName} select * from http_stream
+ ("format"="csv", "compress_type"="${compressionType}",
"column_separator"=",")
+ """
+ set 'group_commit', 'true'
+ file "${fileName}"
+ unset 'label'
+
+ time 10000 // limit inflight 10s
+ }
+ }*/
+
+ // stream load with 2 columns
+ streamLoad {
+ set 'version', '1'
+ set 'sql', """
+ insert into ${db}.${tableName}(id, name) select c1, c2 from http_stream
+ ("format"="csv", "column_separator"=",")
+ """
+
+ set 'group_commit', 'true'
+ file "test_stream_load1.csv"
+ unset 'label'
+
+ time 10000 // limit inflight 10s
+ }
+
+ // stream load with different column order
+ streamLoad {
+ set 'version', '1'
+ set 'sql', """
+ insert into ${db}.${tableName}(score, id, name) select c1, c2, c3 from http_stream
+ ("format"="csv", "column_separator"="|")
+ """
+
+ set 'group_commit', 'true'
+ file "test_stream_load2.csv"
+ unset 'label'
+
+ time 10000 // limit inflight 10s
+ }
+
+ // stream load with where condition
+ streamLoad {
+ set 'version', '1'
+ set 'sql', """
+ insert into ${db}.${tableName}(id, name) select c1, c2 from http_stream
+ ("format"="csv", "column_separator"=",") where c1 > 5
+ """
+
+ set 'group_commit', 'true'
+ file "test_stream_load1.csv"
+ unset 'label'
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertTrue(json.GroupCommit)
+ // assertEquals(2, json.NumberTotalRows)
+ assertEquals(1, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ // assertEquals(1, json.NumberUnselectedRows)
+ }
+ }
+
+ // stream load with mapping
+ streamLoad {
+ set 'version', '1'
+ set 'sql', """
+ insert into ${db}.${tableName} select c1, c2, c1 * 10 from http_stream
+ ("format"="csv", "column_separator"=",")
+ """
+
+ set 'group_commit', 'true'
+ file "test_stream_load1.csv"
+ unset 'label'
+
+ time 10000 // limit inflight 10s
+ }
+
+ // stream load with filtered rows
+ /*streamLoad {
+ set 'version', '1'
+ set 'sql', """
+ insert into ${db}.${tableName} select c1, c2, c3 from http_stream where c2 = 'a'
+ ("format"="csv", "column_separator"=",")
+ """
+
+ set 'group_commit', 'true'
+ file "test_stream_load3.csv"
+ set 'max_filter_ratio', '0.7'
+ unset 'label'
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertTrue(json.GroupCommit)
+ assertEquals(6, json.NumberTotalRows)
+ assertEquals(2, json.NumberLoadedRows)
+ assertEquals(3, json.NumberFilteredRows)
+ assertEquals(1, json.NumberUnselectedRows)
+ assertFalse(json.ErrorURL.isEmpty())
+ }
+ }*/
+
+ // stream load with label
+ streamLoad {
+ set 'version', '1'
+ def label = 'l_' + System.currentTimeMillis()
+ set 'sql', """
+ insert into ${db}.${tableName} with label ${label} select * from http_stream
+ ("format"="csv", "column_separator"="|")
+ """
+
+ set 'group_commit', 'true'
+ file "test_stream_load2.csv"
+
+ time 10000 // limit inflight 10s
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+ }
+ }
+
+ getRowCount(7)
+ qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; "
+ } finally {
+ // try_sql("DROP TABLE ${tableName}")
+ }
+
+ // stream load with large data and schema change
+ /*tableName = "test_stream_load_lineorder"
+ try {
+ sql """ DROP TABLE IF EXISTS `${tableName}` """
+ sql """
+ CREATE TABLE IF NOT EXISTS `${tableName}` (
+ `lo_orderkey` bigint(20) NOT NULL COMMENT "",
+ `lo_linenumber` bigint(20) NOT NULL COMMENT "",
+ `lo_custkey` int(11) NOT NULL COMMENT "",
+ `lo_partkey` int(11) NOT NULL COMMENT "",
+ `lo_suppkey` int(11) NOT NULL COMMENT "",
+ `lo_orderdate` int(11) NOT NULL COMMENT "",
+ `lo_orderpriority` varchar(16) NOT NULL COMMENT "",
+ `lo_shippriority` int(11) NOT NULL COMMENT "",
+ `lo_quantity` bigint(20) NOT NULL COMMENT "",
+ `lo_extendedprice` bigint(20) NOT NULL COMMENT "",
+ `lo_ordtotalprice` bigint(20) NOT NULL COMMENT "",
+ `lo_discount` bigint(20) NOT NULL COMMENT "",
+ `lo_revenue` bigint(20) NOT NULL COMMENT "",
+ `lo_supplycost` bigint(20) NOT NULL COMMENT "",
+ `lo_tax` bigint(20) NOT NULL COMMENT "",
+ `lo_commitdate` bigint(20) NOT NULL COMMENT "",
+ `lo_shipmode` varchar(11) NOT NULL COMMENT ""
+ )
+ PARTITION BY RANGE(`lo_orderdate`)
+ (PARTITION p1992 VALUES [("-2147483648"), ("19930101")),
+ PARTITION p1993 VALUES [("19930101"), ("19940101")),
+ PARTITION p1994 VALUES [("19940101"), ("19950101")),
+ PARTITION p1995 VALUES [("19950101"), ("19960101")),
+ PARTITION p1996 VALUES [("19960101"), ("19970101")),
+ PARTITION p1997 VALUES [("19970101"), ("19980101")),
+ PARTITION p1998 VALUES [("19980101"), ("19990101")))
+ DISTRIBUTED BY HASH(`lo_orderkey`) BUCKETS 4
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+ // load data
+ def columns = """lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,
+ lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,
+ lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode"""
+
+ new Thread(() -> {
+ Thread.sleep(3000)
+ // do light weight schema change
+ sql """ alter table ${tableName} ADD column sc_tmp varchar(100)
after lo_revenue; """
+
+ assertTrue(getAlterTableState())
+
+ // do hard weight schema change
+ def new_columns = """lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,
+ lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,
+ lo_revenue,lo_supplycost,lo_tax,lo_shipmode,lo_commitdate"""
+ sql """ alter table ${tableName} order by (${new_columns}); """
+ }).start();
+
+ for (int i = 0; i < 4; i++) {
+
+ streamLoad {
+ set 'version', '1'
+ sql """
+ insert into ${db}.${table} ($columns)
+ select c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17 from http_stream
+ ("format"="csv", "compress_type"="GZ", "column_separator"="|")
+ """
+ table tableName
+
+ // set 'column_separator', '|'
+ // set 'compress_type', 'GZ'
+ set 'columns', columns + ",lo_dummy"
+ set 'group_commit', 'true'
+ unset 'label'
+
+ file """${getS3Url()}/regression/ssb/sf0.1/lineorder.tbl.gz"""
+
+ time 10000 // limit inflight 10s
+
+ // the stream load action checks the result by default, including Success status and NumberTotalRows == NumberLoadedRows
+
+ // if a check callback is declared, the default check is skipped,
+ // so you must check all conditions yourself
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load ${i}, result: ${result}")
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+ assertEquals(json.NumberLoadedRows, 600572)
+ assertTrue(json.LoadBytes > 0)
+ assertTrue(json.GroupCommit)
+ }
+ }
+ }
+
+ getRowCount(2402288)
+ qt_sql """ select count(*) from ${tableName} """
+
+ assertTrue(getAlterTableState())
+ } finally {
+ // try_sql("DROP TABLE ${tableName}")
+ }*/
+}
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
new file mode 100644
index 0000000000..9e50aebf64
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
@@ -0,0 +1,310 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_group_commit_stream_load") {
+ def tableName = "test_group_commit_stream_load"
+
+ def getRowCount = { expectedRowCount ->
+ def retry = 0
+ while (retry < 10) {
+ sleep(2000)
+ def rowCount = sql "select count(*) from ${tableName}"
+ logger.info("rowCount: " + rowCount + ", retry: " + retry)
+ if (rowCount[0][0] >= expectedRowCount) {
+ break
+ }
+ retry++
+ }
+ }
+
+ def getAlterTableState = {
+ def retry = 0
+ while (true) {
+ sleep(8000)
+ def state = sql "show alter table column where tablename = '${tableName}' order by CreateTime desc "
+ logger.info("alter table retry: ${retry}, state: ${state}")
+ if (state.size() > 0 && state[0][9] == "FINISHED") {
+ return true
+ }
+ retry++
+ if (retry >= 40) {
+ return false
+ }
+ }
+ return false
+ }
+
+ try {
+ // create table
+ sql """ drop table if exists ${tableName}; """
+
+ sql """
+ CREATE TABLE `${tableName}` (
+ `id` int(11) NOT NULL,
+ `name` varchar(1100) NULL,
+ `score` int(11) NULL default "-1"
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`, `name`)
+ PARTITION BY RANGE (id) (
+ PARTITION plessThan1 VALUES LESS THAN ("0"),
+ PARTITION plessThan2 VALUES LESS THAN ("100")
+ )
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ // stream load with compress file
+ String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/ "lz4"} //, "deflate"}
+ for (final def compressionType in compressionTypes) {
+ def fileName = "test_compress.csv." + compressionType
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'group_commit', 'true'
+ set 'compress_type', "${compressionType}"
+ // set 'columns', 'id, name, score'
+ file "${fileName}"
+ unset 'label'
+
+ time 10000 // limit inflight 10s
+ }
+ }
+
+ // stream load with 2 columns
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'group_commit', 'true'
+ set 'columns', 'id, name'
+ file "test_stream_load1.csv"
+ unset 'label'
+
+ time 10000 // limit inflight 10s
+ }
+
+ // stream load with different column order
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', '|'
+ set 'group_commit', 'true'
+ set 'columns', 'score, id, name'
+ file "test_stream_load2.csv"
+ unset 'label'
+
+ time 10000 // limit inflight 10s
+ }
+
+ // stream load with where condition
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'group_commit', 'true'
+ set 'columns', 'id, name'
+ file "test_stream_load1.csv"
+ set 'where', 'id > 5'
+ unset 'label'
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertTrue(json.GroupCommit)
+ assertEquals(2, json.NumberTotalRows)
+ assertEquals(1, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(1, json.NumberUnselectedRows)
+ }
+ }
+
+ // stream load with mapping
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'group_commit', 'true'
+ set 'columns', 'id, name, score = id * 10'
+ file "test_stream_load1.csv"
+ unset 'label'
+
+ time 10000 // limit inflight 10s
+ }
+
+ // stream load with filtered rows
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'group_commit', 'true'
+ file "test_stream_load3.csv"
+ set 'where', "name = 'a'"
+ set 'max_filter_ratio', '0.7'
+ unset 'label'
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertTrue(json.GroupCommit)
+ assertEquals(6, json.NumberTotalRows)
+ assertEquals(2, json.NumberLoadedRows)
+ assertEquals(3, json.NumberFilteredRows)
+ assertEquals(1, json.NumberUnselectedRows)
+ assertFalse(json.ErrorURL.isEmpty())
+ }
+ }
+
+ // stream load with label
+ streamLoad {
+ table "${tableName}"
+
+ // set 'label', 'test_stream_load'
+ set 'column_separator', '|'
+ set 'group_commit', 'true'
+ // set 'label', 'l_' + System.currentTimeMillis()
+ file "test_stream_load2.csv"
+
+ time 10000 // limit inflight 10s
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+ }
+ }
+
+ getRowCount(21)
+ qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; "
+ } finally {
+ // try_sql("DROP TABLE ${tableName}")
+ }
+
+ // stream load with large data and schema change
+ tableName = "test_stream_load_lineorder"
+ try {
+ sql """ DROP TABLE IF EXISTS `${tableName}` """
+ sql """
+ CREATE TABLE IF NOT EXISTS `${tableName}` (
+ `lo_orderkey` bigint(20) NOT NULL COMMENT "",
+ `lo_linenumber` bigint(20) NOT NULL COMMENT "",
+ `lo_custkey` int(11) NOT NULL COMMENT "",
+ `lo_partkey` int(11) NOT NULL COMMENT "",
+ `lo_suppkey` int(11) NOT NULL COMMENT "",
+ `lo_orderdate` int(11) NOT NULL COMMENT "",
+ `lo_orderpriority` varchar(16) NOT NULL COMMENT "",
+ `lo_shippriority` int(11) NOT NULL COMMENT "",
+ `lo_quantity` bigint(20) NOT NULL COMMENT "",
+ `lo_extendedprice` bigint(20) NOT NULL COMMENT "",
+ `lo_ordtotalprice` bigint(20) NOT NULL COMMENT "",
+ `lo_discount` bigint(20) NOT NULL COMMENT "",
+ `lo_revenue` bigint(20) NOT NULL COMMENT "",
+ `lo_supplycost` bigint(20) NOT NULL COMMENT "",
+ `lo_tax` bigint(20) NOT NULL COMMENT "",
+ `lo_commitdate` bigint(20) NOT NULL COMMENT "",
+ `lo_shipmode` varchar(11) NOT NULL COMMENT ""
+ )
+ PARTITION BY RANGE(`lo_orderdate`)
+ (PARTITION p1992 VALUES [("-2147483648"), ("19930101")),
+ PARTITION p1993 VALUES [("19930101"), ("19940101")),
+ PARTITION p1994 VALUES [("19940101"), ("19950101")),
+ PARTITION p1995 VALUES [("19950101"), ("19960101")),
+ PARTITION p1996 VALUES [("19960101"), ("19970101")),
+ PARTITION p1997 VALUES [("19970101"), ("19980101")),
+ PARTITION p1998 VALUES [("19980101"), ("19990101")))
+ DISTRIBUTED BY HASH(`lo_orderkey`) BUCKETS 4
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+ // load data
+ def columns = """lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,
+ lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,
+ lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode"""
+
+ new Thread(() -> {
+ Thread.sleep(3000)
+ // do light weight schema change
+ sql """ alter table ${tableName} ADD column sc_tmp varchar(100)
after lo_revenue; """
+
+ assertTrue(getAlterTableState())
+
+ // do hard weight schema change
+ def new_columns = """lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,
+ lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,
+ lo_revenue,lo_supplycost,lo_tax,lo_shipmode,lo_commitdate"""
+ sql """ alter table ${tableName} order by (${new_columns}); """
+ }).start();
+
+ for (int i = 0; i < 4; i++) {
+
+ streamLoad {
+ table tableName
+
+ set 'column_separator', '|'
+ set 'compress_type', 'GZ'
+ set 'columns', columns + ",lo_dummy"
+ set 'group_commit', 'true'
+ unset 'label'
+
+ file """${getS3Url()}/regression/ssb/sf0.1/lineorder.tbl.gz"""
+
+ time 10000 // limit inflight 10s
+
+ // the stream load action checks the result by default, including Success status and NumberTotalRows == NumberLoadedRows
+
+ // if a check callback is declared, the default check is skipped,
+ // so you must check all conditions yourself
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load ${i}, result: ${result}")
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+ assertEquals(json.NumberLoadedRows, 600572)
+ assertTrue(json.LoadBytes > 0)
+ assertTrue(json.GroupCommit)
+ }
+ }
+ }
+
+ getRowCount(2402288)
+ qt_sql """ select count(*) from ${tableName} """
+
+ assertTrue(getAlterTableState())
+ } finally {
+ // try_sql("DROP TABLE ${tableName}")
+ }
+}
\ No newline at end of file