This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ee37bd66691 [improve](cloud) reduce fe call get_version to
meta_service (#60467)
ee37bd66691 is described below
commit ee37bd66691c5bdea19687fa4b25f4e3a2b7bf1d
Author: meiyi <[email protected]>
AuthorDate: Sat Feb 28 15:16:40 2026 +0800
[improve](cloud) reduce fe call get_version to meta_service (#60467)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
cloud/src/meta-service/meta_service.h | 3 +-
cloud/src/meta-service/meta_service_partition.cpp | 141 ++++++++++++-
cloud/src/meta-service/meta_service_txn.cpp | 184 +++++++++++++++--
cloud/test/meta_service_test.cpp | 37 +++-
cloud/test/meta_service_versioned_read_test.cpp | 39 ++++
cloud/test/txn_lazy_commit_test.cpp | 38 ++++
.../main/java/org/apache/doris/common/Config.java | 19 ++
.../java/org/apache/doris/catalog/OlapTable.java | 76 ++++++-
.../doris/clone/DynamicPartitionScheduler.java | 2 +-
.../org/apache/doris/cloud/catalog/CloudEnv.java | 11 +-
.../cloud/catalog/CloudFEVersionSynchronizer.java | 220 +++++++++++++++++++++
.../apache/doris/cloud/catalog/CloudPartition.java | 52 ++++-
.../cloud/catalog/CloudSyncVersionDaemon.java | 203 +++++++++++++++++++
.../cloud/datasource/CloudInternalCatalog.java | 52 ++++-
.../transaction/CloudGlobalTransactionMgr.java | 122 ++++++++----
.../java/org/apache/doris/common/ClientPool.java | 3 +
.../doris/common/proc/PartitionsProcDir.java | 74 ++++++-
.../apache/doris/datasource/InternalCatalog.java | 13 +-
.../java/org/apache/doris/qe/SessionVariable.java | 7 +-
.../apache/doris/service/FrontendServiceImpl.java | 12 ++
.../org/apache/doris/catalog/OlapTableTest.java | 132 +++++++++++++
gensrc/proto/cloud.proto | 5 +-
gensrc/thrift/FrontendService.thrift | 15 ++
23 files changed, 1369 insertions(+), 91 deletions(-)
diff --git a/cloud/src/meta-service/meta_service.h
b/cloud/src/meta-service/meta_service.h
index 2e4e2790f4b..487b26f3899 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -505,7 +505,8 @@ private:
std::string_view instance_id, KVStats& stats);
void commit_partition_internal(const PartitionRequest* request, const
std::string& instance_id,
- const std::vector<int64_t>& partition_ids,
MetaServiceCode& code,
+ const std::vector<int64_t>& partition_ids,
+ PartitionResponse* response,
MetaServiceCode& code,
std::string& msg, KVStats& stats);
// Wait for all pending transactions before returning, and bump up the
version to the latest.
diff --git a/cloud/src/meta-service/meta_service_partition.cpp
b/cloud/src/meta-service/meta_service_partition.cpp
index 811fc219b99..c28abeadb0f 100644
--- a/cloud/src/meta-service/meta_service_partition.cpp
+++ b/cloud/src/meta-service/meta_service_partition.cpp
@@ -211,6 +211,9 @@ void
MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller
msg = "db_id is required for versioned write, please upgrade your FE
version";
return;
}
+ if (request->has_is_new_table() && request->is_new_table() &&
is_versioned_write) {
+ txn->enable_get_versionstamp();
+ }
CloneChainReader reader(instance_id, resource_mgr_.get());
for (auto index_id : request->index_ids()) {
@@ -327,6 +330,29 @@ void
MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller
msg = fmt::format("failed to get table version, err={}", err);
return;
}
+ } else {
+ // set table version in response
+ int64_t table_id = request->table_id();
+ std::string ver_key = table_version_key({instance_id,
request->db_id(), table_id});
+ std::string ver_val;
+ err = txn->get(ver_key, &ver_val);
+ int64_t table_version = 0;
+ if (err == TxnErrorCode::TXN_OK) {
+ if (!txn->decode_atomic_int(ver_val, &table_version)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "malformed table version value, err=" << err
+ << " table_id=" << request->table_id();
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get table version, err=" << err << "
table_id=" << table_id;
+ msg = ss.str();
+ return;
+ }
+ response->set_table_version(table_version + 1);
}
// init table version, for create and truncate table
update_table_version(txn.get(), instance_id, request->db_id(),
request->table_id());
@@ -353,6 +379,22 @@ void
MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller
msg = fmt::format("failed to commit txn: {}", err);
return;
}
+
+ // set table version in response
+ if (request->has_is_new_table() && request->is_new_table() &&
is_versioned_read) {
+ Versionstamp vs;
+ err = txn->get_versionstamp(&vs);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get kv txn versionstamp, table_id=" <<
request->table_id()
+ << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ int64_t version = vs.version();
+ response->set_table_version(version);
+ }
}
void MetaServiceImpl::drop_index(::google::protobuf::RpcController* controller,
@@ -655,7 +697,7 @@ void
MetaServiceImpl::commit_partition(::google::protobuf::RpcController* contro
for (size_t j = i; j < end; j++) {
partition_ids.push_back(request->partition_ids(j));
}
- commit_partition_internal(request, instance_id, partition_ids, code,
msg, stats);
+ commit_partition_internal(request, instance_id, partition_ids,
response, code, msg, stats);
if (code != MetaServiceCode::OK) {
return;
}
@@ -665,8 +707,8 @@ void
MetaServiceImpl::commit_partition(::google::protobuf::RpcController* contro
void MetaServiceImpl::commit_partition_internal(const PartitionRequest*
request,
const std::string& instance_id,
const std::vector<int64_t>&
partition_ids,
- MetaServiceCode& code,
std::string& msg,
- KVStats& stats) {
+ PartitionResponse* response,
MetaServiceCode& code,
+ std::string& msg, KVStats&
stats) {
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
@@ -699,6 +741,10 @@ void MetaServiceImpl::commit_partition_internal(const
PartitionRequest* request,
return;
}
+ if (is_versioned_write) {
+ txn->enable_get_versionstamp();
+ }
+
CloneChainReader reader(instance_id, resource_mgr_.get());
size_t num_commit = 0;
for (auto part_id : partition_ids) {
@@ -794,6 +840,31 @@ void MetaServiceImpl::commit_partition_internal(const
PartitionRequest* request,
msg = fmt::format("failed to get table version, err={}", err);
return;
}
+ } else {
+ // set table version in response
+ std::string ver_key =
+ table_version_key({instance_id, request->db_id(),
request->table_id()});
+ std::string ver_val;
+ err = txn->get(ver_key, &ver_val);
+ int64_t table_version = 0;
+ if (err == TxnErrorCode::TXN_OK) {
+ if (!txn->decode_atomic_int(ver_val, &table_version)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ std::stringstream ss;
+ ss << "malformed table version value, err=" << err
+ << " table_id=" << request->table_id();
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ code = cast_as<ErrCategory::READ>(err);
+ std::stringstream ss;
+ ss << "failed to get table version, err=" << err << " table_id="
<< request->table_id();
+ msg = ss.str();
+ return;
+ }
+ response->set_table_version(table_version + 1);
}
update_table_version(txn.get(), instance_id, request->db_id(),
request->table_id());
@@ -818,6 +889,23 @@ void MetaServiceImpl::commit_partition_internal(const
PartitionRequest* request,
msg = fmt::format("failed to commit txn: {}", err);
return;
}
+
+ // set table version in response
+ if (is_versioned_read) {
+ Versionstamp vs;
+ err = txn->get_versionstamp(&vs);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ std::stringstream ss;
+ ss << "failed to get kv txn versionstamp, table_id=" <<
request->table_id()
+ << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ int64_t version = vs.version();
+ response->set_table_version(version);
+ }
}
void MetaServiceImpl::drop_partition(::google::protobuf::RpcController*
controller,
@@ -870,6 +958,9 @@ void
MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll
drop_partition_log.set_table_id(request->table_id());
drop_partition_log.mutable_index_ids()->CopyFrom(request->index_ids());
drop_partition_log.set_expired_at_s(request->expiration());
+ if (is_versioned_write) {
+ txn->enable_get_versionstamp();
+ }
CloneChainReader reader(instance_id, resource_mgr_.get());
for (auto part_id : request->partition_ids()) {
@@ -938,6 +1029,32 @@ void
MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll
msg = fmt::format("failed to get table version, err={}", err);
return;
}
+ } else {
+ // set table version in response
+ std::string ver_key =
+ table_version_key({instance_id, request->db_id(),
request->table_id()});
+ std::string ver_val;
+ err = txn->get(ver_key, &ver_val);
+ int64_t table_version = 0;
+ if (err == TxnErrorCode::TXN_OK) {
+ if (!txn->decode_atomic_int(ver_val, &table_version)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ std::stringstream ss;
+ ss << "malformed table version value, err=" << err
+ << " table_id=" << request->table_id();
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ code = cast_as<ErrCategory::READ>(err);
+ std::stringstream ss;
+ ss << "failed to get table version, err=" << err
+ << " table_id=" << request->table_id();
+ msg = ss.str();
+ return;
+ }
+ response->set_table_version(table_version + 1);
}
update_table_version(txn.get(), instance_id, request->db_id(),
request->table_id());
drop_partition_log.set_update_table_version(true);
@@ -964,6 +1081,24 @@ void
MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll
msg = fmt::format("failed to commit txn: {}", err);
return;
}
+
+ // set table version in response
+ if (request->has_need_update_table_version() &&
request->need_update_table_version() &&
+ is_versioned_read) {
+ Versionstamp vs;
+ err = txn->get_versionstamp(&vs);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ std::stringstream ss;
+ ss << "failed to get kv txn versionstamp, table_id=" <<
request->table_id()
+ << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ int64_t version = vs.version();
+ response->set_table_version(version);
+ }
}
void check_create_table(std::string instance_id, std::shared_ptr<TxnKv> txn_kv,
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index cd7b62e7e6d..88fd0b81053 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -1539,6 +1539,9 @@ void MetaServiceImpl::commit_txn_immediately(
LOG(WARNING) << msg;
return;
}
+ if (is_versioned_write) {
+ txn->enable_get_versionstamp();
+ }
DORIS_CLOUD_DEFER {
if (txn == nullptr) return;
stats.get_bytes += txn->get_bytes();
@@ -1827,6 +1830,8 @@ void MetaServiceImpl::commit_txn_immediately(
response->add_versions(new_version);
}
+ // table_id -> version, for response
+ std::map<int64_t, int64_t> table_version_map;
// Save table versions
for (auto& i : table_id_tablet_ids) {
if (is_versioned_read) {
@@ -1839,6 +1844,29 @@ void MetaServiceImpl::commit_txn_immediately(
LOG(WARNING) << msg;
return;
}
+ } else {
+ // set table versions in response
+ int64_t table_id = i.first;
+ std::string ver_key = table_version_key({instance_id, db_id,
table_id});
+ std::string ver_val;
+ err = txn->get(ver_key, &ver_val);
+ int64_t table_version = 0;
+ if (err == TxnErrorCode::TXN_OK) {
+ if (!txn->decode_atomic_int(ver_val, &table_version)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "malformed table version value, err=" << err
+ << " table_id=" << i.first;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get table version, err=" << err << "
table_id=" << table_id;
+ msg = ss.str();
+ return;
+ }
+ table_version_map[table_id] = table_version + 1;
}
update_table_version(txn.get(), instance_id, db_id, i.first);
commit_txn_log.add_table_ids(i.first);
@@ -2004,20 +2032,40 @@ void MetaServiceImpl::commit_txn_immediately(
return;
}
+ // set table versions in response
+ if (is_versioned_read) {
+ Versionstamp vs;
+ err = txn->get_versionstamp(&vs);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get kv txn versionstamp, txn_id=" << txn_id
<< " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ int64_t version = vs.version();
+ for (auto& i : table_id_tablet_ids) {
+ int64_t table_id = i.first;
+ table_version_map[table_id] = version;
+ }
+ }
+
// calculate table stats from tablets stats
std::map<int64_t /*table_id*/, TableStats> table_stats;
std::vector<int64_t>
base_tablet_ids(request->base_tablet_ids().begin(),
request->base_tablet_ids().end());
calc_table_stats(tablet_ids, tablet_stats, table_stats,
base_tablet_ids);
- for (const auto& pair : table_stats) {
+ for (const auto& pair : table_version_map) {
TableStatsPB* stats_pb = response->add_table_stats();
auto table_id = pair.first;
- auto stats = pair.second;
- get_pb_from_tablestats(stats, stats_pb);
stats_pb->set_table_id(table_id);
- VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" <<
txn_id
- << " table_id=" << table_id
- << " updated_row_count=" <<
stats_pb->updated_row_count();
+ stats_pb->set_table_version(pair.second);
+ if (auto it = table_stats.find(table_id); it != table_stats.end())
{
+ get_pb_from_tablestats(it->second, stats_pb);
+ VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id="
<< txn_id
+ << " table_id=" << table_id
+ << " updated_row_count=" <<
stats_pb->updated_row_count();
+ }
}
response->mutable_txn_info()->CopyFrom(txn_info);
TEST_SYNC_POINT_CALLBACK("commit_txn_immediately::finish", &code);
@@ -2236,6 +2284,9 @@ void MetaServiceImpl::commit_txn_eventually(
LOG(WARNING) << msg;
return;
}
+ if (is_versioned_write) {
+ txn->enable_get_versionstamp();
+ }
CommitTxnLogPB commit_txn_log;
commit_txn_log.set_txn_id(txn_id);
@@ -2482,6 +2533,8 @@ void MetaServiceImpl::commit_txn_eventually(
}
}
+ // table_id -> version, for response
+ std::map<int64_t, int64_t> table_version_map;
// Save table versions
for (auto& i : table_id_tablet_ids) {
if (is_versioned_read) {
@@ -2494,6 +2547,29 @@ void MetaServiceImpl::commit_txn_eventually(
LOG(WARNING) << msg;
return;
}
+ } else {
+ // set table versions in response
+ int64_t table_id = i.first;
+ std::string ver_key = table_version_key({instance_id, db_id,
table_id});
+ std::string ver_val;
+ err = txn->get(ver_key, &ver_val);
+ int64_t table_version = 0;
+ if (err == TxnErrorCode::TXN_OK) {
+ if (!txn->decode_atomic_int(ver_val, &table_version)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "malformed table version value, err=" << err
+ << " table_id=" << i.first;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get table version, err=" << err << "
table_id=" << table_id;
+ msg = ss.str();
+ return;
+ }
+ table_version_map[table_id] = table_version + 1;
}
update_table_version(txn.get(), instance_id, db_id, i.first);
commit_txn_log.add_table_ids(i.first);
@@ -2532,6 +2608,24 @@ void MetaServiceImpl::commit_txn_eventually(
return;
}
+ // set table versions in response
+ if (is_versioned_read) {
+ Versionstamp vs;
+ err = txn->get_versionstamp(&vs);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get kv txn versionstamp, txn_id=" << txn_id
<< " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ int64_t version = vs.version();
+ for (auto& i : table_id_tablet_ids) {
+ int64_t table_id = i.first;
+ table_version_map[table_id] = version;
+ }
+ }
+
TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::abort_txn_after_mark_txn_commited");
TEST_SYNC_POINT_RETURN_WITH_VOID("commit_txn_eventually::txn_lazy_committer_submit",
@@ -2562,15 +2656,17 @@ void MetaServiceImpl::commit_txn_eventually(
std::vector<int64_t>
base_tablet_ids(request->base_tablet_ids().begin(),
request->base_tablet_ids().end());
calc_table_stats(tablet_ids, tablet_stats, table_stats,
base_tablet_ids);
- for (const auto& pair : table_stats) {
+ for (const auto& pair : table_version_map) {
TableStatsPB* stats_pb = response->add_table_stats();
auto table_id = pair.first;
- auto stats = pair.second;
- get_pb_from_tablestats(stats, stats_pb);
stats_pb->set_table_id(table_id);
- VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" <<
txn_id
- << " table_id=" << table_id
- << " updated_row_count=" <<
stats_pb->updated_row_count();
+ stats_pb->set_table_version(pair.second);
+ if (auto it = table_stats.find(table_id); it != table_stats.end())
{
+ get_pb_from_tablestats(it->second, stats_pb);
+ VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id="
<< txn_id
+ << " table_id=" << table_id
+ << " updated_row_count=" <<
stats_pb->updated_row_count();
+ }
}
// txn set visible for fe callback
@@ -2629,6 +2725,8 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const
CommitTxnRequest* request,
}
sub_txn_to_tmp_rowsets_meta.emplace(sub_txn_id,
std::move(tmp_rowsets_meta));
}
+ bool is_versioned_write = is_version_write_enabled(instance_id);
+ bool is_versioned_read = is_version_read_enabled(instance_id);
do {
TEST_SYNC_POINT_CALLBACK("commit_txn_with_sub_txn:begin", &txn_id);
// Create a readonly txn for scan tmp rowset
@@ -2641,6 +2739,9 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const
CommitTxnRequest* request,
LOG(WARNING) << msg;
return;
}
+ if (is_versioned_write) {
+ txn->enable_get_versionstamp();
+ }
DORIS_CLOUD_DEFER {
if (txn == nullptr) return;
stats.get_bytes += txn->get_bytes();
@@ -2701,8 +2802,6 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const
CommitTxnRequest* request,
AnnotateTag txn_tag("txn_id", txn_id);
- bool is_versioned_write = is_version_write_enabled(instance_id);
- bool is_versioned_read = is_version_read_enabled(instance_id);
CloneChainReader meta_reader(instance_id, resource_mgr_.get());
// Prepare rowset meta and new_versions
@@ -2920,6 +3019,8 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const
CommitTxnRequest* request,
response->add_versions(new_version);
}
+ // table_id -> version, for response
+ std::map<int64_t, int64_t> table_version_map;
// Save table versions
for (auto& i : table_id_tablet_ids) {
if (is_versioned_read) {
@@ -2932,6 +3033,29 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const
CommitTxnRequest* request,
LOG(WARNING) << msg;
return;
}
+ } else {
+ // set table versions in response
+ int64_t table_id = i.first;
+ std::string ver_key = table_version_key({instance_id, db_id,
table_id});
+ std::string ver_val;
+ err = txn->get(ver_key, &ver_val);
+ int64_t table_version = 0;
+ if (err == TxnErrorCode::TXN_OK) {
+ if (!txn->decode_atomic_int(ver_val, &table_version)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "malformed table version value, err=" << err
+ << " table_id=" << i.first;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get table version, err=" << err << "
table_id=" << table_id;
+ msg = ss.str();
+ return;
+ }
+ table_version_map[table_id] = table_version + 1;
}
update_table_version(txn.get(), instance_id, db_id, i.first);
commit_txn_log.add_table_ids(i.first);
@@ -3082,20 +3206,40 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const
CommitTxnRequest* request,
return;
}
+ // set table versions in response
+ if (is_versioned_read) {
+ Versionstamp vs;
+ err = txn->get_versionstamp(&vs);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get kv txn versionstamp, txn_id=" << txn_id
<< " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ int64_t version = vs.version();
+ for (auto& i : table_id_tablet_ids) {
+ int64_t table_id = i.first;
+ table_version_map[table_id] = version;
+ }
+ }
+
// calculate table stats from tablets stats
std::map<int64_t /*table_id*/, TableStats> table_stats;
std::vector<int64_t>
base_tablet_ids(request->base_tablet_ids().begin(),
request->base_tablet_ids().end());
calc_table_stats(tablet_ids, tablet_stats, table_stats,
base_tablet_ids);
- for (const auto& pair : table_stats) {
+ for (const auto& pair : table_version_map) {
TableStatsPB* stats_pb = response->add_table_stats();
auto table_id = pair.first;
- auto stats = pair.second;
- get_pb_from_tablestats(stats, stats_pb);
stats_pb->set_table_id(table_id);
- VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" <<
txn_id
- << " table_id=" << table_id
- << " updated_row_count=" <<
stats_pb->updated_row_count();
+ stats_pb->set_table_version(table_version_map[table_id]);
+ if (auto it = table_stats.find(table_id); it != table_stats.end())
{
+ get_pb_from_tablestats(it->second, stats_pb);
+ VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id="
<< txn_id
+ << " table_id=" << table_id
+ << " updated_row_count=" <<
stats_pb->updated_row_count();
+ }
}
response->mutable_txn_info()->CopyFrom(txn_info);
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index f815c0d3244..2b057a0b258 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -217,6 +217,20 @@ static void commit_txn(MetaServiceProxy* meta_service,
int64_t db_id, int64_t tx
<< label << ", res=" << res.ShortDebugString();
}
+static void get_table_version(MetaServiceProxy* meta_service, int64_t db_id,
int64_t table_id,
+ int64_t& version) {
+ brpc::Controller ctrl;
+ GetVersionRequest req;
+ req.set_db_id(db_id);
+ req.set_table_id(table_id);
+ req.set_is_table_version(true);
+ GetVersionResponse resp;
+ meta_service->get_version(&ctrl, &req, &resp, nullptr);
+ ASSERT_EQ(resp.status().code(), MetaServiceCode::OK)
+ << ", get table version res=" << resp.ShortDebugString();
+ version = resp.version();
+}
+
doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int
partition_id = 10,
int64_t version = -1, int num_rows =
100) {
doris::RowsetMetaCloudPB rowset;
@@ -1797,7 +1811,7 @@ TEST(MetaServiceTest, CommitTxnTest) {
int64_t partition_id = 1236;
// case: first version of rowset
- {
+ for (int i = 0; i < 2; ++i) {
int64_t txn_id = -1;
// begin txn
{
@@ -1806,7 +1820,7 @@ TEST(MetaServiceTest, CommitTxnTest) {
req.set_cloud_unique_id("test_cloud_unique_id");
TxnInfoPB txn_info_pb;
txn_info_pb.set_db_id(666);
- txn_info_pb.set_label("test_label");
+ txn_info_pb.set_label("test_label_" + std::to_string(i));
txn_info_pb.add_table_ids(1234);
txn_info_pb.set_timeout_ms(36000);
req.mutable_txn_info()->CopyFrom(txn_info_pb);
@@ -1854,6 +1868,11 @@ TEST(MetaServiceTest, CommitTxnTest) {
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ ASSERT_EQ(res.table_stats().size(), 1);
+
+ int64_t table_version = res.table_stats()[0].table_version();
+ get_table_version(meta_service.get(), 666, table_id,
table_version);
+ ASSERT_EQ(table_version, i + 1);
}
// doubly commit txn
@@ -2105,6 +2124,14 @@ TEST(MetaServiceTest, CommitTxnWithSubTxnTest) {
ASSERT_EQ(res.table_ids()[2], t1);
ASSERT_EQ(res.partition_ids()[2], t1_p1) << res.ShortDebugString();
ASSERT_EQ(res.versions()[2], 3) << res.ShortDebugString();
+
+ ASSERT_EQ(res.table_stats().size(), 2);
+ int64_t table_version = 0;
+ get_table_version(meta_service.get(), db_id, t1, table_version);
+ ASSERT_EQ(res.table_stats()[0].table_version(), table_version);
+ table_version = 0;
+ get_table_version(meta_service.get(), db_id, t2, table_version);
+ ASSERT_EQ(res.table_stats()[1].table_version(), table_version);
}
// doubly commit txn
@@ -7862,6 +7889,8 @@ TEST(MetaServiceTest, IndexRequest) {
ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_OK);
val_int = *reinterpret_cast<const int64_t*>(val.data());
ASSERT_EQ(val_int, 1);
+ ASSERT_TRUE(res.has_table_version());
+ ASSERT_EQ(val_int, res.table_version());
// Last state DROPPED
reset_meta_service();
index_pb.set_state(RecycleIndexPB::DROPPED);
@@ -8113,6 +8142,8 @@ TEST(MetaServiceTest, PartitionRequest) {
ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_OK);
val_int = *reinterpret_cast<const int64_t*>(val.data());
ASSERT_EQ(val_int, 2);
+ ASSERT_TRUE(res.has_table_version());
+ ASSERT_EQ(val_int, res.table_version());
// Last state DROPPED
reset_meta_service();
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
@@ -8274,6 +8305,8 @@ TEST(MetaServiceTest, PartitionRequest) {
ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_OK);
val_int = *reinterpret_cast<const int64_t*>(val.data());
ASSERT_EQ(val_int, 2);
+ ASSERT_TRUE(res.has_table_version());
+ ASSERT_EQ(val_int, res.table_version());
// Last state PREPARED
reset_meta_service();
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
diff --git a/cloud/test/meta_service_versioned_read_test.cpp
b/cloud/test/meta_service_versioned_read_test.cpp
index eab1ec78d2e..58793011746 100644
--- a/cloud/test/meta_service_versioned_read_test.cpp
+++ b/cloud/test/meta_service_versioned_read_test.cpp
@@ -126,6 +126,20 @@ void compact_rowset(TxnKv* txn_kv, std::string
instance_id, int64_t tablet_id, i
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
+static void get_table_version(MetaServiceProxy* meta_service, int64_t db_id,
int64_t table_id,
+ int64_t& version) {
+ brpc::Controller ctrl;
+ GetVersionRequest req;
+ req.set_db_id(db_id);
+ req.set_table_id(table_id);
+ req.set_is_table_version(true);
+ GetVersionResponse resp;
+ meta_service->get_version(&ctrl, &req, &resp, nullptr);
+ ASSERT_EQ(resp.status().code(), MetaServiceCode::OK)
+ << ", get table version res=" << resp.ShortDebugString();
+ version = resp.version();
+}
+
// Create a MULTI_VERSION_READ_WRITE instance and refresh the resource manager.
static void create_and_refresh_instance(MetaServiceProxy* service, std::string
instance_id) {
// write instance
@@ -220,6 +234,11 @@ TEST(MetaServiceVersionedReadTest, CommitTxn) {
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+ ASSERT_EQ(res.table_stats().size(), 1);
+ int64_t table_version = 0;
+ get_table_version(meta_service.get(), db_id, table_id,
table_version);
+ ASSERT_EQ(res.table_stats()[0].table_version(), table_version);
}
// doubly commit txn
@@ -423,6 +442,14 @@ TEST(MetaServiceVersionedReadTest,
CommitTxnWithSubTxnTest) {
<< ", res=" << res.DebugString();
}
}
+
+ ASSERT_EQ(res.table_stats().size(), 2);
+ int64_t table_version = 0;
+ get_table_version(meta_service.get(), db_id, t2, table_version);
+ ASSERT_EQ(res.table_stats()[0].table_version(), table_version);
+ table_version = 0;
+ get_table_version(meta_service.get(), db_id, t1, table_version);
+ ASSERT_EQ(res.table_stats()[1].table_version(), table_version);
}
// doubly commit txn
@@ -1140,6 +1167,11 @@ TEST(MetaServiceVersionedReadTest, IndexRequest) {
req.set_is_new_table(true);
meta_service->commit_index(&ctrl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().DebugString();
+
+ ASSERT_TRUE(res.has_table_version());
+ int64_t table_version = 0;
+ get_table_version(meta_service.get(), db_id, table_id, table_version);
+ ASSERT_EQ(table_version, res.table_version());
}
{
@@ -1166,6 +1198,8 @@ TEST(MetaServiceVersionedReadTest, IndexRequest) {
req.set_is_new_table(true);
meta_service->commit_index(&ctrl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().DebugString();
+
+ ASSERT_FALSE(res.has_table_version());
}
}
@@ -1205,6 +1239,11 @@ TEST(MetaServiceVersionedReadTest, PartitionRequest) {
req.add_partition_ids(partition_id);
meta_service->commit_partition(&ctrl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().DebugString();
+
+ ASSERT_TRUE(res.has_table_version());
+ int64_t table_version = 0;
+ get_table_version(meta_service.get(), db_id, table_id, table_version);
+ ASSERT_EQ(table_version, res.table_version());
}
{
diff --git a/cloud/test/txn_lazy_commit_test.cpp
b/cloud/test/txn_lazy_commit_test.cpp
index b316a5e5f45..a7506f52289 100644
--- a/cloud/test/txn_lazy_commit_test.cpp
+++ b/cloud/test/txn_lazy_commit_test.cpp
@@ -418,6 +418,20 @@ static void create_and_refresh_instance(
ASSERT_TRUE(service->resource_mgr()->is_version_write_enabled(instance_id));
}
+static void get_table_version(MetaServiceProxy* meta_service, int64_t db_id,
int64_t table_id,
+ int64_t& version) {
+ brpc::Controller ctrl;
+ GetVersionRequest req;
+ req.set_db_id(db_id);
+ req.set_table_id(table_id);
+ req.set_is_table_version(true);
+ GetVersionResponse resp;
+ meta_service->get_version(&ctrl, &req, &resp, nullptr);
+ ASSERT_EQ(resp.status().code(), MetaServiceCode::OK)
+ << ", get table version res=" << resp.ShortDebugString();
+ version = resp.version();
+}
+
TEST(TxnLazyCommitTest, CreateTabletWithDbIdTest) {
auto txn_kv = get_mem_txn_kv();
auto meta_service = get_meta_service(txn_kv, true);
@@ -626,6 +640,12 @@ TEST(TxnLazyCommitTest,
CommitTxnEventuallyWithoutDbIdTest) {
ASSERT_GE(repair_tablet_idx_count, 0);
ASSERT_TRUE(last_pending_txn_id_hit);
ASSERT_TRUE(commit_txn_eventually_finish_hit);
+
+ ASSERT_EQ(res.table_stats().size(), 1);
+ int64_t table_version = 0;
+ get_table_version(meta_service.get(), db_id, table_id, table_version);
+ ASSERT_EQ(table_version, 1);
+ ASSERT_EQ(res.table_stats()[0].table_version(), table_version);
}
{
@@ -917,6 +937,11 @@ TEST(TxnLazyCommitVersionedReadTest, CommitTxnEventually) {
ASSERT_GE(repair_tablet_idx_count, 0);
ASSERT_TRUE(last_pending_txn_id_hit);
ASSERT_TRUE(commit_txn_eventually_finish_hit);
+
+ ASSERT_EQ(res.table_stats().size(), 1);
+ int64_t table_version = 0;
+ get_table_version(meta_service.get(), db_id, table_id, table_version);
+ ASSERT_EQ(res.table_stats()[0].table_version(), table_version);
}
{
@@ -1448,6 +1473,7 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase1Test) {
}
int64_t txn_id1 = 0;
+ int64_t txn1_table_version = 0;
std::thread thread1([&] {
{
std::unique_lock<std::mutex> _lock(go_mutex);
@@ -1494,10 +1520,14 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase1Test) {
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+ ASSERT_EQ(res.table_stats().size(), 1);
+ txn1_table_version = res.table_stats()[0].table_version();
}
});
int64_t txn_id2 = 0;
+ int64_t txn2_table_version = 0;
std::thread thread2([&] {
{
std::unique_lock<std::mutex> _lock(go_mutex);
@@ -1544,6 +1574,9 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase1Test) {
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+ ASSERT_EQ(res.table_stats().size(), 1);
+ txn2_table_version = res.table_stats()[0].table_version();
}
});
@@ -1555,6 +1588,11 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase1Test) {
thread1.join();
thread2.join();
+ ASSERT_TRUE(txn1_table_version == 1 && txn2_table_version == 2 ||
+ txn1_table_version == 2 && txn2_table_version == 1)
+ << ", txn1_table_version=" << txn1_table_version
+ << ", txn2_table_version=" << txn2_table_version;
+
sp->clear_all_call_backs();
sp->clear_trace();
sp->disable_processing();
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 7a0ce25a888..2b06fcc5a80 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3631,6 +3631,25 @@ public class Config extends ConfigBase {
"Maximal concurrent num of get tablet stat job."})
public static int max_get_tablet_stat_task_threads_num = 4;
+ @ConfField(description = {"存算分离模式下同步 table 和 partition version 的间隔. 所有
frontend 都会检查",
+ "Cloud table and partition version syncer interval. All frontends
will perform the checking"})
+ public static int cloud_version_syncer_interval_second = 20;
+
+ @ConfField(mutable = true, description = {"存算分离模式下是否启用同步 table 和 partition
version 的功能",
+ "Whether to enable the function of syncing table and partition
version in cloud mode"})
+ public static boolean cloud_enable_version_syncer = true;
+
+ @ConfField(description = {"Get version task 的并发数", "Concurrent num of get
version task."})
+ public static int cloud_get_version_task_threads_num = 4;
+
+ @ConfField(description = {"Master FE 发送给其它 FE sync version task 的最大并发数",
+ "Maximal concurrent num of sync version task between Master FE and
other FEs."})
+ public static int cloud_sync_version_task_threads_num = 4;
+
+ @ConfField(mutable = true, description = {"Get version task 包含的 table 或
partition 数目的 batch size",
+ "Maximal table or partition batch size of get version task."})
+ public static int cloud_get_version_task_batch_size = 2000;
+
@ConfField(mutable = true, description = {"schema change job 失败是否重试",
"Whether to enable retry when a schema change job fails, default
is true."})
public static boolean enable_schema_change_retry = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index cc78ae250cc..3d6de95aed4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -128,6 +128,8 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
@@ -243,6 +245,8 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf, GsonPostProc
private volatile long lastTableVersionCachedTimeMs = 0;
private volatile long cachedTableVersion = -1;
+ private ReadWriteLock versionLock = Config.isCloudMode() ? new
ReentrantReadWriteLock(true) : null;
+
public OlapTable() {
// for persist
super(TableType.OLAP);
@@ -3333,8 +3337,15 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf, GsonPostProc
return System.currentTimeMillis() - lastTableVersionCachedTimeMs >
cacheExpirationMs;
}
- @VisibleForTesting
- protected void setCachedTableVersion(long version) {
+ public boolean isCachedTableVersionExpired(long expirationMs) {
+ // -1 means no cache yet, need to fetch from MS
+ if (cachedTableVersion == -1 || expirationMs <= 0) {
+ return true;
+ }
+ return System.currentTimeMillis() - lastTableVersionCachedTimeMs >
expirationMs;
+ }
+
+ public void setCachedTableVersion(long version) {
if (version >= cachedTableVersion) {
cachedTableVersion = version;
lastTableVersionCachedTimeMs = System.currentTimeMillis();
@@ -3400,6 +3411,49 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf, GsonPostProc
.collect(Collectors.toList());
}
+ ConnectContext ctx = ConnectContext.get();
+ long cloudTableVersionCacheTtlMs = ctx == null
+ ?
VariableMgr.getDefaultSessionVariable().cloudTableVersionCacheTtlMs
+ : ctx.getSessionVariable().cloudTableVersionCacheTtlMs;
+ if (cloudTableVersionCacheTtlMs <= 0) { // No cached versions will be
used
+ return getVisibleVersionInBatchFromMs(tables);
+ }
+
+ // tableId -> cachedVersion, 0 means to be fetched from meta-service
+ List<Pair<Long, Long>> allVersions = new ArrayList<>(tables.size());
+ List<OlapTable> expiredTables = new ArrayList<>(tables.size());
+ for (OlapTable table : tables) {
+ long ver = table.getCachedTableVersion();
+ if (table.isCachedTableVersionExpired()) {
+ expiredTables.add(table);
+ ver = 0L; // 0 means to be fetched from meta-service
+ }
+ allVersions.add(Pair.of(table.getId(), ver));
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("cloudTableVersionCacheTtlMs={}, numTables={},
numExpiredTables={}",
+ cloudTableVersionCacheTtlMs, tables.size(),
expiredTables.size());
+ }
+
+ List<Long> msVersions = null;
+ if (!expiredTables.isEmpty()) { // Not all table versions are from
cache
+ msVersions = getVisibleVersionInBatchFromMs(expiredTables);
+ }
+ int msIdx = 0;
+ for (Pair<Long, Long> v : allVersions) { // ATTN: keep the assigning
order!!!
+ if (v.second == 0L && msVersions != null) {
+ v.second = msVersions.get(msIdx++);
+ }
+ }
+ if (!expiredTables.isEmpty()) { // Not all table versions are from
cache
+ assert msIdx == msVersions.size() : "size not match, idx=" + msIdx
+ " verSize=" + msVersions.size();
+ }
+
+ return allVersions.stream().map(i ->
i.second).collect(Collectors.toList());
+ }
+
+ // Get the table versions in batch from meta-service, and update cache.
+ private static List<Long> getVisibleVersionInBatchFromMs(List<OlapTable>
tables) {
List<Long> dbIds = new ArrayList<>(tables.size());
List<Long> tableIds = new ArrayList<>(tables.size());
for (OlapTable table : tables) {
@@ -3417,7 +3471,7 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf, GsonPostProc
return versions;
}
- private static List<Long> getVisibleVersionFromMeta(List<Long> dbIds,
List<Long> tableIds) {
+ public static List<Long> getVisibleVersionFromMeta(List<Long> dbIds,
List<Long> tableIds) {
// get version rpc
Cloud.GetVersionRequest request = Cloud.GetVersionRequest.newBuilder()
.setRequestIp(FrontendOptions.getLocalHostAddressCached())
@@ -3869,4 +3923,20 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf, GsonPostProc
}
return matched;
}
+
+ public void versionReadLock() {
+ versionLock.readLock().lock();
+ }
+
+ public void versionReadUnlock() {
+ versionLock.readLock().unlock();
+ }
+
+ public void versionWriteLock() {
+ versionLock.writeLock().lock();
+ }
+
+ public void versionWriteUnlock() {
+ versionLock.writeLock().unlock();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
index cadf2d3b75a..f2b17cda54f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -845,7 +845,7 @@ public class DynamicPartitionScheduler extends MasterDaemon
{
throw new Exception("debug point
FE.DynamicPartitionScheduler.before.commitCloudPartition");
}
Env.getCurrentInternalCatalog().afterCreatePartitions(db.getId(),
olapTable.getId(),
- succeedPartitionIds, indexIds, true /* isCreateTable */,
false /* isBatchCommit */);
+ succeedPartitionIds, indexIds, true /* isCreateTable */,
false /* isBatchCommit */, olapTable);
LOG.info("begin write edit log to add partitions in batch, "
+ "numPartitions: {}, db: {}, table: {}, tableId: {}",
batchPartsInfo.size(), db.getFullName(), tableName,
olapTable.getId());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index 65786c29976..07d4fe552d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -69,6 +69,8 @@ public class CloudEnv extends Env {
private CloudTabletRebalancer cloudTabletRebalancer;
private CacheHotspotManager cacheHotspotMgr;
+ private CloudSyncVersionDaemon cloudSyncVersionDaemon;
+ private CloudFEVersionSynchronizer cloudFEVersionSynchronizer;
private boolean enableStorageVault;
@@ -89,6 +91,8 @@ public class CloudEnv extends Env {
this.cloudTabletRebalancer = new
CloudTabletRebalancer((CloudSystemInfoService) systemInfo);
this.cacheHotspotMgr = new
CacheHotspotManager((CloudSystemInfoService) systemInfo);
this.upgradeMgr = new CloudUpgradeMgr((CloudSystemInfoService)
systemInfo);
+ this.cloudSyncVersionDaemon = new CloudSyncVersionDaemon();
+ this.cloudFEVersionSynchronizer = new CloudFEVersionSynchronizer();
this.cloudSnapshotHandler = CloudSnapshotHandler.getInstance();
}
@@ -146,6 +150,7 @@ public class CloudEnv extends Env {
super.initialize(args);
this.cloudSnapshotHandler.initialize();
+ cloudInstanceStatusChecker.start();
}
@Override
@@ -162,11 +167,15 @@ public class CloudEnv extends Env {
cloudSnapshotHandler.start();
}
+ public CloudFEVersionSynchronizer getCloudFEVersionSynchronizer() {
+ return cloudFEVersionSynchronizer;
+ }
+
@Override
protected void startNonMasterDaemonThreads() {
LOG.info("start cloud Non Master only daemon threads");
super.startNonMasterDaemonThreads();
- cloudInstanceStatusChecker.start();
+ cloudSyncVersionDaemon.start();
}
public static String genFeNodeNameFromMeta(String host, int port, long
timeMs) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudFEVersionSynchronizer.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudFEVersionSynchronizer.java
new file mode 100644
index 00000000000..f843b8b58e7
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudFEVersionSynchronizer.java
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
+import org.apache.doris.system.Frontend;
+import org.apache.doris.system.SystemInfoService.HostInfo;
+import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TCloudVersionInfo;
+import org.apache.doris.thrift.TFrontendSyncCloudVersionRequest;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+public class CloudFEVersionSynchronizer {
+ private static final Logger LOG =
LogManager.getLogger(CloudFEVersionSynchronizer.class);
+
+ private static final ExecutorService SYNC_VERSION_THREAD_POOL =
Executors.newFixedThreadPool(
+ Config.cloud_sync_version_task_threads_num,
+ new
ThreadFactoryBuilder().setNameFormat("sync-version-%d").setDaemon(true).build());
+
+ public CloudFEVersionSynchronizer() {
+ }
+
+ // master FE send sync version rpc to other FEs
+ public void pushVersionAsync(long dbId, OlapTable table, long version) {
+ if (!Config.cloud_enable_version_syncer) {
+ return;
+ }
+ pushVersionAsync(dbId, Collections.singletonList(Pair.of(table,
version)), Collections.emptyMap());
+ }
+
+ public void pushVersionAsync(long dbId, List<Pair<OlapTable, Long>>
tableVersions,
+ Map<CloudPartition, Pair<Long, Long>> partitionVersionMap) {
+ if (!Config.cloud_enable_version_syncer) {
+ return;
+ }
+ if (tableVersions.isEmpty() && partitionVersionMap.isEmpty()) {
+ return;
+ }
+ pushVersion(dbId, tableVersions, partitionVersionMap);
+ }
+
+ private void pushVersion(long dbId, List<Pair<OlapTable, Long>>
tableVersions,
+ Map<CloudPartition, Pair<Long, Long>> partitionVersionMap) {
+ List<Frontend> frontends = getFrontends();
+ if (frontends == null || frontends.isEmpty()) {
+ return;
+ }
+
+ List<TCloudVersionInfo> tableVersionInfos = new
ArrayList<>(tableVersions.size());
+ tableVersions.forEach(pair -> {
+ TCloudVersionInfo tableVersion = new TCloudVersionInfo();
+ tableVersion.setTableId(pair.first.getId());
+ tableVersion.setVersion(pair.second);
+ tableVersionInfos.add(tableVersion);
+ });
+ List<TCloudVersionInfo> partitionVersionInfos = new
ArrayList<>(partitionVersionMap.size());
+ partitionVersionMap.forEach((partition, versionPair) -> {
+ TCloudVersionInfo partitionVersion = new TCloudVersionInfo();
+ partitionVersion.setTableId(partition.getTableId());
+ partitionVersion.setPartitionId(partition.getId());
+ partitionVersion.setVersion(versionPair.first);
+ partitionVersion.setVersionUpdateTime(versionPair.second);
+ partitionVersionInfos.add(partitionVersion);
+ });
+ TFrontendSyncCloudVersionRequest request = new
TFrontendSyncCloudVersionRequest();
+ request.setDbId(dbId);
+ request.setTableVersionInfos(tableVersionInfos);
+ request.setPartitionVersionInfos(partitionVersionInfos);
+ for (Frontend fe : frontends) {
+ SYNC_VERSION_THREAD_POOL.submit(() -> {
+ try {
+ pushVersionToFe(request, fe);
+ } catch (Exception e) {
+ LOG.warn("push cloud version error", e);
+ }
+ });
+ }
+ }
+
+ private List<Frontend> getFrontends() {
+ HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
+ return Env.getCurrentEnv().getFrontends(null).stream()
+ .filter(fe -> fe.isAlive() &&
!(fe.getHost().equals(selfNode.getHost())
+ && fe.getEditLogPort() == selfNode.getPort())).collect(
+ Collectors.toList());
+ }
+
+ private void pushVersionToFe(TFrontendSyncCloudVersionRequest request,
Frontend fe) {
+ FrontendService.Client client = null;
+ TNetworkAddress addr = new TNetworkAddress(fe.getHost(),
fe.getRpcPort());
+ boolean ok = false;
+ try {
+ client = ClientPool.frontendVersionPool.borrowObject(addr);
+ TStatus status = client.syncCloudVersion(request);
+ ok = true;
+ if (status.getStatusCode() != TStatusCode.OK) {
+ LOG.warn("failed to push cloud version to frontend {}:{}, err:
{}", fe.getHost(), fe.getRpcPort(),
+ status.getErrorMsgs());
+ }
+ } catch (Exception e) {
+ LOG.warn("failed to push cloud version to frontend {}:{}",
fe.getHost(), fe.getRpcPort(), e);
+ } finally {
+ if (ok) {
+ ClientPool.frontendVersionPool.returnObject(addr, client);
+ } else {
+ ClientPool.frontendVersionPool.invalidateObject(addr, client);
+ }
+ }
+ }
+
+ // follower and observer FE receive sync version rpc from master FE
+ public void syncVersionAsync(TFrontendSyncCloudVersionRequest request) {
+ Database db =
Env.getCurrentInternalCatalog().getDbNullable(request.getDbId());
+ if (db == null) {
+ return;
+ }
+ // only update table version
+ if (request.getPartitionVersionInfos().isEmpty()) {
+ request.getTableVersionInfos().forEach(tableVersionInfo -> {
+ Table table =
db.getTableNullable(tableVersionInfo.getTableId());
+ if (table == null || !table.isManagedTable()) {
+ return;
+ }
+ OlapTable olapTable = (OlapTable) table;
+ olapTable.setCachedTableVersion(tableVersionInfo.getVersion());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Update tableId: {}, version: {}",
olapTable.getId(), tableVersionInfo.getVersion());
+ }
+ });
+ return;
+ }
+ // update partition and table version
+ SYNC_VERSION_THREAD_POOL.submit(() -> {
+ syncVersion(db, request);
+ });
+ }
+
+ private void syncVersion(Database db, TFrontendSyncCloudVersionRequest
request) {
+ List<Pair<OlapTable, Long>> tableVersions = new
ArrayList<>(request.getTableVersionInfos().size());
+ for (TCloudVersionInfo tableVersionInfo :
request.getTableVersionInfos()) {
+ Table table = db.getTableNullable(tableVersionInfo.getTableId());
+ if (table == null || !table.isManagedTable()) {
+ continue;
+ }
+ tableVersions.add(Pair.of((OlapTable) table,
tableVersionInfo.getVersion()));
+ }
+ Collections.sort(tableVersions, Comparator.comparingLong(o ->
o.first.getId()));
+ for (Pair<OlapTable, Long> tableVersion : tableVersions) {
+ tableVersion.first.versionWriteLock();
+ }
+ try {
+ for (TCloudVersionInfo partitionVersionInfo :
request.getPartitionVersionInfos()) {
+ Table table =
db.getTableNullable(partitionVersionInfo.getTableId());
+ if (table == null || !table.isManagedTable()) {
+ continue;
+ }
+ OlapTable olapTable = (OlapTable) table;
+ Partition partition =
olapTable.getPartition(partitionVersionInfo.getPartitionId());
+ if (partition == null) {
+ continue;
+ }
+ CloudPartition cloudPartition = (CloudPartition) partition;
+
cloudPartition.setCachedVisibleVersion(partitionVersionInfo.getVersion(),
+ partitionVersionInfo.getVersionUpdateTime());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Update tableId: {}, partitionId: {}, version:
{}, updateTime: {}",
+ partitionVersionInfo.getTableId(),
partition.getId(), partitionVersionInfo.getVersion(),
+ partitionVersionInfo.getVersionUpdateTime());
+ }
+ }
+ for (Pair<OlapTable, Long> tableVersion : tableVersions) {
+ tableVersion.first.setCachedTableVersion(tableVersion.second);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Update tableId: {}, version: {}",
tableVersion.first.getId(), tableVersion.second);
+ }
+ }
+ } finally {
+ for (int i = tableVersions.size() - 1; i >= 0; i--) {
+ tableVersions.get(i).first.versionWriteUnlock();
+ }
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
index 459526e68de..d518721fabe 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
@@ -17,10 +17,13 @@
package org.apache.doris.cloud.catalog;
+import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Table;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.proto.Cloud.MetaServiceCode;
import org.apache.doris.cloud.rpc.VersionHelper;
@@ -41,7 +44,10 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@@ -144,7 +150,7 @@ public class CloudPartition extends Partition {
return getVisibleVersionFromMs(false);
}
- public long getVisibleVersionFromMs(boolean waitForPendingTxns) {
+ private long getVisibleVersionFromMs(boolean waitForPendingTxns) {
if (LOG.isDebugEnabled()) {
LOG.debug("getVisibleVersionFromMs use CloudPartition {},
waitForPendingTxns: {}",
super.getName(), waitForPendingTxns);
@@ -255,6 +261,27 @@ public class CloudPartition extends Partition {
return versions;
}
+ private static List<OlapTable> getTables(List<CloudPartition> partitions) {
+ Map<Long, OlapTable> tableMap = new HashMap<>();
+ for (CloudPartition partition : partitions) {
+ if (tableMap.containsKey(partition.getTableId())) {
+ continue;
+ }
+ Database db =
Env.getCurrentInternalCatalog().getDbNullable(partition.getDbId());
+ if (db == null) {
+ continue;
+ }
+ Table table = db.getTableNullable(partition.getTableId());
+ if (table == null) {
+ continue;
+ }
+ tableMap.put(partition.getTableId(), (OlapTable) table);
+ }
+ List<OlapTable> tables =
tableMap.values().stream().collect(Collectors.toCollection(ArrayList::new));
+ Collections.sort(tables, Comparator.comparingLong(o -> o.getId()));
+ return tables;
+ }
+
// Get visible version from the specified partitions;
//
// Return the visible version in order of the specified partition ids, -1
means version NOT FOUND.
@@ -272,15 +299,24 @@ public class CloudPartition extends Partition {
// partitionId -> cachedVersion
List<Pair<Long, Long>> allVersions = new
ArrayList<>(partitions.size());
List<CloudPartition> expiredPartitions = new
ArrayList<>(partitions.size());
- for (CloudPartition partition : partitions) {
- long ver = partition.getCachedVisibleVersion();
- if (partition.isCachedVersionExpired()) {
- expiredPartitions.add(partition);
- ver = 0L; // 0 means to be get from meta-service
+ List<OlapTable> tables = getTables(partitions);
+ for (OlapTable table : tables) {
+ table.versionReadLock();
+ }
+ try {
+ for (CloudPartition partition : partitions) {
+ long ver = partition.getCachedVisibleVersion();
+ if (partition.isCachedVersionExpired()) {
+ expiredPartitions.add(partition);
+ ver = 0L; // 0 means to be get from meta-service
+ }
+ allVersions.add(Pair.of(partition.getId(), ver));
+ }
+ } finally {
+ for (int i = tables.size() - 1; i >= 0; i--) {
+ tables.get(i).versionReadUnlock();
}
- allVersions.add(Pair.of(partition.getId(), ver));
}
-
if (LOG.isDebugEnabled()) {
LOG.debug("cloudPartitionVersionCacheTtlMs={}, numPartitions={},
numFilteredPartitions={}",
cloudPartitionVersionCacheTtlMs, partitions.size(),
partitions.size() - expiredPartitions.size());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudSyncVersionDaemon.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudSyncVersionDaemon.java
new file mode 100644
index 00000000000..d65a6806eac
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudSyncVersionDaemon.java
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.MasterDaemon;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+public class CloudSyncVersionDaemon extends MasterDaemon {
+ private static final Logger LOG =
LogManager.getLogger(CloudSyncVersionDaemon.class);
+ private static final ExecutorService GET_VERSION_THREAD_POOL =
Executors.newFixedThreadPool(
+ Config.cloud_get_version_task_threads_num,
+ new
ThreadFactoryBuilder().setNameFormat("get-version-%d").setDaemon(true).build());
+
+ public CloudSyncVersionDaemon() {
+ super("cloud table and partition version checker",
+ Config.cloud_version_syncer_interval_second * 1000);
+ }
+
+ @Override
+ protected void runAfterCatalogReady() {
+ if (!Config.cloud_enable_version_syncer) {
+ return;
+ }
+ LOG.info("begin sync cloud table and partition version");
+ Map<OlapTable, Long> tableVersionMap = syncTableVersions();
+ if (!tableVersionMap.isEmpty()) {
+ syncPartitionVersion(tableVersionMap);
+ }
+ }
+
+ private Map<OlapTable, Long> syncTableVersions() {
+ Map<OlapTable, Long> tableVersionMap = new ConcurrentHashMap<>();
+ List<Future<Void>> futures = new ArrayList<>();
+ long start = System.currentTimeMillis();
+ List<Long> dbIds = new ArrayList<>();
+ List<Long> tableIds = new ArrayList<>();
+ List<OlapTable> tables = new ArrayList<>();
+ // TODO meta service support range scan all table versions
+ for (Database db : Env.getCurrentInternalCatalog().getDbs()) {
+ List<Table> tableList = db.getTables();
+ for (Table table : tableList) {
+ if (!table.isManagedTable()) {
+ continue;
+ }
+ OlapTable olapTable = (OlapTable) table;
+ if (!olapTable.isCachedTableVersionExpired(
+ Config.cloud_version_syncer_interval_second * 1000)) {
+ continue;
+ }
+ dbIds.add(db.getId());
+ tableIds.add(olapTable.getId());
+ tables.add(olapTable);
+ if (dbIds.size() >= Config.cloud_get_version_task_batch_size) {
+ Future<Void> future =
submitGetTableVersionTask(tableVersionMap, ImmutableList.copyOf(dbIds),
+ ImmutableList.copyOf(tableIds),
ImmutableList.copyOf(tables));
+ futures.add(future);
+ dbIds.clear();
+ tableIds.clear();
+ tables.clear();
+ }
+ }
+ }
+ if (!dbIds.isEmpty()) {
+ Future<Void> future = submitGetTableVersionTask(tableVersionMap,
ImmutableList.copyOf(dbIds),
+ ImmutableList.copyOf(tableIds),
ImmutableList.copyOf(tables));
+ futures.add(future);
+ dbIds.clear();
+ tableIds.clear();
+ tables.clear();
+ }
+ try {
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Error waiting for get table version tasks to complete",
e);
+ }
+ LOG.info("sync table version cost {} ms, rpc size: {}, found {} tables
need to sync partition version",
+ System.currentTimeMillis() - start, futures.size(),
tableVersionMap.size());
+ return tableVersionMap;
+ }
+
+ private Future<Void> submitGetTableVersionTask(Map<OlapTable, Long>
tableVersionMap, List<Long> dbIds,
+ List<Long> tableIds, List<OlapTable> tables) {
+ return GET_VERSION_THREAD_POOL.submit(() -> {
+ try {
+ List<Long> versions =
OlapTable.getVisibleVersionFromMeta(dbIds, tableIds);
+ for (int i = 0; i < tables.size(); i++) {
+ OlapTable table = tables.get(i);
+ long version = versions.get(i);
+ if (version > table.getCachedTableVersion()) {
+ tableVersionMap.compute(table, (k, v) -> {
+ if (v == null || version > v) {
+ return version;
+ } else {
+ return v;
+ }
+ });
+ } else {
+ // update lastTableVersionCachedTimeMs
+ table.setCachedTableVersion(version);
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("get table version error", e);
+ }
+ return null;
+ });
+ }
+
+ private void syncPartitionVersion(Map<OlapTable, Long> tableVersionMap) {
+ Set<Long> failedTables = ConcurrentHashMap.newKeySet();
+ List<Future<Void>> futures = new ArrayList<>();
+ long start = System.currentTimeMillis();
+ List<CloudPartition> partitions = new ArrayList<>();
+ // TODO meta service support range scan partition versions
+ for (Entry<OlapTable, Long> entry : tableVersionMap.entrySet()) {
+ OlapTable olapTable = entry.getKey();
+ LOG.info("sync partition version for db: {}, table: {}, table
cache version: {}, new version: {}",
+ olapTable.getDatabase().getId(), olapTable,
olapTable.getCachedTableVersion(), entry.getValue());
+ for (Partition partition : olapTable.getAllPartitions()) {
+ partitions.add((CloudPartition) partition);
+ if (partitions.size() >=
Config.cloud_get_version_task_batch_size) {
+ Future<Void> future =
submitGetPartitionVersionTask(failedTables, ImmutableList.copyOf(partitions));
+ futures.add(future);
+ partitions.clear();
+ }
+ }
+ }
+ if (partitions.size() > 0) {
+ Future<Void> future = submitGetPartitionVersionTask(failedTables,
ImmutableList.copyOf(partitions));
+ futures.add(future);
+ partitions.clear();
+ }
+ try {
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Error waiting for get partition version tasks to
complete", e);
+ }
+ // set table version for success tables
+ for (Entry<OlapTable, Long> entry : tableVersionMap.entrySet()) {
+ if (!failedTables.contains(entry.getKey().getId())) {
+ OlapTable olapTable = entry.getKey();
+ olapTable.setCachedTableVersion(entry.getValue());
+ }
+ }
+ LOG.info("sync partition version cost {} ms, table size: {}, rpc size:
{}, failed tables: {}",
+ System.currentTimeMillis() - start, tableVersionMap.size(),
futures.size(), failedTables);
+ }
+
+ private Future<Void> submitGetPartitionVersionTask(Set<Long> failedTables,
List<CloudPartition> partitions) {
+ return GET_VERSION_THREAD_POOL.submit(() -> {
+ try {
+ CloudPartition.getSnapshotVisibleVersionFromMs(partitions,
false);
+ } catch (Exception e) {
+ LOG.warn("get partition version error", e);
+ Set<Long> failedTableIds = partitions.stream().map(p ->
p.getTableId())
+ .collect(Collectors.toSet());
+ failedTables.addAll(failedTableIds);
+ }
+ return null;
+ });
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index 043a10258ea..f8e6573bdc3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -37,6 +37,7 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.cloud.catalog.CloudEnv;
@@ -484,12 +485,22 @@ public class CloudInternalCatalog extends InternalCatalog
{
*/
@Override
public void afterCreatePartitions(long dbId, long tableId, List<Long>
partitionIds, List<Long> indexIds,
- boolean isCreateTable, boolean isBatchCommit)
+ boolean isCreateTable, boolean isBatchCommit, OlapTable olapTable)
throws DdlException {
if (isBatchCommit) {
- commitMaterializedIndex(dbId, tableId, indexIds, partitionIds,
isCreateTable);
+ long tableVersion = commitMaterializedIndex(dbId, tableId,
indexIds, partitionIds, isCreateTable);
+ if (olapTable != null && isCreateTable && tableVersion > 0) {
+ olapTable.setCachedTableVersion(tableVersion);
+ ((CloudEnv)
Env.getCurrentEnv()).getCloudFEVersionSynchronizer()
+ .pushVersionAsync(dbId, olapTable, tableVersion);
+ }
} else {
- commitPartition(dbId, tableId, partitionIds, indexIds);
+ long tableVersion = commitPartition(dbId, tableId, partitionIds,
indexIds);
+ if (olapTable != null && tableVersion > 0) {
+ olapTable.setCachedTableVersion(tableVersion);
+ ((CloudEnv)
Env.getCurrentEnv()).getCloudFEVersionSynchronizer()
+ .pushVersionAsync(dbId, olapTable, tableVersion);
+ }
}
if (!Config.check_create_table_recycle_key_remained) {
return;
@@ -554,11 +565,14 @@ public class CloudInternalCatalog extends InternalCatalog
{
}
}
- public void commitPartition(long dbId, long tableId, List<Long>
partitionIds, List<Long> indexIds)
+ /**
+ * @return table version if returned by MetaService, otherwise return 0
+ */
+ public long commitPartition(long dbId, long tableId, List<Long>
partitionIds, List<Long> indexIds)
throws DdlException {
if (Config.enable_check_compatibility_mode) {
LOG.info("skip committing partitions in check compatibility mode");
- return;
+ return 0;
}
Cloud.PartitionRequest.Builder partitionRequestBuilder =
Cloud.PartitionRequest.newBuilder()
@@ -593,6 +607,10 @@ public class CloudInternalCatalog extends InternalCatalog {
LOG.warn("commitPartition response: {} ", response);
throw new DdlException(response.getStatus().getMsg());
}
+ if (response.hasTableVersion()) {
+ return response.getTableVersion();
+ }
+ return 0;
}
// if `expiration` = 0, recycler will delete uncommitted indexes in
`retention_seconds`
@@ -633,12 +651,15 @@ public class CloudInternalCatalog extends InternalCatalog
{
}
}
- public void commitMaterializedIndex(long dbId, long tableId, List<Long>
indexIds, List<Long> partitionIds,
+ /**
+ * @return table version if returned by MetaService, otherwise return 0
+ */
+ public long commitMaterializedIndex(long dbId, long tableId, List<Long>
indexIds, List<Long> partitionIds,
boolean isCreateTable)
throws DdlException {
if (Config.enable_check_compatibility_mode) {
LOG.info("skip committing materialized index in checking
compatibility mode");
- return;
+ return 0;
}
Cloud.IndexRequest.Builder indexRequestBuilder =
Cloud.IndexRequest.newBuilder()
@@ -674,6 +695,10 @@ public class CloudInternalCatalog extends InternalCatalog {
LOG.warn("commitIndex response: {} ", response);
throw new DdlException(response.getStatus().getMsg());
}
+ if (isCreateTable && response.hasTableVersion()) {
+ return response.getTableVersion();
+ }
+ return 0;
}
private void checkPartition(long dbId, long tableId, List<Long>
partitionIds)
@@ -925,6 +950,19 @@ public class CloudInternalCatalog extends InternalCatalog {
if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
LOG.warn("dropPartition response: {} ", response);
throw new DdlException(response.getStatus().getMsg());
+ } else if (needUpdateTableVersion && response.hasTableVersion() &&
response.getTableVersion() > 0) {
+ Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+ if (db == null) {
+ return;
+ }
+ Table table = db.getTableNullable(tableId);
+ if (table != null && table instanceof OlapTable) {
+ OlapTable olapTable = (OlapTable) table;
+ long tableVersion = response.getTableVersion();
+ olapTable.setCachedTableVersion(tableVersion);
+ ((CloudEnv)
Env.getCurrentEnv()).getCloudFEVersionSynchronizer()
+ .pushVersionAsync(dbId, olapTable, tableVersion);
+ }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 3ada132ad63..4fe2e9cc61a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -27,6 +27,7 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.catalog.CloudPartition;
import org.apache.doris.cloud.proto.Cloud.AbortSubTxnRequest;
import org.apache.doris.cloud.proto.Cloud.AbortSubTxnResponse;
@@ -142,6 +143,7 @@ import java.io.IOException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
@@ -505,44 +507,17 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
// 1. update rowCount for AnalysisManager
Map<Long, Long> updatedRows = new HashMap<>();
for (TableStatsPB tableStats : commitTxnResponse.getTableStatsList()) {
- LOG.info("Update RowCount for AnalysisManager. transactionId:{},
table_id:{}, updated_row_count:{}",
- txnId, tableStats.getTableId(),
tableStats.getUpdatedRowCount());
- updatedRows.put(tableStats.getTableId(),
tableStats.getUpdatedRowCount());
+ if (tableStats.hasUpdatedRowCount()) {
+ LOG.info("Update RowCount for AnalysisManager.
transactionId:{}, table_id:{}, updated_row_count:{}",
+ txnId, tableStats.getTableId(),
tableStats.getUpdatedRowCount());
+ updatedRows.put(tableStats.getTableId(),
tableStats.getUpdatedRowCount());
+ }
}
Env env = Env.getCurrentEnv();
env.getAnalysisManager().updateUpdatedRows(updatedRows);
- // 2. notify partition first load
- int totalPartitionNum = commitTxnResponse.getPartitionIdsList().size();
- // a map to record <tableId, [firstLoadPartitionIds]>
- Map<Long, List<Long>> tablePartitionMap = Maps.newHashMap();
- for (int idx = 0; idx < totalPartitionNum; ++idx) {
- long version = commitTxnResponse.getVersions(idx);
- long tableId = commitTxnResponse.getTableIds(idx);
- if (version == 2) {
- // inform AnalysisManager first load partitions
- tablePartitionMap.computeIfAbsent(tableId, k ->
Lists.newArrayList());
-
tablePartitionMap.get(tableId).add(commitTxnResponse.getPartitionIds(idx));
- }
- // 3. update CloudPartition
- OlapTable olapTable = (OlapTable)
env.getInternalCatalog().getDb(dbId)
- .flatMap(db -> db.getTable(tableId)).filter(t ->
t.isManagedTable())
- .orElse(null);
- if (olapTable == null) {
- continue;
- }
- CloudPartition partition = (CloudPartition) olapTable.getPartition(
- commitTxnResponse.getPartitionIds(idx));
- if (partition == null) {
- continue;
- }
- if (version == 2) {
-
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)
- .stream().forEach(i -> i.setRowCountReported(false));
- }
- partition.setCachedVisibleVersion(version,
commitTxnResponse.getVersionUpdateTimeMs());
- LOG.info("Update Partition. transactionId:{}, table_id:{},
partition_id:{}, version:{}, update time:{}",
- txnId, tableId, partition.getId(), version,
commitTxnResponse.getVersionUpdateTimeMs());
- }
+ // 2. update table and partition version
+ Map<Long, List<Long>> tablePartitionMap =
updateVersion(commitTxnResponse);
+ // 3. notify partition first load
env.getAnalysisManager().setNewPartitionLoaded(
tablePartitionMap.keySet().stream().collect(Collectors.toList()));
// tablePartitionMap to string
@@ -578,6 +553,83 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
}
+ private Map<Long, List<Long>> updateVersion(CommitTxnResponse
commitTxnResponse) {
+ long dbId = commitTxnResponse.getTxnInfo().getDbId();
+ long txnId = commitTxnResponse.getTxnInfo().getTxnId();
+ int totalPartitionNum = commitTxnResponse.getPartitionIdsList().size();
+ if (totalPartitionNum == 0 &&
commitTxnResponse.getTableStatsList().isEmpty()) {
+ return Collections.emptyMap();
+ }
+ Env env = Env.getCurrentEnv();
+ // partition -> <version, versionUpdateTime>
+ Map<CloudPartition, Pair<Long, Long>> partitionVersionMap = new
HashMap<>();
+ // a map to record <tableId, [firstLoadPartitionIds]>
+ Map<Long, List<Long>> tablePartitionMap = Maps.newHashMap();
+ for (int idx = 0; idx < totalPartitionNum; ++idx) {
+ long version = commitTxnResponse.getVersions(idx);
+ long tableId = commitTxnResponse.getTableIds(idx);
+ if (version == 2) {
+ // inform AnalysisManager first load partitions
+ tablePartitionMap.computeIfAbsent(tableId, k ->
Lists.newArrayList());
+
tablePartitionMap.get(tableId).add(commitTxnResponse.getPartitionIds(idx));
+ }
+ // 3. update CloudPartition
+ OlapTable olapTable = (OlapTable)
env.getInternalCatalog().getDb(dbId)
+ .flatMap(db -> db.getTable(tableId)).filter(t ->
t.isManagedTable())
+ .orElse(null);
+ if (olapTable == null) {
+ continue;
+ }
+ CloudPartition partition = (CloudPartition) olapTable.getPartition(
+ commitTxnResponse.getPartitionIds(idx));
+ if (partition == null) {
+ continue;
+ }
+ if (version == 2) {
+
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)
+ .stream().forEach(i -> i.setRowCountReported(false));
+ }
+ partitionVersionMap.put(partition, Pair.of(version,
commitTxnResponse.getVersionUpdateTimeMs()));
+ }
+ // collect table versions
+ Database db = env.getInternalCatalog().getDb(dbId).get();
+ List<Pair<OlapTable, Long>> tableVersions = new
ArrayList<>(commitTxnResponse.getTableStatsList().size());
+ for (TableStatsPB tableStats : commitTxnResponse.getTableStatsList()) {
+ if (!tableStats.hasTableVersion()) {
+ continue;
+ }
+ Table table = db.getTableNullable(tableStats.getTableId());
+ if (table == null || !table.isManagedTable()) {
+ continue;
+ }
+ tableVersions.add(Pair.of((OlapTable) table,
tableStats.getTableVersion()));
+ }
+ Collections.sort(tableVersions, Comparator.comparingLong(o ->
o.first.getId()));
+ // update partition version and table version
+ for (Pair<OlapTable, Long> tableVersion : tableVersions) {
+ tableVersion.first.versionWriteLock();
+ }
+ try {
+ partitionVersionMap.forEach((partition, versionPair) -> {
+ partition.setCachedVisibleVersion(versionPair.first,
versionPair.second);
+ LOG.info("Update Partition. transactionId:{}, table_id:{},
partition_id:{}, version:{}, update time:{}",
+ txnId, partition.getTableId(), partition.getId(),
versionPair.first, versionPair.second);
+ });
+ for (Pair<OlapTable, Long> tableVersion : tableVersions) {
+ tableVersion.first.setCachedTableVersion(tableVersion.second);
+ LOG.info("Update Table. transactionId:{}, table_id:{},
version:{}", txnId, tableVersion.first.getId(),
+ tableVersion.second);
+ }
+ } finally {
+ for (int i = tableVersions.size() - 1; i >= 0; i--) {
+ tableVersions.get(i).first.versionWriteUnlock();
+ }
+ }
+ // notify follower and observer FE to update their version cache
+ ((CloudEnv)
env).getCloudFEVersionSynchronizer().pushVersionAsync(dbId, tableVersions,
partitionVersionMap);
+ return tablePartitionMap;
+ }
+
private Set<Long> getBaseTabletsFromTables(List<Table> tableList,
List<TabletCommitInfo> tabletCommitInfos)
throws MetaNotFoundException {
Set<Long> baseTabletIds = Sets.newHashSet();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
index bdfffbe4802..2b769ae2e62 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
@@ -66,6 +66,9 @@ public class ClientPool {
public static GenericPool<FrontendService.Client> frontendHeartbeatPool =
new GenericPool<>("FrontendService", heartbeatConfig,
heartbeatTimeoutMs,
Config.thrift_server_type.equalsIgnoreCase(ThriftServer.THREADED_SELECTOR));
+ public static GenericPool<FrontendService.Client> frontendVersionPool =
+ new GenericPool<>("FrontendService", heartbeatConfig,
heartbeatTimeoutMs,
+
Config.thrift_server_type.equalsIgnoreCase(ThriftServer.THREADED_SELECTOR));
public static GenericPool<FrontendService.Client> frontendPool =
new GenericPool("FrontendService", backendConfig,
Config.backend_rpc_timeout_ms,
Config.thrift_server_type.equalsIgnoreCase(ThriftServer.THREADED_SELECTOR));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
index 3ac6eb65e0e..c2dee8d62fc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
@@ -29,6 +29,7 @@ import org.apache.doris.catalog.DataProperty;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
@@ -37,7 +38,10 @@ import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
+import org.apache.doris.cloud.catalog.CloudEnv;
+import org.apache.doris.cloud.catalog.CloudPartition;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
@@ -70,6 +74,8 @@ import org.apache.doris.nereids.types.DateTimeType;
import org.apache.doris.nereids.types.DateTimeV2Type;
import org.apache.doris.nereids.types.DateV2Type;
import org.apache.doris.nereids.types.coercion.DateLikeType;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.rpc.RpcException;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
@@ -79,10 +85,13 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -94,6 +103,8 @@ import java.util.stream.Collectors;
* show [temp] partitions' detail info within a table
*/
public class PartitionsProcDir implements ProcDirInterface {
+ private static final Logger LOG =
LogManager.getLogger(PartitionsProcDir.class);
+
public static final ImmutableList<String> TITLE_NAMES = new
ImmutableList.Builder<String>()
.add("PartitionId").add("PartitionName")
.add("VisibleVersion").add("VisibleVersionTime")
@@ -397,6 +408,60 @@ public class PartitionsProcDir implements ProcDirInterface
{
return partitionInfosInrernal.stream().map(pair ->
pair.second).collect(Collectors.toList());
}
+ private List<Long> getPartitionVersions(OlapTable olapTable, List<Long>
partitionIds)
+ throws AnalysisException {
+ List<Long> partitionVersions;
+ if (Config.isNotCloudMode()) {
+ partitionVersions = partitionIds.stream().map(id ->
olapTable.getPartition(id).getVisibleVersion())
+ .collect(Collectors.toList());
+ } else if (ConnectContext.get() != null &&
ConnectContext.get().getSessionVariable().cloudForceSyncVersion) {
+ LOG.info("cloud force sync version for table: {}, partitionNum:
{}", olapTable, partitionIds.size());
+ long dbId = olapTable.getDatabase().getId();
+ // sync table version
+ // Note: does not update table version cache to avoid that when
getting partition version fails,
+ // the table version cache is updated but partition version cache
is not updated.
+ List<Long> tableVersions =
OlapTable.getVisibleVersionFromMeta(Lists.newArrayList(dbId),
+ Lists.newArrayList(olapTable.getId()));
+ List<Pair<OlapTable, Long>> tableVersionMap =
Lists.newArrayList(Pair.of(olapTable, tableVersions.get(0)));
+ // sync partition version
+ List<CloudPartition> partitions = partitionIds.stream()
+ .map(id -> (CloudPartition)
(olapTable.getPartition(id))).collect(Collectors.toList());
+ try {
+ partitionVersions = new ArrayList<>(partitionIds.size());
+ int batchSize = Config.cloud_get_version_task_batch_size;
+ for (int start = 0; start < partitions.size(); start +=
batchSize) {
+ int end = Math.min(start + batchSize, partitions.size());
+ List<CloudPartition> batch = partitions.subList(start,
end);
+
partitionVersions.addAll(CloudPartition.getSnapshotVisibleVersionFromMs(batch,
false));
+ }
+ } catch (RpcException e) {
+ LOG.warn("get partition versions failed for table: {}",
olapTable, e);
+ throw new AnalysisException("get partition versions failed",
e);
+ }
+ Map<CloudPartition, Pair<Long, Long>> partitionVersionMap = new
HashMap<>(partitionIds.size());
+ for (int i = 0; i < partitionIds.size(); i++) {
+ CloudPartition partition = partitions.get(i);
+ long version = partitionVersions.get(i);
+ partitionVersionMap.put(partition, Pair.of(version,
partition.getVisibleVersionTime()));
+ }
+ // push to other fes
+ ((CloudEnv) (Env.getCurrentEnv())).getCloudFEVersionSynchronizer()
+ .pushVersionAsync(dbId, tableVersionMap,
partitionVersionMap);
+ } else {
+ List<CloudPartition> partitions = partitionIds.stream()
+ .map(id -> (CloudPartition)
(olapTable.getPartition(id))).collect(Collectors.toList());
+ try {
+ partitionVersions =
CloudPartition.getSnapshotVisibleVersion(partitions);
+ } catch (RpcException e) {
+ LOG.warn("get partition versions failed for table: {}",
olapTable, e);
+ throw new AnalysisException("get partition versions failed",
e);
+ }
+ }
+ Preconditions.checkState(partitionVersions.size() ==
partitionIds.size(),
+ "versions size %s not equal partition size %s",
partitionVersions.size(), partitionIds.size());
+ return partitionVersions;
+ }
+
private List<Pair<List<Comparable>, TRow>> getPartitionInfosInrernal()
throws AnalysisException {
Preconditions.checkNotNull(db);
Preconditions.checkNotNull(olapTable);
@@ -446,8 +511,10 @@ public class PartitionsProcDir implements ProcDirInterface
{
partitionIds =
partitions.stream().map(Partition::getId).collect(Collectors.toList());
}
+ List<Long> partitionVersions = getPartitionVersions(olapTable,
partitionIds);
Joiner joiner = Joiner.on(", ");
- for (Long partitionId : partitionIds) {
+ for (int j = 0; j < partitionIds.size(); j++) {
+ long partitionId = partitionIds.get(j);
Partition partition = olapTable.getPartition(partitionId);
List<Comparable> partitionInfo = new ArrayList<Comparable>();
@@ -457,8 +524,9 @@ public class PartitionsProcDir implements ProcDirInterface {
trow.addToColumnValue(new TCell().setLongVal(partitionId));
partitionInfo.add(partitionName);
trow.addToColumnValue(new TCell().setStringVal(partitionName));
- partitionInfo.add(partition.getVisibleVersion());
- trow.addToColumnValue(new
TCell().setLongVal(partition.getVisibleVersion()));
+ long partitionVersion = partitionVersions.get(j);
+ partitionInfo.add(partitionVersion);
+ trow.addToColumnValue(new
TCell().setLongVal(partitionVersion));
String visibleTime =
TimeUtils.longToTimeString(partition.getVisibleVersionTime());
partitionInfo.add(visibleTime);
trow.addToColumnValue(new TCell().setStringVal(visibleTime));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 06ab6df903a..8bbd6268e20 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1780,7 +1780,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
if (!isCreateTable) {
afterCreatePartitions(db.getId(), olapTable.getId(),
partitionIds, indexIds, isCreateTable,
- false /* isBatchCommit */);
+ false /* isBatchCommit */, olapTable);
}
if (writeEditLog) {
Env.getCurrentEnv().getEditLog().logAddPartition(info);
@@ -2223,6 +2223,11 @@ public class InternalCatalog implements
CatalogIf<Database> {
public void afterCreatePartitions(long dbId, long tableId, List<Long>
partitionIds, List<Long> indexIds,
boolean isCreateTable, boolean
isBatchCommit)
throws DdlException {
+ afterCreatePartitions(dbId, tableId, partitionIds, indexIds,
isCreateTable, isBatchCommit, null);
+ }
+
+ public void afterCreatePartitions(long dbId, long tableId, List<Long>
partitionIds, List<Long> indexIds,
+ boolean isCreateTable, boolean isBatchCommit, OlapTable olapTable)
throws DdlException {
}
public void checkAvailableCapacity(Database db) throws DdlException {
@@ -3004,7 +3009,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
binlogConfigForTask,
partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified());
afterCreatePartitions(db.getId(), olapTable.getId(),
olapTable.getPartitionIds(),
- olapTable.getIndexIdList(), true /*
isCreateTable */, true /* isBatchCommit */);
+ olapTable.getIndexIdList(), true /* isCreateTable */,
true /* isBatchCommit */, olapTable);
olapTable.addPartition(partition);
} else if (partitionInfo.getType() == PartitionType.RANGE
|| partitionInfo.getType() == PartitionType.LIST) {
@@ -3098,7 +3103,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
.setStoragePolicy(partionStoragePolicy);
}
afterCreatePartitions(db.getId(), olapTable.getId(),
olapTable.getPartitionIds(),
- olapTable.getIndexIdList(), true /* isCreateTable */,
true /* isBatchCommit */);
+ olapTable.getIndexIdList(), true /* isCreateTable */,
true /* isBatchCommit */, olapTable);
} else {
throw new DdlException("Unsupported partition method: " +
partitionInfo.getType().name());
}
@@ -3560,7 +3565,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
afterCreatePartitions(db.getId(), copiedTbl.getId(),
newPartitionIds, indexIds, true /* isCreateTable */,
- false /* isBatchCommit */);
+ false /* isBatchCommit */, olapTable);
} catch (DdlException e) {
// create partition failed, remove all newly created tablets
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 3fc7d3a7634..4e96a70afe7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -802,6 +802,7 @@ public class SessionVariable implements Serializable,
Writable {
public static final String CLOUD_CLUSTER = "cloud_cluster";
public static final String COMPUTE_GROUP = "compute_group";
public static final String DISABLE_EMPTY_PARTITION_PRUNE =
"disable_empty_partition_prune";
+ public static final String CLOUD_FORCE_SYNC_VERSION =
"cloud_force_sync_version";
public static final String CLOUD_PARTITION_VERSION_CACHE_TTL_MS =
"cloud_partition_version_cache_ttl_ms";
public static final String CLOUD_TABLE_VERSION_CACHE_TTL_MS =
@@ -3040,9 +3041,11 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = DISABLE_EMPTY_PARTITION_PRUNE)
public boolean disableEmptyPartitionPrune = false;
@VariableMgr.VarAttr(name = CLOUD_PARTITION_VERSION_CACHE_TTL_MS)
- public long cloudPartitionVersionCacheTtlMs = 0;
+ public long cloudPartitionVersionCacheTtlMs = Long.MAX_VALUE;
@VariableMgr.VarAttr(name = CLOUD_TABLE_VERSION_CACHE_TTL_MS)
- public long cloudTableVersionCacheTtlMs = 0;
+ public long cloudTableVersionCacheTtlMs = Long.MAX_VALUE;
+ @VariableMgr.VarAttr(name = CLOUD_FORCE_SYNC_VERSION)
+ public boolean cloudForceSyncVersion = false;
// CLOUD_VARIABLES_END
// fetch remote schema rpc timeout
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 4110abe89ad..82fa9992d19 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -181,6 +181,7 @@ import org.apache.doris.thrift.TFrontendPingFrontendResult;
import org.apache.doris.thrift.TFrontendPingFrontendStatusCode;
import org.apache.doris.thrift.TFrontendReportAliveSessionRequest;
import org.apache.doris.thrift.TFrontendReportAliveSessionResult;
+import org.apache.doris.thrift.TFrontendSyncCloudVersionRequest;
import org.apache.doris.thrift.TGetBackendMetaRequest;
import org.apache.doris.thrift.TGetBackendMetaResult;
import org.apache.doris.thrift.TGetBinlogLagResult;
@@ -3063,6 +3064,17 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
return result;
}
+ @Override
+ public TStatus syncCloudVersion(TFrontendSyncCloudVersionRequest request)
+ throws TException {
+ TStatus status = new TStatus(TStatusCode.OK);
+ if (Env.getCurrentEnv().isMaster()) {
+ return status;
+ }
+ ((CloudEnv)
(Env.getCurrentEnv())).getCloudFEVersionSynchronizer().syncVersionAsync(request);
+ return status;
+ }
+
@Override
public TFetchSchemaTableDataResult
fetchSchemaTableData(TFetchSchemaTableDataRequest request) throws TException {
try {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java
index dda4acc832c..babe7107b29 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java
@@ -45,9 +45,13 @@ import org.junit.Test;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
public class OlapTableTest {
@@ -405,4 +409,132 @@ public class OlapTableTest {
ConnectContext.remove();
}
}
+
+ private OlapTable createCloudOlapTable(long tableId, Database db) {
+ OlapTable table = new OlapTable() {
+ private ReadWriteLock versionLock = new ReentrantReadWriteLock();
+
+ @Override
+ public Database getDatabase() {
+ return db;
+ }
+
+ @Override
+ public void versionReadLock() {
+ versionLock.readLock().lock();
+ }
+
+ @Override
+ public void versionReadUnlock() {
+ versionLock.readLock().unlock();
+ }
+ };
+ table.id = tableId;
+ return table;
+ }
+
+ @Test
+ public void testGetVisibleVersionInBatchCached() throws Exception {
+ new MockUp<Config>() {
+ @Mock
+ public boolean isNotCloudMode() {
+ return false;
+ }
+
+ @Mock
+ public boolean isCloudMode() {
+ return true;
+ }
+ };
+
+ final Database db = new Database(1L, "test_db");
+ List<OlapTable> tables = new ArrayList<>();
+ for (long i = 0; i < 3; i++) {
+ tables.add(createCloudOlapTable(100 + i, db));
+ }
+
+ final ArrayList<ArrayList<Long>> batchVersions = new
ArrayList<>(Arrays.asList(
+ new ArrayList<>(Arrays.asList(10L, 20L, 30L)),
+ new ArrayList<>(Arrays.asList(11L, 21L, 31L)),
+ new ArrayList<>(Arrays.asList(22L, 32L)),
+ new ArrayList<>(Arrays.asList(13L, 23L, 33L))
+ ));
+ final int[] callCount = {0};
+
+ new MockUp<VersionHelper>() {
+ @Mock
+ public Cloud.GetVersionResponse
getVersionFromMeta(Cloud.GetVersionRequest req) {
+ Cloud.GetVersionResponse.Builder builder =
Cloud.GetVersionResponse.newBuilder();
+ builder.setStatus(Cloud.MetaServiceResponseStatus.newBuilder()
+ .setCode(Cloud.MetaServiceCode.OK).build());
+ builder.addAllVersions(batchVersions.get(callCount[0]));
+ callCount[0]++;
+ return builder.build();
+ }
+ };
+
+ ConnectContext ctx = new ConnectContext();
+ ctx.setSessionVariable(new SessionVariable());
+ ctx.setThreadLocalInfo();
+
+ // CHECKSTYLE OFF
+ try {
+ // Test 1: cache disabled (TTL = -1), all fetched from MS
+ ctx.getSessionVariable().cloudTableVersionCacheTtlMs = -1;
+ {
+ List<Long> versions =
OlapTable.getVisibleVersionInBatch(tables);
+ Assert.assertEquals(1, callCount[0]);
+ Assert.assertEquals(Arrays.asList(10L, 20L, 30L), versions);
+ }
+
+ // Test 2: cache enabled with long TTL, all should hit cache
+ ctx.getSessionVariable().cloudTableVersionCacheTtlMs = 100000;
+ {
+ List<Long> versions =
OlapTable.getVisibleVersionInBatch(tables);
+ Assert.assertEquals(1, callCount[0]);
+ Assert.assertEquals(Arrays.asList(10L, 20L, 30L), versions);
+ }
+
+ // Test 3: cache disabled (TTL = 0), all fetched from MS again
+ ctx.getSessionVariable().cloudTableVersionCacheTtlMs = 0;
+ {
+ List<Long> versions =
OlapTable.getVisibleVersionInBatch(tables);
+ Assert.assertEquals(2, callCount[0]);
+ Assert.assertEquals(Arrays.asList(11L, 21L, 31L), versions);
+ }
+
+ // Test 4: short TTL, wait for expiration, then partially refresh
+ ctx.getSessionVariable().cloudTableVersionCacheTtlMs = 500;
+ Thread.sleep(550);
+
+ // refresh one table's cache so it stays hot
+ OlapTable hotTable = tables.get(0);
+ hotTable.setCachedTableVersion(hotTable.getCachedTableVersion());
+ Assert.assertFalse(hotTable.isCachedTableVersionExpired());
+ Assert.assertTrue(tables.get(1).isCachedTableVersionExpired());
+ Assert.assertTrue(tables.get(2).isCachedTableVersionExpired());
+ {
+ // batchVersions[2] = [22, 32] for the 2 expired tables
+ List<Long> versions =
OlapTable.getVisibleVersionInBatch(tables);
+ Assert.assertEquals(3, callCount[0]);
+ Assert.assertEquals(3, versions.size());
+ // hot table keeps its cached version
+ Assert.assertEquals(11L, versions.get(0).longValue());
+ // expired tables get new versions from MS
+ Assert.assertEquals(22L, versions.get(1).longValue());
+ Assert.assertEquals(32L, versions.get(2).longValue());
+ }
+
+ // Test 5: all expired again, full batch fetch
+ ctx.getSessionVariable().cloudTableVersionCacheTtlMs = 0;
+ {
+ List<Long> versions =
OlapTable.getVisibleVersionInBatch(tables);
+ Assert.assertEquals(4, callCount[0]);
+ Assert.assertEquals(Arrays.asList(13L, 23L, 33L), versions);
+ }
+ } finally {
+ ConnectContext.remove();
+ }
+ // CHECKSTYLE ON
+ }
}
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 1f61fcab17d..fd799965e37 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1012,12 +1012,13 @@ message SubTxnInfo {
message TableStatsPB {
optional int64 table_id = 1;
optional int64 updated_row_count = 2;
+ optional int64 table_version = 3;
}
message CommitTxnResponse {
optional MetaServiceResponseStatus status = 1;
optional TxnInfoPB txn_info = 2;
- // <tablet_id, partition_id> --> version
+ // <table_id, partition_id> --> version
repeated int64 table_ids = 3;
repeated int64 partition_ids = 4;
repeated int64 versions = 5;
@@ -1391,6 +1392,7 @@ message IndexRequest {
message IndexResponse {
optional MetaServiceResponseStatus status = 1;
+ optional int64 table_version = 2;
}
message PartitionRequest {
@@ -1407,6 +1409,7 @@ message PartitionRequest {
message PartitionResponse {
optional MetaServiceResponseStatus status = 1;
+ optional int64 table_version = 2;
}
message RestoreJobRequest {
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 1c573c58f71..82a2d40454f 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -770,6 +770,19 @@ struct TFrontendPingFrontendRequest {
3: optional string deployMode
}
+struct TCloudVersionInfo {
+ 1: optional i64 tableId
+ 2: optional i64 partitionId
+ 3: optional i64 version
+ 4: optional i64 versionUpdateTime
+}
+
+struct TFrontendSyncCloudVersionRequest {
+ 1: optional i64 dbId
+ 2: optional list<TCloudVersionInfo> tableVersionInfos
+ 3: optional list<TCloudVersionInfo> partitionVersionInfos
+}
+
struct TFrontendReportAliveSessionRequest {
1: required i32 clusterId
2: required string token
@@ -1899,6 +1912,8 @@ service FrontendService {
TFrontendPingFrontendResult ping(1: TFrontendPingFrontendRequest request)
+ Status.TStatus syncCloudVersion(1: TFrontendSyncCloudVersionRequest
request)
+
TInitExternalCtlMetaResult initExternalCtlMeta(1:
TInitExternalCtlMetaRequest request)
TFetchSchemaTableDataResult fetchSchemaTableData(1:
TFetchSchemaTableDataRequest request)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]