This is an automated email from the ASF dual-hosted git repository.

jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 89845dc916d [fix] (move-memtable) fix errors caused by checks in the inverted index file writer (#37621)
89845dc916d is described below

commit 89845dc916d6d8c29f8a06e6af226e0d3e6dc40b
Author: Sun Chenyang <csun5...@gmail.com>
AuthorDate: Thu Jul 11 19:43:57 2024 +0800

    [fix] (move-memtable) fix errors caused by checks in the inverted index file writer (#37621)

    ## Proposed changes

    The writer-count check in add_segment can fail under concurrent flushes:

    1. Thread A flushes segment 0 and inverted index 0.
    2. Thread B flushes segment 1 (its inverted index has not been flushed yet).
    3. Thread A calls add_segment for segment 0. The destination BE checks that
       the number of inverted index file writers equals the number of segment
       file writers, and the check fails at this point even though the load is healthy.

    Fix: defer the comparison of the inverted index file writer count against the
    segment file writer count to close(), after all flushes have finished.
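    To make the interleaving concrete, below is a minimal, self-contained C++
    sketch of the failure mode (hypothetical WriterSets type and names, not the
    actual LoadStreamWriter code); the thread interleaving is replayed
    sequentially so the outcome is deterministic:

        #include <cstdio>
        #include <mutex>
        #include <vector>

        // Stand-in for the two writer collections kept on the destination BE.
        struct WriterSets {
            std::mutex mu;
            std::vector<int> segment_writers;   // one entry per flushed segment file
            std::vector<int> inverted_writers;  // one entry per flushed index file

            // A segment flush may or may not have flushed its inverted index yet.
            void flush_segment(int segid, bool index_flushed) {
                std::lock_guard<std::mutex> l(mu);
                segment_writers.push_back(segid);
                if (index_flushed) inverted_writers.push_back(segid);
            }

            // The old add_segment-time rule: counts must match on every call.
            bool add_segment_check() {
                std::lock_guard<std::mutex> l(mu);
                return inverted_writers.empty() ||
                       inverted_writers.size() == segment_writers.size();
            }
        };

        int main() {
            WriterSets ws;
            ws.flush_segment(/*segid=*/0, /*index_flushed=*/true);   // thread A: segment 0 + index 0
            ws.flush_segment(/*segid=*/1, /*index_flushed=*/false);  // thread B: index still pending
            // Thread A now calls add_segment(0): 1 inverted writer vs. 2 segment
            // writers, so the old check reports corruption for a healthy load.
            std::printf("old check passes: %s\n", ws.add_segment_check() ? "yes" : "no");
            return 0;
        }

    The mismatch is not a data race but a legitimate transient state while
    thread B's index flush is in flight, which is why the patch validates the
    counts only in close(), once no further flushes can arrive.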
---
 be/src/runtime/load_stream_writer.cpp             |  15 ++--
 .../test_move_memtable_multi_segment_index.out    |   4 +
 .../test_move_memtable_multi_segment_index.groovy | 100 +++++++++++++++++++++
 .../suites/variant_github_events_p0/load.groovy   |   1 +
 .../variant_github_events_p0_new/load.groovy      |   1 +
 regression-test/suites/variant_p0/load.groovy     |   1 +
 6 files changed, 114 insertions(+), 8 deletions(-)

diff --git a/be/src/runtime/load_stream_writer.cpp b/be/src/runtime/load_stream_writer.cpp
index 925229a43ce..3e66787a9bd 100644
--- a/be/src/runtime/load_stream_writer.cpp
+++ b/be/src/runtime/load_stream_writer.cpp
@@ -187,13 +187,6 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& st
     if (!_is_init) {
         return Status::Corruption("add_segment failed, LoadStreamWriter is not inited");
     }
-    if (_inverted_file_writers.size() > 0 &&
-        _inverted_file_writers.size() != _segment_file_writers.size()) {
-        return Status::Corruption(
-                "add_segment failed, inverted file writer size is {},"
-                "segment file writer size is {}",
-                _inverted_file_writers.size(), _segment_file_writers.size());
-    }
     DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.bad_segid",
                     { segid = _segment_file_writers.size(); });
     RETURN_IF_ERROR(_calc_file_size(segid, FileType::SEGMENT_FILE, &segment_file_size));
@@ -255,7 +248,13 @@ Status LoadStreamWriter::close() {
     if (_is_canceled) {
         return Status::InternalError("flush segment failed");
     }
-
+    if (_inverted_file_writers.size() > 0 &&
+        _inverted_file_writers.size() != _segment_file_writers.size()) {
+        return Status::Corruption(
+                "LoadStreamWriter close failed, inverted file writer size is {},"
+                "segment file writer size is {}",
+                _inverted_file_writers.size(), _segment_file_writers.size());
+    }
     for (const auto& writer : _segment_file_writers) {
         if (writer->state() != io::FileWriter::State::CLOSED) {
             return Status::Corruption("LoadStreamWriter close failed, segment {} is not closed",
diff --git a/regression-test/data/load/insert/test_move_memtable_multi_segment_index.out b/regression-test/data/load/insert/test_move_memtable_multi_segment_index.out
new file mode 100644
index 00000000000..c569d3acd09
--- /dev/null
+++ b/regression-test/data/load/insert/test_move_memtable_multi_segment_index.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql_select_count --
+67843
+
diff --git a/regression-test/suites/load/insert/test_move_memtable_multi_segment_index.groovy b/regression-test/suites/load/insert/test_move_memtable_multi_segment_index.groovy
new file mode 100644
index 00000000000..ac225812484
--- /dev/null
+++ b/regression-test/suites/load/insert/test_move_memtable_multi_segment_index.groovy
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_move_memtable_multi_segment_index", "nonConcurrent"){
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+    def set_be_config = { key, value ->
+        for (String backend_id: backendId_to_backendIP.keySet()) {
+            def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
+            logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
+        }
+    }
+    def load_json_data = { table_name, file_name ->
+        // load the json data
+        streamLoad {
+            table "${table_name}"
+
+            // set http request header params
+            set 'read_json_by_line', 'true'
+            set 'format', 'json'
+            set 'max_filter_ratio', '0.1'
+            set 'memtable_on_sink_node', 'true'
+            file file_name // import json file
+            time 10000 // limit inflight 10s
+
+            // If a check callback is declared, the default check condition
+            // is ignored, so you must check all conditions yourself.
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                logger.info("Stream load ${file_name} result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                // assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows)
+                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+            }
+        }
+    }
+    try {
+        set_be_config("write_buffer_size", "2097152")
+        def table_name = "github_events"
+        sql """DROP TABLE IF EXISTS ${table_name}"""
+        table_name = "github_events"
+        sql """
+            CREATE TABLE IF NOT EXISTS ${table_name} (
+                k bigint,
+                v variant,
+                INDEX idx_var(v) USING INVERTED PROPERTIES("parser" = "english") COMMENT ''
+            )
+            DUPLICATE KEY(`k`)
+            DISTRIBUTED BY HASH(k) BUCKETS 1
+            properties("replication_num" = "1", "disable_auto_compaction" = "true");
+        """
+
+        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
+        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-1.json'}""")
+        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-2.json'}""")
+        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-3.json'}""")
+        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-16.json'}""")
+        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-10.json'}""")
+        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-22.json'}""")
+        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-23.json'}""")
+
+        sql """DROP TABLE IF EXISTS github_events_2"""
+        sql """
+            CREATE TABLE IF NOT EXISTS `github_events_2` (
+                `k` BIGINT NULL,
+                `v` text NULL,
+                INDEX idx_var (`v`) USING INVERTED PROPERTIES("parser" = "english") COMMENT ''
+            ) ENGINE = OLAP DUPLICATE KEY(`k`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`k`) BUCKETS 1 PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1"
+            );
+        """
+
+        sql """
+            insert into github_events_2 select 1, cast(v["repo"]["name"] as string) FROM github_events;
+        """
+        qt_sql_select_count """ select count(*) from github_events_2; """
+    } finally {
+        set_be_config("write_buffer_size", "209715200")
+    }
+
+}
diff --git a/regression-test/suites/variant_github_events_p0/load.groovy b/regression-test/suites/variant_github_events_p0/load.groovy
index cec4fb69fd4..8159a5752e5 100644
--- a/regression-test/suites/variant_github_events_p0/load.groovy
+++ b/regression-test/suites/variant_github_events_p0/load.groovy
@@ -127,6 +127,7 @@ suite("regression_test_variant_github_events_p0", "nonConcurrent"){
         set 'read_json_by_line', 'true'
         set 'format', 'json'
         set 'max_filter_ratio', '0.1'
+        set 'memtable_on_sink_node', 'true'
         file file_name // import json file
         time 10000 // limit inflight 10s
diff --git a/regression-test/suites/variant_github_events_p0_new/load.groovy b/regression-test/suites/variant_github_events_p0_new/load.groovy
index c063ebecf26..4d8d304aeb1 100644
--- a/regression-test/suites/variant_github_events_p0_new/load.groovy
+++ b/regression-test/suites/variant_github_events_p0_new/load.groovy
@@ -34,6 +34,7 @@ suite("regression_test_variant_github_events_p0", "nonConcurrent"){
         set 'read_json_by_line', 'true'
         set 'format', 'json'
         set 'max_filter_ratio', '0.1'
+        set 'memtable_on_sink_node', 'true'
         file file_name // import json file
         time 10000 // limit inflight 10s
diff --git a/regression-test/suites/variant_p0/load.groovy b/regression-test/suites/variant_p0/load.groovy
index cbd6bc1178c..c84620a3d91 100644
--- a/regression-test/suites/variant_p0/load.groovy
+++ b/regression-test/suites/variant_p0/load.groovy
@@ -26,6 +26,7 @@ suite("regression_test_variant", "nonConcurrent"){
         set 'read_json_by_line', 'true'
         set 'format', 'json'
         set 'max_filter_ratio', '0.1'
+        set 'memtable_on_sink_node', 'true'
         file file_name // import json file
         time 10000 // limit inflight 10s