This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 45b2dbab6af [improve](group commit) Group commit support max filter ratio when rows is less than value in config (#28139)
45b2dbab6af is described below

commit 45b2dbab6afa093f3402ac919929f97ff28f5c9a
Author: meiyi <myime...@gmail.com>
AuthorDate: Tue Dec 12 16:33:36 2023 +0800

    [improve](group commit) Group commit support max filter ratio when rows is less than value in config (#28139)
---
 be/src/common/config.cpp                           |   3 +-
 be/src/common/config.h                             |   3 +-
 be/src/runtime/group_commit_mgr.cpp                |   6 +-
 be/src/runtime/group_commit_mgr.h                  |   2 -
 .../runtime/stream_load/stream_load_executor.cpp   |   9 +
 be/src/vec/sink/group_commit_block_sink.cpp        |  61 +++-
 be/src/vec/sink/group_commit_block_sink.h          |   5 +
 .../apache/doris/analysis/NativeInsertStmt.java    |   2 +-
 .../apache/doris/planner/GroupCommitBlockSink.java |   5 +-
 .../apache/doris/planner/GroupCommitPlanner.java   |   5 +-
 .../apache/doris/planner/StreamLoadPlanner.java    |   6 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |   5 +-
 gensrc/thrift/DataSinks.thrift                     |   1 +
 .../insert_group_commit_into_max_filter_ratio.out  |  34 +++
 .../data/insert_p0/test_group_commit_10.csv        |   4 +
 .../data/insert_p0/test_group_commit_11.csv.gz     | Bin 0 -> 35223 bytes
 .../http_stream/test_group_commit_http_stream.out  |   3 -
 ...nsert_group_commit_into_max_filter_ratio.groovy | 339 +++++++++++++++++++++
 .../insert_group_commit_with_prepare_stmt.groovy   |   2 +
 .../test_group_commit_http_stream.groovy           |  14 +-
 .../test_group_commit_stream_load.groovy           |   2 +-
 21 files changed, 475 insertions(+), 36 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 549f6a0db0d..c97959f2e4a 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1100,10 +1100,11 @@ DEFINE_Int16(bitmap_serialize_version, "1");
 DEFINE_String(group_commit_replay_wal_dir, "./wal");
 DEFINE_Int32(group_commit_replay_wal_retry_num, "10");
 DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5");
-DEFINE_Bool(wait_internal_group_commit_finish, "false");
 // the count of thread to group commit insert
 DEFINE_Int32(group_commit_insert_threads, "10");
+DEFINE_Int32(group_commit_memory_rows_for_max_filter_ratio, "10000");
+DEFINE_Bool(wait_internal_group_commit_finish, "false");
 
 DEFINE_mInt32(scan_thread_nice_value, "0");
 DEFINE_mInt32(tablet_schema_cache_recycle_interval, "86400");

diff --git a/be/src/common/config.h b/be/src/common/config.h
index bad4724ac91..e3dbe3234c4 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1173,10 +1173,11 @@ DECLARE_Int16(bitmap_serialize_version);
 DECLARE_String(group_commit_replay_wal_dir);
 DECLARE_Int32(group_commit_replay_wal_retry_num);
 DECLARE_Int32(group_commit_replay_wal_retry_interval_seconds);
-DECLARE_Bool(wait_internal_group_commit_finish);
 // This config can be set to limit thread number in group commit insert thread pool.
 DECLARE_mInt32(group_commit_insert_threads);
+DECLARE_mInt32(group_commit_memory_rows_for_max_filter_ratio);
+DECLARE_Bool(wait_internal_group_commit_finish);
 
 // The configuration item is used to lower the priority of the scanner thread,
 // typically employed to ensure CPU scheduling for write operations.
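The new group_commit_memory_rows_for_max_filter_ratio config caps how many rows a group-commit sink keeps buffered in memory so that a max filter ratio can still be enforced; once a load grows past that row count, blocks are forwarded to the shared group-commit queue and the ratio check is skipped. A minimal sketch of that threshold decision follows, using simplified counter names (the real logic lives in GroupCommitBlockSink::_add_block further down in this diff):

    #include <cstdint>

    // Hypothetical stand-in for the BE config knob added in this commit.
    namespace config {
    inline int64_t group_commit_memory_rows_for_max_filter_ratio = 10000;
    }

    // Returns true while the sink may keep buffering blocks in memory so the
    // filter ratio can still be evaluated at close time. The three counters
    // mirror the RuntimeState row counters (loaded, unselected, filtered).
    inline bool may_buffer_for_filter_ratio(int64_t total_rows, int64_t unselected_rows,
                                            int64_t filtered_rows) {
        return total_rows + unselected_rows + filtered_rows <=
               config::group_commit_memory_rows_for_max_filter_ratio;
    }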
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 224d446b661..b97d5de8a54 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -133,8 +133,7 @@ void LoadBlockQueue::cancel(const Status& st) {
 
 Status GroupCommitTable::get_first_block_load_queue(
         int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
-        std::shared_ptr<vectorized::Block> block, std::shared_ptr<LoadBlockQueue>& load_block_queue,
-        int be_exe_version) {
+        std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version) {
     DCHECK(table_id == _table_id);
     {
         std::unique_lock l(_lock);
@@ -425,7 +424,6 @@ void GroupCommitMgr::stop() {
 
 Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t table_id,
                                                   int64_t base_schema_version, const UniqueId& load_id,
-                                                  std::shared_ptr<vectorized::Block> block,
                                                   std::shared_ptr<LoadBlockQueue>& load_block_queue,
                                                   int be_exe_version) {
     std::shared_ptr<GroupCommitTable> group_commit_table;
@@ -439,7 +437,7 @@ Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t table_i
         group_commit_table = _table_map[table_id];
     }
     return group_commit_table->get_first_block_load_queue(table_id, base_schema_version, load_id,
-                                                          block, load_block_queue, be_exe_version);
+                                                          load_block_queue, be_exe_version);
 }
 
 Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,

diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index 90b0e7a040f..44826cd9465 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -100,7 +100,6 @@ public:
               _all_block_queues_bytes(all_block_queue_bytes) {};
 
     Status get_first_block_load_queue(int64_t table_id, int64_t base_schema_version,
                                       const UniqueId& load_id,
-                                      std::shared_ptr<vectorized::Block> block,
                                       std::shared_ptr<LoadBlockQueue>& load_block_queue,
                                       int be_exe_version);
     Status get_load_block_queue(const TUniqueId& instance_id,
@@ -142,7 +141,6 @@ public:
                                 std::shared_ptr<LoadBlockQueue>& load_block_queue);
     Status get_first_block_load_queue(int64_t db_id, int64_t table_id, int64_t base_schema_version,
                                       const UniqueId& load_id,
-                                      std::shared_ptr<vectorized::Block> block,
                                       std::shared_ptr<LoadBlockQueue>& load_block_queue,
                                       int be_exe_version);
 
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 50bb2407564..1fc8eb81207 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -101,6 +101,15 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
                                   ctx->number_loaded_rows);
             }
         } else {
+            if (ctx->group_commit && status->is<DATA_QUALITY_ERROR>()) {
+                ctx->number_total_rows = state->num_rows_load_total();
+                ctx->number_loaded_rows = state->num_rows_load_success();
+                ctx->number_filtered_rows = state->num_rows_load_filtered();
+                ctx->number_unselected_rows = state->num_rows_load_unselected();
+                if (ctx->number_filtered_rows > 0 && !state->get_error_log_file_path().empty()) {
+                    ctx->error_url = to_load_error_http_path(state->get_error_log_file_path());
+                }
+            }
             LOG(WARNING) << "fragment execute failed"
                          << ", query_id=" << UniqueId(ctx->put_result.params.params.query_id)
                          << ", err_msg=" << status->to_string() << ", " << ctx->brief();
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp
index 2d40f94a543..bb5c5c70d0c 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -49,6 +49,7 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) {
     _base_schema_version = table_sink.base_schema_version;
     _group_commit_mode = table_sink.group_commit_mode;
     _load_id = table_sink.load_id;
+    _max_filter_ratio = table_sink.max_filter_ratio;
     return Status::OK();
 }
 
@@ -84,18 +85,28 @@ Status GroupCommitBlockSink::open(RuntimeState* state) {
 }
 
 Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) {
-    if (_load_block_queue) {
-        _load_block_queue->remove_load_id(_load_id);
-    }
     RETURN_IF_ERROR(DataSink::close(state, close_status));
     RETURN_IF_ERROR(close_status);
-    // wait to wal
     int64_t total_rows = state->num_rows_load_total();
     int64_t loaded_rows = state->num_rows_load_total();
-    state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + total_rows -
-                                         loaded_rows);
     state->set_num_rows_load_total(loaded_rows + state->num_rows_load_unselected() +
                                    state->num_rows_load_filtered());
+    state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + total_rows -
+                                         loaded_rows);
+    if (!_is_block_appended) {
+        // if not meet the max_filter_ratio, we should return error status directly
+        int64_t num_selected_rows =
+                state->num_rows_load_total() - state->num_rows_load_unselected();
+        if (num_selected_rows > 0 &&
+            (double)state->num_rows_load_filtered() / num_selected_rows > _max_filter_ratio) {
+            return Status::DataQualityError("too many filtered rows");
+        }
+        RETURN_IF_ERROR(_add_blocks());
+    }
+    if (_load_block_queue) {
+        _load_block_queue->remove_load_id(_load_id);
+    }
+    // wait to wal
     auto st = Status::OK();
     if (_load_block_queue && (_load_block_queue->wait_internal_group_commit_finish ||
                               _group_commit_mode == TGroupCommitMode::SYNC_MODE)) {
@@ -148,6 +159,8 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
     if (block->rows() == 0) {
         return Status::OK();
     }
+    // the insert group commit tvf always accept nullable columns, so we should convert
+    // the non-nullable columns to nullable columns
     for (int i = 0; i < block->columns(); ++i) {
         if (block->get_by_position(i).type->is_nullable()) {
             continue;
@@ -166,22 +179,42 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
     }
     std::shared_ptr<vectorized::Block> output_block = vectorized::Block::create_shared();
     output_block->swap(cur_mutable_block->to_block());
+    if (!_is_block_appended && state->num_rows_load_total() + state->num_rows_load_unselected() +
+                                               state->num_rows_load_filtered() <=
+                                       config::group_commit_memory_rows_for_max_filter_ratio) {
+        _blocks.emplace_back(output_block);
+    } else {
+        if (!_is_block_appended) {
+            RETURN_IF_ERROR(_add_blocks());
+        }
+        RETURN_IF_ERROR(_load_block_queue->add_block(
+                output_block, _group_commit_mode != TGroupCommitMode::SYNC_MODE));
+    }
+    return Status::OK();
+}
+
+Status GroupCommitBlockSink::_add_blocks() {
+    DCHECK(_is_block_appended == false);
     TUniqueId load_id;
     load_id.__set_hi(_load_id.hi);
     load_id.__set_lo(_load_id.lo);
     if (_load_block_queue == nullptr) {
-        if (state->exec_env()->wal_mgr()->is_running()) {
-            RETURN_IF_ERROR(state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
-                    _db_id, _table_id, _base_schema_version, load_id, block, _load_block_queue,
-                    state->be_exec_version()));
-            state->set_import_label(_load_block_queue->label);
-            state->set_wal_id(_load_block_queue->txn_id);
+        if (_state->exec_env()->wal_mgr()->is_running()) {
+            RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
+                    _db_id, _table_id, _base_schema_version, load_id, _load_block_queue,
+                    _state->be_exec_version()));
+            _state->set_import_label(_load_block_queue->label);
+            _state->set_wal_id(_load_block_queue->txn_id);
         } else {
             return Status::InternalError("be is stopping");
         }
     }
-    RETURN_IF_ERROR(_load_block_queue->add_block(
-            output_block, _group_commit_mode != TGroupCommitMode::SYNC_MODE));
+    for (auto it = _blocks.begin(); it != _blocks.end(); ++it) {
+        RETURN_IF_ERROR(_load_block_queue->add_block(
+                *it, _group_commit_mode != TGroupCommitMode::SYNC_MODE));
+    }
+    _is_block_appended = true;
+    _blocks.clear();
     return Status::OK();
 }
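To summarize the close-time behavior introduced above: while blocks are still buffered (nothing has been appended to the load queue yet), the sink computes the filter ratio over the selected rows and fails the load with a data-quality error before touching the queue; only when the ratio is acceptable does _add_blocks() flush the buffered blocks. A minimal sketch of that check, with a simplified status type standing in for doris::Status:

    #include <cstdint>
    #include <string>

    // Simplified stand-in for doris::Status, for illustration only.
    struct LoadStatus {
        bool ok = true;
        std::string message;
    };

    // Mirrors the ratio check done in GroupCommitBlockSink::close() before any
    // buffered block is handed to the group-commit load queue.
    inline LoadStatus check_max_filter_ratio(int64_t total_rows, int64_t unselected_rows,
                                             int64_t filtered_rows, double max_filter_ratio) {
        const int64_t selected_rows = total_rows - unselected_rows;
        if (selected_rows > 0 &&
            static_cast<double>(filtered_rows) / selected_rows > max_filter_ratio) {
            return {false, "too many filtered rows"};  // surfaces as DATA_QUALITY_ERROR
        }
        return {true, ""};  // safe to flush the buffered blocks via _add_blocks()
    }

Because blocks are appended only after this check passes, a failing group-commit INSERT or stream load does not leave partially written data in the shared queue.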
diff --git a/be/src/vec/sink/group_commit_block_sink.h b/be/src/vec/sink/group_commit_block_sink.h
index 4be5a5514cb..2ae37be368a 100644
--- a/be/src/vec/sink/group_commit_block_sink.h
+++ b/be/src/vec/sink/group_commit_block_sink.h
@@ -47,6 +47,7 @@ public:
 
 private:
     Status _add_block(RuntimeState* state, std::shared_ptr<vectorized::Block> block);
+    Status _add_blocks();
 
     vectorized::VExprContextSPtrs _output_vexpr_ctxs;
 
@@ -65,6 +66,10 @@ private:
     TGroupCommitMode::type _group_commit_mode;
     UniqueId _load_id;
     std::shared_ptr<LoadBlockQueue> _load_block_queue;
+    // used to calculate if meet the max filter ratio
+    std::vector<std::shared_ptr<vectorized::Block>> _blocks;
+    bool _is_block_appended = false;
+    double _max_filter_ratio = 0.0;
 };
 
 } // namespace vectorized

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index dec20ab480e..f523e3e594f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -970,7 +970,7 @@ public class NativeInsertStmt extends InsertStmt {
         if (isGroupCommitStreamLoadSql) {
             sink = new GroupCommitBlockSink((OlapTable) targetTable, olapTuple, targetPartitionIds,
                     analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert(),
-                    ConnectContext.get().getSessionVariable().getGroupCommit());
+                    ConnectContext.get().getSessionVariable().getGroupCommit(), 0);
         } else {
             sink = new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds,
                     analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
index 14ecda1a5b2..8030dcf4e5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
@@ -29,11 +29,13 @@ import java.util.List;
 
 public class GroupCommitBlockSink extends OlapTableSink {
     private String groupCommit;
+    private double maxFilterRatio;
 
     public GroupCommitBlockSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds,
-            boolean singleReplicaLoad, String groupCommit) {
+            boolean singleReplicaLoad, String groupCommit, double maxFilterRatio) {
         super(dstTable, tupleDescriptor, partitionIds, singleReplicaLoad);
         this.groupCommit = groupCommit;
+        this.maxFilterRatio = maxFilterRatio;
     }
 
     protected TDataSinkType getDataSinkType() {
@@ -45,6 +47,7 @@ public class GroupCommitBlockSink extends OlapTableSink {
         TGroupCommitMode groupCommitMode = parseGroupCommit(groupCommit);
         Preconditions.checkNotNull(groupCommitMode, "Group commit is: " + groupCommit);
         tDataSink.olap_table_sink.setGroupCommitMode(groupCommitMode);
+        tDataSink.olap_table_sink.setMaxFilterRatio(maxFilterRatio);
         return tDataSink;
     }

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
index b3557bd5634..bc99d771d91 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
@@ -98,11 +98,12 @@ public class GroupCommitPlanner {
         }
         streamLoadPutRequest
                 .setDb(db.getFullName())
-                .setMaxFilterRatio(1)
+                .setMaxFilterRatio(ConnectContext.get().getSessionVariable().enableInsertStrict ? 0 : 1)
                 .setTbl(table.getName())
                 .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
                 .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId)
-                .setTrimDoubleQuotes(true).setGroupCommitMode(groupCommit);
+                .setTrimDoubleQuotes(true).setGroupCommitMode(groupCommit)
+                .setStrictMode(ConnectContext.get().getSessionVariable().enableInsertStrict);
         StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest);
         StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask);
         // Will using load id as query id in fragment

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 65fe2163695..bf3261d3944 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -261,7 +261,8 @@ public class StreamLoadPlanner {
         OlapTableSink olapTableSink;
         if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).getGroupCommit() != null) {
             olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds,
-                    Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit());
+                    Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit(),
+                    taskInfo.getMaxFilterRatio());
         } else {
             olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load);
         }
@@ -481,7 +482,8 @@ public class StreamLoadPlanner {
         OlapTableSink olapTableSink;
         if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).getGroupCommit() != null) {
             olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds,
-                    Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit());
+                    Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit(),
+                    taskInfo.getMaxFilterRatio());
         } else {
             olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load);
         }

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 463b54771bb..0aabb8d7f34 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -176,6 +176,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.ProtocolStringList;
 import lombok.Setter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -1891,7 +1892,9 @@ public class StmtExecutor {
             List<InternalService.PDataRow> rows = groupCommitPlanner.getRows(nativeInsertStmt);
             PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(context, rows);
             TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
-            if (code == TStatusCode.DATA_QUALITY_ERROR) {
+            ProtocolStringList errorMsgsList = response.getStatus().getErrorMsgsList();
+            if (code == TStatusCode.DATA_QUALITY_ERROR && !errorMsgsList.isEmpty() && errorMsgsList.get(0)
+                    .contains("schema version not match")) {
                 LOG.info("group commit insert failed. stmt: {}, backend id: {}, status: {}, "
                         + "schema version: {}, retry: {}", insertStmt.getOrigStmt().originStmt,
                         groupCommitPlanner.getBackend().getId(),
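On the FE side, the planner now derives the ratio it sends to the BE from the session's strict-mode setting instead of hard-coding 1: strict inserts tolerate no filtered rows, non-strict inserts tolerate any number. A one-line sketch of that mapping (C++ pseudocode mirroring the Java change above; the helper name is made up):

    // Hypothetical helper mirroring GroupCommitPlanner's choice:
    // enable_insert_strict == true  -> max_filter_ratio 0.0 (any filtered row fails)
    // enable_insert_strict == false -> max_filter_ratio 1.0 (filtered rows tolerated)
    inline double insert_max_filter_ratio(bool enable_insert_strict) {
        return enable_insert_strict ? 0.0 : 1.0;
    }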
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 40c0e760e85..daf3bcc06a9 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -266,6 +266,7 @@ struct TOlapTableSink {
     // used by GroupCommitBlockSink
     21: optional i64 base_schema_version
     22: optional TGroupCommitMode group_commit_mode
+    23: optional double max_filter_ratio
 }
 
 struct TDataSink {

diff --git a/regression-test/data/insert_p0/insert_group_commit_into_max_filter_ratio.out b/regression-test/data/insert_p0/insert_group_commit_into_max_filter_ratio.out
new file mode 100644
index 00000000000..62743feeb6c
--- /dev/null
+++ b/regression-test/data/insert_p0/insert_group_commit_into_max_filter_ratio.out
@@ -0,0 +1,34 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1	a	10
+2	\N	-1
+3	a	10
+9	a	\N
+
+-- !sql --
+1	a	10
+2	\N	-1
+3	a	10
+6	a	\N
+7	a	\N
+9	a	\N
+
+-- !sql --
+1	a	21
+1	a	21
+2	b	22
+2	b	22
+3	c	23
+3	c	23
+4	d	\N
+
+-- !sql --
+1	a	21
+1	a	21
+2	b	22
+2	b	22
+3	c	23
+3	c	23
+4	d	\N
+4	d	\N
+

diff --git a/regression-test/data/insert_p0/test_group_commit_10.csv b/regression-test/data/insert_p0/test_group_commit_10.csv
new file mode 100644
index 00000000000..ef677f06936
--- /dev/null
+++ b/regression-test/data/insert_p0/test_group_commit_10.csv
@@ -0,0 +1,4 @@
+1,a,21
+2,b,22
+3,c,23
+4,d,a
\ No newline at end of file

diff --git a/regression-test/data/insert_p0/test_group_commit_11.csv.gz b/regression-test/data/insert_p0/test_group_commit_11.csv.gz
new file mode 100644
index 00000000000..59c57ffdbf0
Binary files /dev/null and b/regression-test/data/insert_p0/test_group_commit_11.csv.gz differ

diff --git a/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out b/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out
index abe3210dd81..57c2525815a 100644
--- a/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out
+++ b/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out
@@ -19,9 +19,6 @@
 6	f	60
 7	e	70
 8	f	80
-10	a	10
-11	a	11
-12	a	\N
 
 -- !sql --
 2402288
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy b/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy
new file mode 100644
index 00000000000..02f3ca1e7a9
--- /dev/null
+++ b/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy
@@ -0,0 +1,339 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import com.mysql.cj.jdbc.StatementImpl
+
+suite("insert_group_commit_into_max_filter_ratio") {
+    def dbName = "regression_test_insert_p0"
+    def tableName = "insert_group_commit_into_max_filter_ratio"
+    def dbTableName = dbName + "." + tableName
+
+    def get_row_count = { expectedRowCount ->
+        def rowCount = sql "select count(*) from ${dbTableName}"
+        logger.info("rowCount: " + rowCount + ", expecedRowCount: " + expectedRowCount)
+        assertEquals(expectedRowCount, rowCount[0][0])
+    }
+
+    def get_row_count_with_retry = { expectedRowCount ->
+        def retry = 0
+        while (retry < 30) {
+            sleep(2000)
+            def rowCount = sql "select count(*) from ${dbTableName}"
+            logger.info("rowCount: " + rowCount + ", retry: " + retry)
+            if (rowCount[0][0] >= expectedRowCount) {
+                break
+            }
+            retry++
+        }
+    }
+
+    def group_commit_insert = { sql, expected_row_count ->
+        def stmt = prepareStatement """ ${sql} """
+        def result = stmt.executeUpdate()
+        logger.info("insert result: " + result)
+        def serverInfo = (((StatementImpl) stmt).results).getServerInfo()
+        logger.info("result server info: " + serverInfo)
+        if (result != expected_row_count) {
+            logger.warn("insert result: " + result + ", expected_row_count: " + expected_row_count + ", sql: " + sql)
+        }
+        // assertEquals(result, expected_row_count)
+        assertTrue(serverInfo.contains("'status':'PREPARE'"))
+        assertTrue(serverInfo.contains("'label':'group_commit_"))
+    }
+
+    def off_mode_group_commit_insert = { sql, expected_row_count ->
+        def stmt = prepareStatement """ ${sql} """
+        def result = stmt.executeUpdate()
+        logger.info("insert result: " + result)
+        def serverInfo = (((StatementImpl) stmt).results).getServerInfo()
+        logger.info("result server info: " + serverInfo)
+        if (result != expected_row_count) {
+            logger.warn("insert result: " + result + ", expected_row_count: " + expected_row_count + ", sql: " + sql)
+        }
+        // assertEquals(result, expected_row_count)
+        assertTrue(serverInfo.contains("'status':'VISIBLE'"))
+        assertFalse(serverInfo.contains("'label':'group_commit_"))
+    }
+
+    def fail_group_commit_insert = { sql, expected_row_count ->
+        def stmt = prepareStatement """ ${sql} """
+        try {
+            def result = stmt.executeUpdate()
+            logger.info("insert result: " + result)
+            def serverInfo = (((StatementImpl) stmt).results).getServerInfo()
+            logger.info("result server info: " + serverInfo)
+            if (result != expected_row_count) {
+                logger.warn("insert result: " + result + ", expected_row_count: " + expected_row_count + ", sql: " + sql)
+            }
+            // assertEquals(result, expected_row_count)
+            assertTrue(serverInfo.contains("'status':'ABORTED'"))
+            // assertFalse(serverInfo.contains("'label':'group_commit_"))
+        } catch (Exception e) {
+            logger.info("exception: " + e)
+        }
+    }
+
+    def check_stream_load_result = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows ->
+        if (exception != null) {
+            throw exception
+        }
+        log.info("Stream load result: ${result}".toString())
+        def json = parseJson(result)
+        assertEquals("success", json.Status.toLowerCase())
+        assertTrue(json.GroupCommit)
+        assertTrue(json.Label.startsWith("group_commit_"))
+        assertEquals(total_rows, json.NumberTotalRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(unselected_rows, json.NumberUnselectedRows)
+        if (filtered_rows > 0) {
+            assertFalse(json.ErrorURL.isEmpty())
+        } else {
+            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+        }
+    }
+
+    def check_stream_load_result_with_exception = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows ->
+        if (exception != null) {
+            throw exception
+        }
+        log.info("Stream load result: ${result}".toString())
+        def json = parseJson(result)
+        assertEquals("fail", json.Status.toLowerCase())
+        assertTrue(json.GroupCommit)
+        // assertTrue(json.Label.startsWith("group_commit_"))
+        assertEquals(total_rows, json.NumberTotalRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(unselected_rows, json.NumberUnselectedRows)
+        if (filtered_rows > 0) {
+            assertFalse(json.ErrorURL.isEmpty())
+        } else {
+            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+        }
+        assertTrue(json.Message.contains("too many filtered rows"))
+    }
+
+    def check_off_mode_stream_load_result = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows ->
+        if (exception != null) {
+            throw exception
+        }
+        log.info("Stream load result: ${result}".toString())
+        def json = parseJson(result)
+        assertEquals("success", json.Status.toLowerCase())
+        assertFalse(json.Label.startsWith("group_commit_"))
+        assertEquals(total_rows, json.NumberTotalRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(unselected_rows, json.NumberUnselectedRows)
+        if (filtered_rows > 0) {
+            assertFalse(json.ErrorURL.isEmpty())
+        } else {
+            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+        }
+    }
+
+    // create table
+    sql """ drop table if exists ${tableName}; """
+    sql """
+    CREATE TABLE ${tableName} (
+        `id` int(11) NOT NULL,
+        `type` varchar(1) NULL,
+        `score` int(11) NULL default "-1"
+    ) ENGINE=OLAP
+    DUPLICATE KEY(`id`)
+    DISTRIBUTED BY HASH(`id`) BUCKETS 1
+    PROPERTIES (
+        "replication_num" = "1",
+        "group_commit_interval_ms" = "1000"
+    );
+    """
+
+    // insert
+    // legacy, nereids
+    // if enable strict mode
+    // 100 rows(success, fail), 10000 rows(success, fail), 15000 rows(success, fail)
+    // async mode, sync mode, off mode
+    for (item in ["legacy", "nereids"]) {
+        sql """ truncate table ${tableName} """
+        connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) {
+            if (item == "nereids") {
+                sql """ set enable_nereids_dml = true; """
+                sql """ set enable_nereids_planner=true; """
+                // sql """ set enable_fallback_to_original_planner=false; """
+            } else {
+                sql """ set enable_nereids_dml = false; """
+            }
+
+            sql """ set group_commit = sync_mode; """
+            group_commit_insert """ insert into ${dbTableName} values (1, 'a', 10); """, 1
+            sql """ set group_commit = async_mode; """
+            group_commit_insert """ insert into ${dbTableName}(id) select 2; """, 1
+            sql """ set group_commit = off_mode; """
+            off_mode_group_commit_insert """ insert into ${dbTableName} values (3, 'a', 10); """, 1
+            sql """ set group_commit = async_mode; """
+            fail_group_commit_insert """ insert into ${dbTableName} values (4, 'abc', 10); """, 0
+            sql """ set enable_insert_strict = false; """
+            group_commit_insert """ insert into ${dbTableName} values (5, 'abc', 10); """, 0
+
+            // The row 6 and 7 is different between legacy and nereids
+            try {
+                sql """ set group_commit = off_mode; """
+                sql """ set enable_insert_strict = true; """
+                sql """ insert into ${dbTableName} values (6, 'a', 'a'); """
+            } catch (Exception e) {
+                logger.info("exception: " + e)
+                assertTrue(e.toString().contains("Invalid number format"))
+            }
+
+            try {
+                sql """ set group_commit = off_mode; """
+                sql """ set enable_insert_strict = false; """
+                sql """ insert into ${dbTableName} values (7, 'a', 'a'); """
+            } catch (Exception e) {
+                logger.info("exception: " + e)
+                assertTrue(e.toString().contains("Invalid number format"))
+            }
+
+            // TODO should throw exception?
+            sql """ set group_commit = async_mode; """
+            sql """ set enable_insert_strict = true; """
+            fail_group_commit_insert """ insert into ${dbTableName} values (8, 'a', 'a'); """, 0
+
+            sql """ set group_commit = async_mode; """
+            sql """ set enable_insert_strict = false; """
+            group_commit_insert """ insert into ${dbTableName} values (9, 'a', 'a'); """, 0
+        }
+        get_row_count_with_retry(4 + item == "nereids" ? 2 : 0)
+        order_qt_sql """ select * from ${dbTableName} """
+    }
+    sql """ truncate table ${tableName} """
+
+    // 2. stream load(async or sync mode, strict mode, max_filter_ratio, 10000 rows)
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        file "test_group_commit_10.csv"
+        unset 'label'
+        set 'group_commit', 'async_mode'
+
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            check_stream_load_result(exception, result, 4, 4, 0, 0)
+        }
+    }
+    get_row_count_with_retry(4)
+
+    // sync_mode, strict_mode = true, max_filter_ratio = 0
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        file "test_group_commit_10.csv"
+        unset 'label'
+        set 'group_commit', 'sync_mode'
+        set 'strict_mode', 'true'
+
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            check_stream_load_result_with_exception(exception, result, 4, 3, 1, 0)
+        }
+    }
+    get_row_count(4)
+
+    // sync_mode, strict_mode = true, max_filter_ratio = 0.3
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        file "test_group_commit_10.csv"
+        unset 'label'
+        set 'group_commit', 'sync_mode'
+        set 'strict_mode', 'true'
+        set 'max_filter_ratio', '0.3'
+
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            check_stream_load_result(exception, result, 4, 3, 1, 0)
+        }
+    }
+    get_row_count(7)
+
+    order_qt_sql """ select * from ${tableName} """
+
+    // 10001 rows
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        file "test_group_commit_11.csv.gz"
+        unset 'label'
+        set 'compress_type', 'gz'
+        set 'group_commit', 'sync_mode'
+        set 'strict_mode', 'true'
+
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            check_stream_load_result(exception, result, 10001, 10000, 1, 0)
+        }
+    }
+    get_row_count(10007)
+    sql """ truncate table ${tableName} """
+    // 3. http stream(async or sync mode, strict mode, max_filter_ratio, 10000 rows)
+    streamLoad {
+        set 'version', '1'
+        set 'sql', """
+            insert into ${dbTableName} select * from http_stream
+            ("format"="csv", "column_separator"=",")
+        """
+        set 'group_commit', 'sync_mode'
+        file "test_group_commit_10.csv"
+        unset 'label'
+
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            check_stream_load_result(exception, result, 4, 4, 0, 0)
+        }
+    }
+    get_row_count_with_retry(4)
+
+    // not use group commit
+    streamLoad {
+        set 'version', '1'
+        set 'sql', """
+            insert into ${dbTableName} select * from http_stream
+            ("format"="csv", "column_separator"=",")
+        """
+        file "test_group_commit_10.csv"
+
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            check_off_mode_stream_load_result(exception, result, 4, 4, 0, 0)
+        }
+    }
+    get_row_count(8)
+
+    order_qt_sql """ select * from ${tableName} """
+}

diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy
index 144cc30c095..72d52229816 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy
@@ -100,6 +100,7 @@ suite("insert_group_commit_with_prepare_stmt") {
             """
 
             sql """ set group_commit = async_mode; """
+            sql """ set enable_insert_strict = false; """
 
             // 1. insert into
             def insert_stmt = prepareStatement """ INSERT INTO ${table} VALUES(?, ?, ?) """
@@ -159,6 +160,7 @@ suite("insert_group_commit_with_prepare_stmt") {
             """
 
             sql """ set group_commit = async_mode; """
+            sql """ set enable_insert_strict = false; """
 
             // 1. insert into
             def insert_stmt = prepareStatement """ INSERT INTO ${table} VALUES(?, ?, ?) """

diff --git a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
index de2581e0237..6909a919c67 100644
--- a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
+++ b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
@@ -212,14 +212,22 @@ suite("test_group_commit_http_stream") {
             set 'group_commit', 'async_mode'
             file "test_stream_load3.csv"
-            set 'max_filter_ratio', '0.7'
+            // TODO max_filter_ratio is not supported http_stream
+            // set 'max_filter_ratio', '0.7'
             unset 'label'
 
             time 10000 // limit inflight 10s
 
             check { result, exception, startTime, endTime ->
                 // TODO different with stream load: 6, 2, 3, 1
-                checkStreamLoadResult(exception, result, 6, 4, 2, 0)
+                // checkStreamLoadResult(exception, result, 5, 4, 1, 0)
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("fail", json.Status.toLowerCase())
+                assertTrue(json.Message.contains("too many filtered rows"))
             }
         }
 
@@ -246,7 +254,7 @@ suite("test_group_commit_http_stream") {
                 }
             }
 
-            getRowCount(22)
+            getRowCount(19)
             qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; "
         } finally {
             // try_sql("DROP TABLE ${tableName}")

diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
index 6ae9fb13b4d..b60b6dc5555 100644
--- a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
@@ -194,7 +194,7 @@ suite("test_group_commit_stream_load") {
             time 10000 // limit inflight 10s
 
             check { result, exception, startTime, endTime ->
-                checkStreamLoadResult(exception, result, 6, 2, 3, 1)
+                checkStreamLoadResult(exception, result, 6, 3, 2, 1)
             }
         }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org