This is an automated email from the ASF dual-hosted git repository. lide pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 7103872fc20 [branch-2.0](recover) support skipping missing version in select by session variable (#34931) 7103872fc20 is described below commit 7103872fc20ab67d47deeda2ab571d0e9a1bcc17 Author: xy720 <22125576+xy...@users.noreply.github.com> AuthorDate: Thu May 16 15:29:38 2024 +0800 [branch-2.0](recover) support skipping missing version in select by session variable (#34931) --- be/src/olap/schema_change.cpp | 2 +- be/src/olap/tablet.cpp | 23 +++++++----- be/src/olap/tablet.h | 8 +++-- be/src/runtime/runtime_state.h | 4 +++ be/src/vec/exec/scan/new_olap_scan_node.cpp | 2 +- be/src/vec/exec/scan/new_olap_scanner.cpp | 3 +- be/test/olap/tablet_test.cpp | 4 +-- .../main/java/org/apache/doris/common/Config.java | 14 -------- .../java/org/apache/doris/catalog/Replica.java | 18 ++++++++++ .../main/java/org/apache/doris/catalog/Tablet.java | 4 +-- .../org/apache/doris/planner/OlapScanNode.java | 32 ++++++++++------- .../java/org/apache/doris/qe/SessionVariable.java | 17 +++++++++ gensrc/thrift/PaloInternalService.thrift | 2 ++ .../session_variable/test_skip_missing_version.out | 4 +++ .../test_skip_missing_version.groovy | 42 ++++++++++++++++++++++ 15 files changed, 135 insertions(+), 44 deletions(-) diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 4ea1ccbc3ff..80dc6003619 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -1004,7 +1004,7 @@ Status SchemaChangeHandler::_get_versions_to_be_changed( *max_rowset = rowset; RETURN_IF_ERROR(base_tablet->capture_consistent_versions(Version(0, rowset->version().second), - versions_to_be_changed)); + versions_to_be_changed, false, false)); return Status::OK(); } diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 51811c2d22d..0d1898a5999 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -758,7 +758,7 @@ void Tablet::delete_expired_stale_rowset() { Version 
test_version = Version(0, lastest_delta->end_version()); stale_version_path_map[*path_id_iter] = version_path; - Status status = capture_consistent_versions(test_version, nullptr); + Status status = capture_consistent_versions(test_version, nullptr, false, false); // 1. When there is no consistent versions, we must reconstruct the tracker. if (!status.ok()) { // 2. fetch missing version after delete @@ -882,7 +882,8 @@ bool Tablet::_reconstruct_version_tracker_if_necessary() { } Status Tablet::capture_consistent_versions(const Version& spec_version, - std::vector<Version>* version_path, bool quiet) const { + std::vector<Version>* version_path, + bool skip_missing_version, bool quiet) const { Status status = _timestamped_version_tracker.capture_consistent_versions(spec_version, version_path); if (!status.ok() && !quiet) { @@ -905,6 +906,10 @@ Status Tablet::capture_consistent_versions(const Version& spec_version, LOG(WARNING) << "status:" << status << ", tablet:" << full_name() << ", missed version for version:" << spec_version; _print_missed_versions(missed_versions); + if (skip_missing_version) { + LOG(WARNING) << "force skipping missing version for tablet:" << full_name(); + return Status::OK(); + } } } } @@ -913,7 +918,7 @@ Status Tablet::capture_consistent_versions(const Version& spec_version, Status Tablet::check_version_integrity(const Version& version, bool quiet) { std::shared_lock rdlock(_meta_lock); - return capture_consistent_versions(version, nullptr, quiet); + return capture_consistent_versions(version, nullptr, false, quiet); } bool Tablet::exceed_version_limit(int32_t limit) const { @@ -946,7 +951,7 @@ void Tablet::acquire_version_and_rowsets( Status Tablet::capture_consistent_rowsets(const Version& spec_version, std::vector<RowsetSharedPtr>* rowsets) const { std::vector<Version> version_path; - RETURN_IF_ERROR(capture_consistent_versions(spec_version, &version_path)); + RETURN_IF_ERROR(capture_consistent_versions(spec_version, &version_path, false, 
false)); RETURN_IF_ERROR(_capture_consistent_rowsets_unlocked(version_path, rowsets)); return Status::OK(); } @@ -982,10 +987,11 @@ Status Tablet::_capture_consistent_rowsets_unlocked(const std::vector<Version>& return Status::OK(); } -Status Tablet::capture_rs_readers(const Version& spec_version, - std::vector<RowSetSplits>* rs_splits) const { +Status Tablet::capture_rs_readers(const Version& spec_version, std::vector<RowSetSplits>* rs_splits, + bool skip_missing_version) const { std::vector<Version> version_path; - RETURN_IF_ERROR(capture_consistent_versions(spec_version, &version_path)); + RETURN_IF_ERROR( + capture_consistent_versions(spec_version, &version_path, skip_missing_version, false)); RETURN_IF_ERROR(capture_rs_readers(version_path, rs_splits)); return Status::OK(); } @@ -3722,7 +3728,8 @@ Status Tablet::check_rowid_conversion( Status Tablet::all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const { // Ensure that the obtained versions of rowsets are continuous std::vector<Version> version_path; - RETURN_IF_ERROR(capture_consistent_versions(Version(0, max_version), &version_path)); + RETURN_IF_ERROR( + capture_consistent_versions(Version(0, max_version), &version_path, false, false)); for (auto& ver : version_path) { if (ver.second == 1) { // [0-1] rowset is empty for each tablet, skip it diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 775bfa9262b..45694e30602 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -176,9 +176,10 @@ public: // Given spec_version, find a continuous version path and store it in version_path. // If quiet is true, then only "does this path exist" is returned. + // If skip_missing_version is true, return ok even there are missing versions. 
Status capture_consistent_versions(const Version& spec_version, std::vector<Version>* version_path, - bool quiet = false) const; + bool skip_missing_version, bool quiet = false) const; // if quiet is true, no error log will be printed if there are missing versions Status check_version_integrity(const Version& version, bool quiet = false); bool check_version_exist(const Version& version) const; @@ -187,8 +188,9 @@ public: Status capture_consistent_rowsets(const Version& spec_version, std::vector<RowsetSharedPtr>* rowsets) const; - Status capture_rs_readers(const Version& spec_version, - std::vector<RowSetSplits>* rs_splits) const; + // If skip_missing_version is true, skip versions if they are missing. + Status capture_rs_readers(const Version& spec_version, std::vector<RowSetSplits>* rs_splits, + bool skip_missing_version) const; Status capture_rs_readers(const std::vector<Version>& version_path, std::vector<RowSetSplits>* rs_splits) const; diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 8e57638dee6..246d5a54783 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -361,6 +361,10 @@ public: return _query_options.__isset.skip_delete_bitmap && _query_options.skip_delete_bitmap; } + bool skip_missing_version() const { + return _query_options.__isset.skip_missing_version && _query_options.skip_missing_version; + } + bool enable_page_cache() const; int partitioned_hash_join_rows_threshold() const { diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index e442b4b4e4f..4dc56c3f44c 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -479,7 +479,7 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) { auto& read_source = tablets_read_source.emplace_back(); { std::shared_lock rdlock(tablet->get_header_lock()); - auto st = tablet->capture_rs_readers({0, version}, 
&read_source.rs_splits); + auto st = tablet->capture_rs_readers({0, version}, &read_source.rs_splits, false); if (!st.ok()) { LOG(WARNING) << "fail to init reader.res=" << st; return Status::InternalError( diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index f7e839a89b9..6d9a7964dff 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -186,7 +186,8 @@ Status NewOlapScanner::init() { ReadSource read_source; { std::shared_lock rdlock(_tablet->get_header_lock()); - auto st = _tablet->capture_rs_readers(rd_version, &read_source.rs_splits); + auto st = _tablet->capture_rs_readers(rd_version, &read_source.rs_splits, + _state->skip_missing_version()); if (!st.ok()) { LOG(WARNING) << "fail to init reader.res=" << st; return st; diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp index 951d98b1e05..5889846a49e 100644 --- a/be/test/olap/tablet_test.cpp +++ b/be/test/olap/tablet_test.cpp @@ -293,12 +293,12 @@ TEST_F(TestTablet, pad_rowset) { Version version(5, 5); std::vector<RowSetSplits> splits; - ASSERT_FALSE(_tablet->capture_rs_readers(version, &splits).ok()); + ASSERT_FALSE(_tablet->capture_rs_readers(version, &splits, false).ok()); splits.clear(); PadRowsetAction action(nullptr, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN); action._pad_rowset(_tablet, version); - ASSERT_TRUE(_tablet->capture_rs_readers(version, &splits).ok()); + ASSERT_TRUE(_tablet->capture_rs_readers(version, &splits, false).ok()); } TEST_F(TestTablet, cooldown_policy) { diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index c1c03754c99..47561c4dd22 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1286,20 +1286,6 @@ public class Config extends ConfigBase { @ConfField(mutable = true, 
masterOnly = true) public static boolean recover_with_empty_tablet = false; - /** - * In some scenarios, there is an unrecoverable metadata problem in the cluster, - * and the visibleVersion of the data does not match be. In this case, it is still - * necessary to restore the remaining data (which may cause problems with the correctness of the data). - * This configuration is the same as` recover_with_empty_tablet` should only be used in emergency situations - * This configuration has three values: - * disable : If an exception occurs, an error will be reported normally. - * ignore_version: ignore the visibleVersion information recorded in fe partition, use replica version - * ignore_all: In addition to ignore_version, when encountering no queryable replica, - * skip it directly instead of throwing an exception - */ - @ConfField(mutable = true, masterOnly = true) - public static String recover_with_skip_missing_version = "disable"; - /** * Whether to add a delete sign column when create unique table */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index fc4208add56..0c5bf7a54b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -37,6 +37,8 @@ import java.util.Comparator; public class Replica implements Writable { private static final Logger LOG = LogManager.getLogger(Replica.class); public static final VersionComparator<Replica> VERSION_DESC_COMPARATOR = new VersionComparator<Replica>(); + public static final LastSuccessVersionComparator<Replica> LAST_SUCCESS_VERSION_COMPARATOR = + new LastSuccessVersionComparator<Replica>(); public static final IdComparator<Replica> ID_COMPARATOR = new IdComparator<Replica>(); public enum ReplicaState { @@ -662,6 +664,22 @@ public class Replica implements Writable { } } + private static class LastSuccessVersionComparator<T extends Replica> 
implements Comparator<T> { + public LastSuccessVersionComparator() { + } + + @Override + public int compare(T replica1, T replica2) { + if (replica1.getLastSuccessVersion() < replica2.getLastSuccessVersion()) { + return 1; + } else if (replica1.getLastSuccessVersion() == replica2.getLastSuccessVersion()) { + return 0; + } else { + return -1; + } + } + } + private static class IdComparator<T extends Replica> implements Comparator<T> { public IdComparator() { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index 7d65b6b95a2..19d159db40b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -238,7 +238,7 @@ public class Tablet extends MetaObject implements Writable { } // for query - public List<Replica> getQueryableReplicas(long visibleVersion) { + public List<Replica> getQueryableReplicas(long visibleVersion, boolean allowFailedVersion) { List<Replica> allQueryableReplica = Lists.newArrayListWithCapacity(replicas.size()); List<Replica> auxiliaryReplica = Lists.newArrayListWithCapacity(replicas.size()); for (Replica replica : replicas) { @@ -247,7 +247,7 @@ public class Tablet extends MetaObject implements Writable { } // Skip the missing version replica - if (replica.getLastFailedVersion() > 0) { + if (replica.getLastFailedVersion() > 0 && !allowFailedVersion) { continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index f7263980136..edc440e8866 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -726,10 +726,15 @@ public class OlapScanNode extends ScanNode { String visibleVersionStr = String.valueOf(visibleVersion); Set<Tag> allowedTags = Sets.newHashSet(); + int 
useFixReplica = -1; boolean needCheckTags = false; + boolean skipMissingVersion = false; if (ConnectContext.get() != null) { allowedTags = ConnectContext.get().getResourceTags(); needCheckTags = ConnectContext.get().isResourceTagsSet(); + useFixReplica = ConnectContext.get().getSessionVariable().useFixReplica; + // if use_fix_replica is set to true, set skip_missing_version to false + skipMissingVersion = useFixReplica == -1 && ConnectContext.get().getSessionVariable().skipMissingVersion; if (LOG.isDebugEnabled()) { LOG.debug("query id: {}, partition id:{} visibleVersion: {}", DebugUtil.printId(ConnectContext.get().queryId()), partition.getId(), visibleVersion); @@ -737,7 +742,7 @@ public class OlapScanNode extends ScanNode { } for (Tablet tablet : tablets) { long tabletId = tablet.getId(); - if (!Config.recover_with_skip_missing_version.equalsIgnoreCase("disable")) { + if (skipMissingVersion) { long tabletVersion = -1L; for (Replica replica : tablet.getReplicas()) { if (replica.getVersion() > tabletVersion) { @@ -760,7 +765,7 @@ public class OlapScanNode extends ScanNode { paloRange.setTabletId(tabletId); // random shuffle List && only collect one copy - List<Replica> replicas = tablet.getQueryableReplicas(visibleVersion); + List<Replica> replicas = tablet.getQueryableReplicas(visibleVersion, skipMissingVersion); if (replicas.isEmpty()) { LOG.warn("no queryable replica found in tablet {}. visible version {}", tabletId, visibleVersion); StringBuilder sb = new StringBuilder( @@ -774,12 +779,14 @@ public class OlapScanNode extends ScanNode { throw new UserException(sb.toString()); } - int useFixReplica = -1; - if (ConnectContext.get() != null) { - useFixReplica = ConnectContext.get().getSessionVariable().useFixReplica; - } if (useFixReplica == -1) { Collections.shuffle(replicas); + if (skipMissingVersion) { + // sort by replica's last success version, higher success version in the front. 
+ replicas.sort(Replica.LAST_SUCCESS_VERSION_COMPARATOR); + } else { + Collections.shuffle(replicas); + } } else { LOG.debug("use fix replica, value: {}, replica num: {}", useFixReplica, replicas.size()); // sort by replica id @@ -849,14 +856,15 @@ public class OlapScanNode extends ScanNode { collectedStat = true; } scanBackendIds.add(backend.getId()); + // For skipping missing version of tablet, we only select the backend with the highest last + // success version replica to save as much data as possible. + if (skipMissingVersion) { + break; + } } if (tabletIsNull) { - if (Config.recover_with_skip_missing_version.equalsIgnoreCase("ignore_all")) { - continue; - } else { - throw new UserException(tabletId + " have no queryable replicas. err: " - + Joiner.on(", ").join(errs)); - } + throw new UserException(tabletId + " have no queryable replicas. err: " + + Joiner.on(", ").join(errs)); } TScanRange scanRange = new TScanRange(); scanRange.setPaloScanRange(paloRange); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 56743ba04c5..27db5f2a0c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -294,6 +294,8 @@ public class SessionVariable implements Serializable, Writable { public static final String SKIP_DELETE_BITMAP = "skip_delete_bitmap"; + public static final String SKIP_MISSING_VERSION = "skip_missing_version"; + public static final String ENABLE_NEW_SHUFFLE_HASH_METHOD = "enable_new_shuffle_hash_method"; public static final String ENABLE_PUSH_DOWN_NO_GROUP_AGG = "enable_push_down_no_group_agg"; @@ -988,6 +990,19 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = SKIP_DELETE_BITMAP) public boolean skipDeleteBitmap = false; + // This variable replace the original FE config `recover_with_skip_missing_version`. 
+ // In some scenarios, all replicas of a tablet have missing versions, and the tablet is unable to recover. + // This variable controls the behavior of queries. When it is set to `true`, the query will ignore the + // visible version recorded in the FE partition and use the replica version. If the replica on BE has missing versions, + // the query will directly skip the missing versions, and only return the data of the existing versions. + // Besides, the query will always try to select the one with the highest lastSuccessVersion among all surviving + // BE replicas, so as to recover as much data as possible. + // You should only enable it in the emergency scenarios mentioned above, and only for temporary recovery queries. + // This variable conflicts with the use_fix_replica variable: when use_fix_replica is not -1, + // this variable has no effect. + @VariableMgr.VarAttr(name = SKIP_MISSING_VERSION) + public boolean skipMissingVersion = false; + // This variable is used to avoid FE fallback to the original parser. When we execute SQL in regression tests // for nereids, fallback will cause the Doris return the correct result although the syntax is unsupported // in nereids for some mistaken modification. 
You should set it on the @@ -2590,6 +2605,8 @@ public class SessionVariable implements Serializable, Writable { tResult.setEnableInvertedIndexCompoundInlist(enableInvertedIndexCompoundInlist); + tResult.setSkipMissingVersion(skipMissingVersion); + return tResult; } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 22422aeabac..37449a4d638 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -255,6 +255,8 @@ struct TQueryOptions { 89: optional i32 inverted_index_skip_threshold = 50; 90: optional bool enable_inverted_index_compound_inlist = false; + // For emergency use, skip missing version when reading rowsets + 91: optional bool skip_missing_version = false; } diff --git a/regression-test/data/query_p0/session_variable/test_skip_missing_version.out b/regression-test/data/query_p0/session_variable/test_skip_missing_version.out new file mode 100644 index 00000000000..2905460928e --- /dev/null +++ b/regression-test/data/query_p0/session_variable/test_skip_missing_version.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_all -- +1000 a 10 +2000 b 10 diff --git a/regression-test/suites/query_p0/session_variable/test_skip_missing_version.groovy b/regression-test/suites/query_p0/session_variable/test_skip_missing_version.groovy new file mode 100644 index 00000000000..859fa3ca680 --- /dev/null +++ b/regression-test/suites/query_p0/session_variable/test_skip_missing_version.groovy @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_skip_missing_version") { + def test_tbl = "test_skip_missing_version_tbl" + + sql """ DROP TABLE IF EXISTS ${test_tbl}""" + sql """ + CREATE TABLE ${test_tbl} ( + `k1` int(11) NULL, + `k2` char(5) NULL, + `k3` tinyint(4) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`, `k2`, `k3`) + DISTRIBUTED BY HASH(`k1`) BUCKETS 5 + PROPERTIES ( + "replication_num"="1" + ); + """ + + sql """ INSERT INTO ${test_tbl} VALUES(1000, 'a', 10); """ + sql """ INSERT INTO ${test_tbl} VALUES(2000, 'b', 10); """ + + // This case cannot verify the results, but it can verify abnormalities after + // SET skip_missing_version=true sql """ SET skip_missing_version=true """ + sql """ SET skip_missing_version=true """ + qt_select_all """ select * from ${test_tbl} order by k1 """ +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org