chaoyli closed pull request #504: Be refactor
URL: https://github.com/apache/incubator-doris/pull/504
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork
once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 5863e3df..3c6c0148 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -44,6 +44,8 @@
#include "olap/task/engine_clone_task.h"
#include "olap/task/engine_cancel_delete_task.h"
#include "olap/task/engine_schema_change_task.h"
+#include "olap/task/engine_batch_load_task.h"
+#include "olap/task/engine_storage_migration_task.h"
#include "olap/utils.h"
#include "common/resource_tls.h"
#include "common/status.h"
@@ -588,13 +590,8 @@ void TaskWorkerPool::_alter_tablet(
// Do not need to adjust delete success or not
// Because if delete failed create rollup will failed
if (status == DORIS_SUCCESS) {
- EngineSchemaChangeTask engine_task(alter_tablet_request, signature);
- AgentStatus ret = engine_task.execute();
- if (ret != DORIS_SUCCESS) {
- status = DORIS_ERROR;
- } else {
- status = DORIS_SUCCESS;
- }
+ EngineSchemaChangeTask engine_task(alter_tablet_request, signature,
task_type, error_msgs, process_name);
+ status = engine_task.execute();
}
if (status == DORIS_SUCCESS) {
@@ -1022,13 +1019,9 @@ void*
TaskWorkerPool::_storage_medium_migrate_worker_thread_callback(void* arg_t
TStatusCode::type status_code = TStatusCode::OK;
vector<string> error_msgs;
TStatus task_status;
-
- OLAPStatus res = OLAPStatus::OLAP_SUCCESS;
- res = SnapshotManager::instance()->storage_medium_migrate(
- storage_medium_migrate_req.tablet_id,
- storage_medium_migrate_req.schema_hash,
- storage_medium_migrate_req.storage_medium);
- if (res != OLAPStatus::OLAP_SUCCESS) {
+ EngineStorageMigrationTask task(storage_medium_migrate_req);
+ AgentStatus res = task.execute();
+ if (res != DORIS_SUCCESS) {
OLAP_LOG_WARNING("storage media migrate failed. status: %d,
signature: %ld",
res, agent_task_req.signature);
status_code = TStatusCode::RUNTIME_ERROR;
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 1500c163..ccb804a0 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -26,6 +26,7 @@
#include <mutex>
#include <utility>
#include <vector>
+#include "agent/file_downloader.h"
#include "agent/status.h"
#include "agent/utils.h"
#include "gen_cpp/AgentService_types.h"
@@ -114,16 +115,6 @@ class TaskWorkerPool {
static void* _move_dir_thread_callback(void* arg_this);
static void* _recover_tablet_thread_callback(void* arg_this);
- AgentStatus _clone_copy(
- const TCloneReq& clone_req,
- int64_t signature,
- const std::string& local_data_path,
- TBackend* src_host,
- std::string* src_file_path,
- std::vector<std::string>* error_msgs,
- const std::vector<Version>* missing_versions,
- bool* allow_incremental_clone);
-
void _alter_tablet(
const TAlterTabletReq& create_rollup_request,
int64_t signature,
@@ -154,7 +145,6 @@ class TaskWorkerPool {
#ifdef BE_TEST
AgentServerClient* _agent_client;
FileDownloader* _file_downloader_ptr;
- Pusher * _pusher;
#endif
std::deque<TAgentTaskRequest> _tasks;
diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index 7522aee0..f3a7204d 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -74,6 +74,7 @@ add_library(Olap STATIC
wrapper_field.cpp
task/engine_schema_change_task.cpp
task/engine_batch_load_task.cpp
+ task/engine_storage_migration_task.cpp
)
add_subdirectory(rowset)
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index b815fc36..bd284c50 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -140,7 +140,7 @@ OLAPStatus SnapshotManager::_calc_snapshot_id_path(
return res;
}
-string SnapshotManager::_get_schema_hash_full_path(
+string SnapshotManager::get_schema_hash_full_path(
const TabletSharedPtr& ref_tablet,
const string& location) const {
stringstream schema_full_path_stream;
@@ -160,7 +160,7 @@ string SnapshotManager::_get_header_full_path(
return header_name_stream.str();
}
-void SnapshotManager::_update_header_file_info(
+void SnapshotManager::update_header_file_info(
const vector<VersionEntity>& shortest_versions,
TabletMeta* header) {
// clear schema_change_status
@@ -230,49 +230,6 @@ OLAPStatus SnapshotManager::_link_index_and_data_files(
return res;
}
-OLAPStatus SnapshotManager::_copy_index_and_data_files(
- const string& schema_hash_path,
- const TabletSharedPtr& ref_tablet,
- vector<VersionEntity>& version_entity_vec) {
- std::stringstream prefix_stream;
- prefix_stream << schema_hash_path << "/" << ref_tablet->tablet_id();
- std::string tablet_path_prefix = prefix_stream.str();
- for (VersionEntity& entity : version_entity_vec) {
- Version version = entity.version;
- VersionHash v_hash = entity.version_hash;
- for (SegmentGroupEntity segment_group_entity :
entity.segment_group_vec) {
- int32_t segment_group_id = segment_group_entity.segment_group_id;
- for (int seg_id = 0; seg_id < segment_group_entity.num_segments;
++seg_id) {
- string index_path =
- _construct_index_file_path(tablet_path_prefix, version,
v_hash, segment_group_id, seg_id);
- string ref_tablet_index_path =
ref_tablet->construct_index_file_path(
- version, v_hash, segment_group_id, seg_id);
- Status res = FileUtils::copy_file(ref_tablet_index_path,
index_path);
- if (!res.ok()) {
- LOG(WARNING) << "fail to copy index file."
- << "dest=" << index_path
- << ", src=" << ref_tablet_index_path;
- return OLAP_ERR_COPY_FILE_ERROR;
- }
-
- string data_path =
- _construct_data_file_path(tablet_path_prefix, version,
v_hash, segment_group_id, seg_id);
- string ref_tablet_data_path =
ref_tablet->construct_data_file_path(
- version, v_hash, segment_group_id, seg_id);
- res = FileUtils::copy_file(ref_tablet_data_path, data_path);
- if (!res.ok()) {
- LOG(WARNING) << "fail to copy data file."
- << "dest=" << index_path
- << ", src=" << ref_tablet_index_path;
- return OLAP_ERR_COPY_FILE_ERROR;
- }
- }
- }
- }
-
- return OLAP_SUCCESS;
-}
-
OLAPStatus SnapshotManager::_create_snapshot_files(
const TabletSharedPtr& ref_tablet,
const TSnapshotRequest& request,
@@ -291,7 +248,7 @@ OLAPStatus SnapshotManager::_create_snapshot_files(
return res;
}
- string schema_full_path = _get_schema_hash_full_path(
+ string schema_full_path = get_schema_hash_full_path(
ref_tablet, snapshot_id_path);
string header_path = _get_header_full_path(ref_tablet, schema_full_path);
if (check_dir_existed(schema_full_path)) {
@@ -378,7 +335,7 @@ OLAPStatus SnapshotManager::_create_snapshot_files(
ref_tablet->release_header_lock();
header_locked = false;
- _update_header_file_info(shortest_versions, new_tablet_meta);
+ update_header_file_info(shortest_versions, new_tablet_meta);
// save new header to snapshot header path
res = new_tablet_meta->save(header_path);
@@ -467,7 +424,7 @@ OLAPStatus
SnapshotManager::_create_incremental_snapshot_files(
return res;
}
- string schema_full_path = _get_schema_hash_full_path(ref_tablet,
snapshot_id_path);
+ string schema_full_path = get_schema_hash_full_path(ref_tablet,
snapshot_id_path);
if (check_dir_existed(schema_full_path)) {
VLOG(10) << "remove the old schema_full_path.";
remove_all_dir(schema_full_path);
@@ -612,7 +569,7 @@ OLAPStatus SnapshotManager::_append_single_delta(
return res;
}
-string SnapshotManager::_construct_index_file_path(
+string SnapshotManager::construct_index_file_path(
const string& tablet_path_prefix,
const Version& version,
VersionHash version_hash,
@@ -620,7 +577,7 @@ string SnapshotManager::_construct_index_file_path(
return Tablet::construct_file_path(tablet_path_prefix, version,
version_hash, segment_group_id, segment, "idx");
}
-string SnapshotManager::_construct_data_file_path(
+string SnapshotManager::construct_data_file_path(
const string& tablet_path_prefix,
const Version& version,
VersionHash version_hash,
@@ -639,179 +596,5 @@ OLAPStatus SnapshotManager::_create_hard_link(const
string& from_path, const str
return OLAP_ERR_OTHER_ERROR;
}
}
-
-OLAPStatus SnapshotManager::storage_medium_migrate(
- TTabletId tablet_id, TSchemaHash schema_hash,
- TStorageMedium::type storage_medium) {
- LOG(INFO) << "begin to process storage media migrate. "
- << "tablet_id=" << tablet_id << ", schema_hash=" << schema_hash
- << ", dest_storage_medium=" << storage_medium;
- DorisMetrics::storage_migrate_requests_total.increment(1);
-
- OLAPStatus res = OLAP_SUCCESS;
- TabletSharedPtr tablet = TabletManager::instance()->get_tablet(tablet_id,
schema_hash);
- if (tablet.get() == NULL) {
- OLAP_LOG_WARNING("can't find tablet. [tablet_id=%ld schema_hash=%d]",
- tablet_id, schema_hash);
- return OLAP_ERR_TABLE_NOT_FOUND;
- }
-
- // judge case when no need to migrate
- uint32_t count =
StorageEngine::get_instance()->available_storage_medium_type_count();
- if (count <= 1) {
- LOG(INFO) << "available storage medium type count is less than 1, "
- << "no need to migrate. count=" << count;
- return OLAP_SUCCESS;
- }
-
- TStorageMedium::type src_storage_medium =
tablet->data_dir()->storage_medium();
- if (src_storage_medium == storage_medium) {
- LOG(INFO) << "tablet is already on specified storage medium. "
- << "storage_medium=" << storage_medium;
- return OLAP_SUCCESS;
- }
-
- vector<ColumnData*> olap_data_sources;
- tablet->obtain_push_lock();
-
- do {
- // get all versions to be migrate
- tablet->obtain_header_rdlock();
- const PDelta* lastest_version = tablet->lastest_version();
- if (lastest_version == NULL) {
- tablet->release_header_lock();
- res = OLAP_ERR_VERSION_NOT_EXIST;
- OLAP_LOG_WARNING("tablet has not any version.");
- break;
- }
-
- int32_t end_version = lastest_version->end_version();
- tablet->acquire_data_sources(Version(0, end_version),
&olap_data_sources);
- if (olap_data_sources.size() == 0) {
- tablet->release_header_lock();
- res = OLAP_ERR_VERSION_NOT_EXIST;
- OLAP_LOG_WARNING("fail to acquire data souces. [tablet='%s'
version=%d]",
- tablet->full_name().c_str(), end_version);
- break;
- }
-
- vector<VersionEntity> version_entity_vec;
- tablet->list_version_entities(&version_entity_vec);
- tablet->release_header_lock();
-
- // generate schema hash path where files will be migrated
- auto stores =
StorageEngine::get_instance()->get_stores_for_create_tablet(storage_medium);
- if (stores.empty()) {
- res = OLAP_ERR_INVALID_ROOT_PATH;
- OLAP_LOG_WARNING("fail to get root path for create tablet.");
- break;
- }
-
- uint64_t shard = 0;
- res = stores[0]->get_shard(&shard);
- if (res != OLAP_SUCCESS) {
- OLAP_LOG_WARNING("fail to get root path shard. [res=%d]", res);
- break;
- }
-
- stringstream root_path_stream;
- root_path_stream << stores[0]->path() << DATA_PREFIX << "/" << shard;
- string schema_hash_path = _get_schema_hash_full_path(tablet,
root_path_stream.str());
- if (check_dir_existed(schema_hash_path)) {
- VLOG(3) << "schema hash path already exist, remove it. "
- << "schema_hash_path=" << schema_hash_path;
- remove_all_dir(schema_hash_path);
- }
- create_dirs(schema_hash_path);
-
- // migrate all index and data files but header file
- res = _copy_index_and_data_files(schema_hash_path, tablet,
version_entity_vec);
- if (res != OLAP_SUCCESS) {
- OLAP_LOG_WARNING("fail to copy index and data files when migrate.
[res=%d]", res);
- break;
- }
-
- // generate new header file from the old
- TabletMeta* new_tablet_meta = new(std::nothrow) TabletMeta();
- if (new_tablet_meta == NULL) {
- OLAP_LOG_WARNING("new olap header failed");
- return OLAP_ERR_BUFFER_OVERFLOW;
- }
- res = _generate_new_header(stores[0], shard, tablet,
version_entity_vec, new_tablet_meta);
- if (res != OLAP_SUCCESS) {
- OLAP_LOG_WARNING("fail to generate new header file from the old.
[res=%d]", res);
- break;
- }
-
- // load the new tablet into OLAPEngine
- auto tablet = Tablet::create_from_header(new_tablet_meta, stores[0]);
- if (tablet == NULL) {
- OLAP_LOG_WARNING("failed to create from header");
- res = OLAP_ERR_TABLE_CREATE_FROM_HEADER_ERROR;
- break;
- }
- res = TabletManager::instance()->add_tablet(tablet_id, schema_hash,
tablet, false);
- if (res != OLAP_SUCCESS) {
- OLAP_LOG_WARNING("fail to add tablet to StorageEngine. [res=%d]",
res);
- break;
- }
-
- // if old tablet finished schema change, then the schema change status
of the new tablet is DONE
- // else the schema change status of the new tablet is FAILED
- TabletSharedPtr new_tablet =
TabletManager::instance()->get_tablet(tablet_id, schema_hash);
- if (new_tablet.get() == NULL) {
- OLAP_LOG_WARNING("get null tablet. [tablet_id=%ld schema_hash=%d]",
- tablet_id, schema_hash);
- return OLAP_ERR_TABLE_NOT_FOUND;
- }
- SchemaChangeStatus tablet_status = tablet->schema_change_status();
- if (tablet->schema_change_status().status ==
AlterTableStatus::ALTER_TABLE_FINISHED) {
- new_tablet->set_schema_change_status(tablet_status.status,
- tablet_status.schema_hash,
- tablet_status.version);
- } else {
-
new_tablet->set_schema_change_status(AlterTableStatus::ALTER_TABLE_FAILED,
- tablet_status.schema_hash,
- tablet_status.version);
- }
- } while (0);
-
- tablet->release_push_lock();
- tablet->release_data_sources(&olap_data_sources);
-
- return res;
-}
-
-OLAPStatus SnapshotManager::_generate_new_header(
- DataDir* store,
- const uint64_t new_shard,
- const TabletSharedPtr& tablet,
- const vector<VersionEntity>& version_entity_vec, TabletMeta*
new_tablet_meta) {
- if (store == nullptr) {
- LOG(WARNING) << "fail to generate new header for store is null";
- return OLAP_ERR_HEADER_INIT_FAILED;
- }
- OLAPStatus res = OLAP_SUCCESS;
-
- DataDir* ref_store =
-
StorageEngine::get_instance()->get_store(tablet->storage_root_path_name());
- TabletMetaManager::get_header(ref_store, tablet->tablet_id(),
tablet->schema_hash(), new_tablet_meta);
- _update_header_file_info(version_entity_vec, new_tablet_meta);
- new_tablet_meta->set_shard(new_shard);
-
- res = TabletMetaManager::save(store, tablet->tablet_id(),
tablet->schema_hash(), new_tablet_meta);
- if (res != OLAP_SUCCESS) {
- OLAP_LOG_WARNING("fail to save olap header to new db. [res=%d]", res);
- return res;
- }
-
- // delete old header
- // TODO: make sure atomic update
- TabletMetaManager::remove(ref_store, tablet->tablet_id(),
tablet->schema_hash());
- if (res != OLAP_SUCCESS) {
- LOG(WARNING) << "fail to delete olap header to old db. res=" << res;
- }
- return res;
-}
-
+
} // namespace doris
diff --git a/be/src/olap/snapshot_manager.h b/be/src/olap/snapshot_manager.h
index a4c16249..008c2c87 100644
--- a/be/src/olap/snapshot_manager.h
+++ b/be/src/olap/snapshot_manager.h
@@ -54,14 +54,37 @@ class SnapshotManager {
const TSnapshotRequest& request,
std::string* snapshot_path);
+
+ // TODO(ygl) move it to a utility class
+ void update_header_file_info(
+ const std::vector<VersionEntity>& shortest_version_entity,
+ TabletMeta* header);
+
+ // TODO(ygl) move it to a utility class
+ // TODO: hkp
+ // rewrite this function
+ std::string construct_index_file_path(
+ const std::string& tablet_path_prefix,
+ const Version& version,
+ VersionHash version_hash,
+ int32_t segment_group_id, int32_t segment) const;
+
+ // TODO(ygl) move it to a utility class
+ // TODO: hkp
+ // rewrite this function
+ std::string construct_data_file_path(
+ const std::string& tablet_path_prefix,
+ const Version& version,
+ VersionHash version_hash,
+ int32_t segment_group_id, int32_t segment) const;
+
+ std::string get_schema_hash_full_path(
+ const TabletSharedPtr& ref_tablet,
+ const std::string& location) const;
+
// @brief 释放snapshot
// @param snapshot_path [in] 要被释放的snapshot的路径,只包含到ID
OLAPStatus release_snapshot(const std::string& snapshot_path);
-
- OLAPStatus storage_medium_migrate(
- TTabletId tablet_id,
- TSchemaHash schema_hash,
- TStorageMedium::type storage_medium);
static SnapshotManager* instance();
@@ -73,18 +96,10 @@ class SnapshotManager {
const TabletSharedPtr& tablet,
std::string* out_path);
- std::string _get_schema_hash_full_path(
- const TabletSharedPtr& ref_tablet,
- const std::string& location) const;
-
std::string _get_header_full_path(
const TabletSharedPtr& ref_tablet,
const std::string& schema_hash_path) const;
- void _update_header_file_info(
- const std::vector<VersionEntity>& shortest_version_entity,
- TabletMeta* header);
-
// TODO: hkp
// rewrite this function
OLAPStatus _link_index_and_data_files(
@@ -92,13 +107,6 @@ class SnapshotManager {
const TabletSharedPtr& ref_tablet,
const std::vector<VersionEntity>& version_entity_vec);
- // TODO: hkp
- // rewrite this function
- OLAPStatus _copy_index_and_data_files(
- const std::string& header_path,
- const TabletSharedPtr& ref_tablet,
- std::vector<VersionEntity>& version_entity_vec);
-
OLAPStatus _create_snapshot_files(
const TabletSharedPtr& ref_tablet,
const TSnapshotRequest& request,
@@ -116,28 +124,6 @@ class SnapshotManager {
const TSnapshotRequest& request,
DataDir* store);
- // TODO: hkp
- // rewrite this function
- std::string _construct_index_file_path(
- const std::string& tablet_path_prefix,
- const Version& version,
- VersionHash version_hash,
- int32_t segment_group_id, int32_t segment) const;
-
- // TODO: hkp
- // rewrite this function
- std::string _construct_data_file_path(
- const std::string& tablet_path_prefix,
- const Version& version,
- VersionHash version_hash,
- int32_t segment_group_id, int32_t segment) const;
-
- OLAPStatus _generate_new_header(
- DataDir* store,
- const uint64_t new_shard,
- const TabletSharedPtr& tablet,
- const std::vector<VersionEntity>& version_entity_vec, TabletMeta*
new_tablet_meta);
-
OLAPStatus _create_hard_link(const std::string& from_path, const
std::string& to_path);
private:
diff --git a/be/src/olap/task/engine_batch_load_task.cpp
b/be/src/olap/task/engine_batch_load_task.cpp
index 1818b0e1..dc489228 100644
--- a/be/src/olap/task/engine_batch_load_task.cpp
+++ b/be/src/olap/task/engine_batch_load_task.cpp
@@ -44,7 +44,7 @@ using std::vector;
namespace doris {
-EngineBatchLoadTask::EngineBatchLoadTask(const TPushReq& push_req,
+EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req,
std::vector<TTabletInfo>* tablet_infos,
int64_t signature) :
_push_req(push_req), _tablet_infos(tablet_infos),
_signature(signature) {
@@ -89,6 +89,7 @@ AgentStatus EngineBatchLoadTask::execute() {
} else {
status = DORIS_TASK_REQUEST_ERROR;
}
+ return status;
}
AgentStatus EngineBatchLoadTask::_init() {
diff --git a/be/src/olap/task/engine_batch_load_task.h
b/be/src/olap/task/engine_batch_load_task.h
index 2e84ac42..6fc231d2 100644
--- a/be/src/olap/task/engine_batch_load_task.h
+++ b/be/src/olap/task/engine_batch_load_task.h
@@ -39,7 +39,7 @@ class StorageEngine;
class EngineBatchLoadTask : public EngineTask{
public:
- EngineBatchLoadTask(const TPushReq& push_req, std::vector<TTabletInfo>*
tablet_infos, int64_t signature);
+ EngineBatchLoadTask(TPushReq& push_req, std::vector<TTabletInfo>*
tablet_infos, int64_t signature);
virtual ~EngineBatchLoadTask();
virtual AgentStatus execute();
@@ -72,7 +72,7 @@ class EngineBatchLoadTask : public EngineTask{
void _get_file_name_from_path(const std::string& file_path, std::string*
file_name);
bool _is_init = false;
- const TPushReq& _push_req;
+ TPushReq& _push_req;
std::vector<TTabletInfo>* _tablet_infos;
FileDownloader::FileDownloaderParam _downloader_param;
FileDownloader* _file_downloader;
diff --git a/be/src/olap/task/engine_schema_change_task.cpp
b/be/src/olap/task/engine_schema_change_task.cpp
index ebfc5034..fa4a6406 100644
--- a/be/src/olap/task/engine_schema_change_task.cpp
+++ b/be/src/olap/task/engine_schema_change_task.cpp
@@ -21,10 +21,16 @@
namespace doris {
+using std::to_string;
+
EngineSchemaChangeTask::EngineSchemaChangeTask(const TAlterTabletReq&
alter_tablet_request,
- int64_t signature):
+ int64_t signature, const TTaskType::type task_type, vector<string>&
error_msgs,
+ string& process_name):
_alter_tablet_req(alter_tablet_request),
- _signature(signature) {
+ _signature(signature),
+ _task_type(task_type),
+ _error_msgs(error_msgs),
+ _process_name(process_name) {
}
@@ -55,7 +61,7 @@ AgentStatus EngineSchemaChangeTask::execute() {
OLAP_LOG_WARNING("delete failed rollup file failed, status: %d, "
"signature: %ld.",
status, _signature);
- error_msgs.push_back("delete failed rollup file failed, "
+ _error_msgs.push_back("delete failed rollup file failed, "
"signature: " + to_string(_signature));
}
}
@@ -66,7 +72,7 @@ AgentStatus EngineSchemaChangeTask::execute() {
|| alter_tablet_status == ALTER_TABLE_FAILED
|| alter_tablet_status == ALTER_TABLE_WAITING) {
// Create rollup table
- switch (task_type) {
+ switch (_task_type) {
case TTaskType::ROLLUP:
status = _create_rollup_tablet(_alter_tablet_req);
break;
@@ -78,7 +84,7 @@ AgentStatus EngineSchemaChangeTask::execute() {
break;
}
if (status != OLAPStatus::OLAP_SUCCESS) {
- LOG(WARNING) << process_name << " failed. signature: " <<
_signature << " status: " << status;
+ LOG(WARNING) << _process_name << " failed. signature: " <<
_signature << " status: " << status;
}
}
}
diff --git a/be/src/olap/task/engine_schema_change_task.h
b/be/src/olap/task/engine_schema_change_task.h
index 5a30ef1f..9d827a4c 100644
--- a/be/src/olap/task/engine_schema_change_task.h
+++ b/be/src/olap/task/engine_schema_change_task.h
@@ -32,7 +32,8 @@ class EngineSchemaChangeTask : public EngineTask {
virtual AgentStatus execute();
public:
- EngineSchemaChangeTask(const TAlterTabletReq& alter_tablet_request,
int64_t signature);
+ EngineSchemaChangeTask(const TAlterTabletReq& alter_tablet_request,
int64_t signature,
+ const TTaskType::type task_type, vector<string>& error_msgs, string&
process_name);
~EngineSchemaChangeTask() {}
private:
@@ -60,6 +61,9 @@ class EngineSchemaChangeTask : public EngineTask {
private:
const TAlterTabletReq& _alter_tablet_req;
int64_t _signature;
+ const TTaskType::type _task_type;
+ vector<string>& _error_msgs;
+ string& _process_name;
}; // EngineTask
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp
b/be/src/olap/task/engine_storage_migration_task.cpp
new file mode 100644
index 00000000..7cc095cd
--- /dev/null
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -0,0 +1,263 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/task/engine_storage_migration_task.h"
+
+#include "olap/snapshot_manager.h"
+#include "olap/tablet_meta_manager.h"
+
+namespace doris {
+
+using std::stringstream;
+
+EngineStorageMigrationTask::EngineStorageMigrationTask(TStorageMediumMigrateReq&
storage_medium_migrate_req) :
+ _storage_medium_migrate_req(storage_medium_migrate_req) {
+
+}
+
+AgentStatus EngineStorageMigrationTask::execute() {
+ OLAPStatus res = OLAP_SUCCESS;
+ res = _storage_medium_migrate(
+ _storage_medium_migrate_req.tablet_id,
+ _storage_medium_migrate_req.schema_hash,
+ _storage_medium_migrate_req.storage_medium);
+ if (res != OLAP_SUCCESS) {
+ return DORIS_ERROR;
+ } else {
+ return DORIS_SUCCESS;
+ }
+}
+
+OLAPStatus EngineStorageMigrationTask::_storage_medium_migrate(
+ TTabletId tablet_id, TSchemaHash schema_hash,
+ TStorageMedium::type storage_medium) {
+ LOG(INFO) << "begin to process storage media migrate. "
+ << "tablet_id=" << tablet_id << ", schema_hash=" << schema_hash
+ << ", dest_storage_medium=" << storage_medium;
+ DorisMetrics::storage_migrate_requests_total.increment(1);
+
+ OLAPStatus res = OLAP_SUCCESS;
+ TabletSharedPtr tablet = TabletManager::instance()->get_tablet(tablet_id,
schema_hash);
+ if (tablet.get() == NULL) {
+ OLAP_LOG_WARNING("can't find tablet. [tablet_id=%ld schema_hash=%d]",
+ tablet_id, schema_hash);
+ return OLAP_ERR_TABLE_NOT_FOUND;
+ }
+
+ // judge case when no need to migrate
+ uint32_t count =
StorageEngine::get_instance()->available_storage_medium_type_count();
+ if (count <= 1) {
+ LOG(INFO) << "available storage medium type count is less than 1, "
+ << "no need to migrate. count=" << count;
+ return OLAP_SUCCESS;
+ }
+
+ TStorageMedium::type src_storage_medium =
tablet->data_dir()->storage_medium();
+ if (src_storage_medium == storage_medium) {
+ LOG(INFO) << "tablet is already on specified storage medium. "
+ << "storage_medium=" << storage_medium;
+ return OLAP_SUCCESS;
+ }
+
+ vector<ColumnData*> olap_data_sources;
+ tablet->obtain_push_lock();
+
+ do {
+ // get all versions to be migrate
+ tablet->obtain_header_rdlock();
+ const PDelta* lastest_version = tablet->lastest_version();
+ if (lastest_version == NULL) {
+ tablet->release_header_lock();
+ res = OLAP_ERR_VERSION_NOT_EXIST;
+ OLAP_LOG_WARNING("tablet has not any version.");
+ break;
+ }
+
+ int32_t end_version = lastest_version->end_version();
+ tablet->acquire_data_sources(Version(0, end_version),
&olap_data_sources);
+ if (olap_data_sources.size() == 0) {
+ tablet->release_header_lock();
+ res = OLAP_ERR_VERSION_NOT_EXIST;
+ OLAP_LOG_WARNING("fail to acquire data souces. [tablet='%s'
version=%d]",
+ tablet->full_name().c_str(), end_version);
+ break;
+ }
+
+ vector<VersionEntity> version_entity_vec;
+ tablet->list_version_entities(&version_entity_vec);
+ tablet->release_header_lock();
+
+ // generate schema hash path where files will be migrated
+ auto stores =
StorageEngine::get_instance()->get_stores_for_create_tablet(storage_medium);
+ if (stores.empty()) {
+ res = OLAP_ERR_INVALID_ROOT_PATH;
+ OLAP_LOG_WARNING("fail to get root path for create tablet.");
+ break;
+ }
+
+ uint64_t shard = 0;
+ res = stores[0]->get_shard(&shard);
+ if (res != OLAP_SUCCESS) {
+ OLAP_LOG_WARNING("fail to get root path shard. [res=%d]", res);
+ break;
+ }
+
+ stringstream root_path_stream;
+ root_path_stream << stores[0]->path() << DATA_PREFIX << "/" << shard;
+ string schema_hash_path =
SnapshotManager::instance()->get_schema_hash_full_path(tablet,
root_path_stream.str());
+ if (check_dir_existed(schema_hash_path)) {
+ VLOG(3) << "schema hash path already exist, remove it. "
+ << "schema_hash_path=" << schema_hash_path;
+ remove_all_dir(schema_hash_path);
+ }
+ create_dirs(schema_hash_path);
+
+ // migrate all index and data files but header file
+ res = _copy_index_and_data_files(schema_hash_path, tablet,
version_entity_vec);
+ if (res != OLAP_SUCCESS) {
+ OLAP_LOG_WARNING("fail to copy index and data files when migrate.
[res=%d]", res);
+ break;
+ }
+
+ // generate new header file from the old
+ TabletMeta* new_tablet_meta = new(std::nothrow) TabletMeta();
+ if (new_tablet_meta == NULL) {
+ OLAP_LOG_WARNING("new olap header failed");
+ return OLAP_ERR_BUFFER_OVERFLOW;
+ }
+ res = _generate_new_header(stores[0], shard, tablet,
version_entity_vec, new_tablet_meta);
+ if (res != OLAP_SUCCESS) {
+ OLAP_LOG_WARNING("fail to generate new header file from the old.
[res=%d]", res);
+ break;
+ }
+
+ // load the new tablet into OLAPEngine
+ auto tablet = Tablet::create_from_header(new_tablet_meta, stores[0]);
+ if (tablet == NULL) {
+ OLAP_LOG_WARNING("failed to create from header");
+ res = OLAP_ERR_TABLE_CREATE_FROM_HEADER_ERROR;
+ break;
+ }
+ res = TabletManager::instance()->add_tablet(tablet_id, schema_hash,
tablet, false);
+ if (res != OLAP_SUCCESS) {
+ OLAP_LOG_WARNING("fail to add tablet to StorageEngine. [res=%d]",
res);
+ break;
+ }
+
+ // if old tablet finished schema change, then the schema change status
of the new tablet is DONE
+ // else the schema change status of the new tablet is FAILED
+ TabletSharedPtr new_tablet =
TabletManager::instance()->get_tablet(tablet_id, schema_hash);
+ if (new_tablet.get() == NULL) {
+ OLAP_LOG_WARNING("get null tablet. [tablet_id=%ld schema_hash=%d]",
+ tablet_id, schema_hash);
+ return OLAP_ERR_TABLE_NOT_FOUND;
+ }
+ SchemaChangeStatus tablet_status = tablet->schema_change_status();
+ if (tablet->schema_change_status().status ==
AlterTableStatus::ALTER_TABLE_FINISHED) {
+ new_tablet->set_schema_change_status(tablet_status.status,
+ tablet_status.schema_hash,
+ tablet_status.version);
+ } else {
+
new_tablet->set_schema_change_status(AlterTableStatus::ALTER_TABLE_FAILED,
+ tablet_status.schema_hash,
+ tablet_status.version);
+ }
+ } while (0);
+
+ tablet->release_push_lock();
+ tablet->release_data_sources(&olap_data_sources);
+
+ return res;
+}
+
+
+OLAPStatus EngineStorageMigrationTask::_generate_new_header(
+ DataDir* store,
+ const uint64_t new_shard,
+ const TabletSharedPtr& tablet,
+ const vector<VersionEntity>& version_entity_vec, TabletMeta*
new_tablet_meta) {
+ if (store == nullptr) {
+ LOG(WARNING) << "fail to generate new header for store is null";
+ return OLAP_ERR_HEADER_INIT_FAILED;
+ }
+ OLAPStatus res = OLAP_SUCCESS;
+
+ DataDir* ref_store =
+
StorageEngine::get_instance()->get_store(tablet->storage_root_path_name());
+ TabletMetaManager::get_header(ref_store, tablet->tablet_id(),
tablet->schema_hash(), new_tablet_meta);
+ SnapshotManager::instance()->update_header_file_info(version_entity_vec,
new_tablet_meta);
+ new_tablet_meta->set_shard(new_shard);
+
+ res = TabletMetaManager::save(store, tablet->tablet_id(),
tablet->schema_hash(), new_tablet_meta);
+ if (res != OLAP_SUCCESS) {
+ OLAP_LOG_WARNING("fail to save olap header to new db. [res=%d]", res);
+ return res;
+ }
+
+ // delete old header
+ // TODO: make sure atomic update
+ TabletMetaManager::remove(ref_store, tablet->tablet_id(),
tablet->schema_hash());
+ if (res != OLAP_SUCCESS) {
+ LOG(WARNING) << "fail to delete olap header to old db. res=" << res;
+ }
+ return res;
+}
+
+OLAPStatus EngineStorageMigrationTask::_copy_index_and_data_files(
+ const string& schema_hash_path,
+ const TabletSharedPtr& ref_tablet,
+ vector<VersionEntity>& version_entity_vec) {
+ std::stringstream prefix_stream;
+ prefix_stream << schema_hash_path << "/" << ref_tablet->tablet_id();
+ std::string tablet_path_prefix = prefix_stream.str();
+ for (VersionEntity& entity : version_entity_vec) {
+ Version version = entity.version;
+ VersionHash v_hash = entity.version_hash;
+ for (SegmentGroupEntity segment_group_entity :
entity.segment_group_vec) {
+ int32_t segment_group_id = segment_group_entity.segment_group_id;
+ for (int seg_id = 0; seg_id < segment_group_entity.num_segments;
++seg_id) {
+ string index_path =
+
SnapshotManager::instance()->construct_index_file_path(tablet_path_prefix,
version, v_hash, segment_group_id, seg_id);
+ string ref_tablet_index_path =
ref_tablet->construct_index_file_path(
+ version, v_hash, segment_group_id, seg_id);
+ Status res = FileUtils::copy_file(ref_tablet_index_path,
index_path);
+ if (!res.ok()) {
+ LOG(WARNING) << "fail to copy index file."
+ << "dest=" << index_path
+ << ", src=" << ref_tablet_index_path;
+ return OLAP_ERR_COPY_FILE_ERROR;
+ }
+
+ string data_path =
+
SnapshotManager::instance()->construct_data_file_path(tablet_path_prefix,
version, v_hash, segment_group_id, seg_id);
+ string ref_tablet_data_path =
ref_tablet->construct_data_file_path(
+ version, v_hash, segment_group_id, seg_id);
+ res = FileUtils::copy_file(ref_tablet_data_path, data_path);
+ if (!res.ok()) {
+ LOG(WARNING) << "fail to copy data file."
+ << "dest=" << index_path
+ << ", src=" << ref_tablet_index_path;
+ return OLAP_ERR_COPY_FILE_ERROR;
+ }
+ }
+ }
+ }
+
+ return OLAP_SUCCESS;
+}
+
+} // doris
\ No newline at end of file
diff --git a/be/src/olap/task/engine_storage_migration_task.h
b/be/src/olap/task/engine_storage_migration_task.h
new file mode 100644
index 00000000..249113aa
--- /dev/null
+++ b/be/src/olap/task/engine_storage_migration_task.h
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_OLAP_TASK_ENGINE_STORAGE_MIGRATION_TASK_H
+#define DORIS_BE_SRC_OLAP_TASK_ENGINE_STORAGE_MIGRATION_TASK_H
+
+#include "gen_cpp/AgentService_types.h"
+#include "olap/olap_define.h"
+#include "olap/task/engine_task.h"
+
+namespace doris {
+
+// base class for storage engine
+// add "Engine" as task prefix to prevent duplicate name with agent task
+class EngineStorageMigrationTask : public EngineTask {
+
+public:
+ virtual AgentStatus execute();
+
+public:
+ EngineStorageMigrationTask(TStorageMediumMigrateReq&
storage_medium_migrate_req);
+ ~EngineStorageMigrationTask() {}
+
+private:
+ OLAPStatus _storage_medium_migrate(
+ TTabletId tablet_id, TSchemaHash schema_hash,
+ TStorageMedium::type storage_medium);
+
+ OLAPStatus _generate_new_header(
+ DataDir* store,
+ const uint64_t new_shard,
+ const TabletSharedPtr& tablet,
+ const std::vector<VersionEntity>& version_entity_vec, TabletMeta*
new_tablet_meta);
+
+ // TODO: hkp
+ // rewrite this function
+ OLAPStatus _copy_index_and_data_files(
+ const std::string& header_path,
+ const TabletSharedPtr& ref_tablet,
+ std::vector<VersionEntity>& version_entity_vec);
+
+private:
+ const TStorageMediumMigrateReq& _storage_medium_migrate_req;
+}; // EngineTask
+
+} // doris
+#endif //DORIS_BE_SRC_OLAP_TASK_ENGINE_STORAGE_MIGRATION_TASK_H
\ No newline at end of file
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]