spaces-X commented on code in PR #15839:
URL: https://github.com/apache/doris/pull/15839#discussion_r1067656119


##########
be/src/tools/builder_helper.cpp:
##########
@@ -0,0 +1,295 @@
+#include "tools/builder_helper.h"
+
+#include <cstdlib>
+#include <filesystem>
+#include <fstream>
+#include <iostream>
+#include <set>
+#include <sstream>
+#include <string>
+
+#include "common/object_pool.h"
+#include "common/status.h"
+#include "env/env.h"
+#include "exec/parquet_scanner.h"
+#include "exprs/cast_functions.h"
+#include "gen_cpp/Descriptors_types.h"
+#include "gen_cpp/PaloInternalService_types.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "gen_cpp/olap_file.pb.h"
+#include "gen_cpp/segment_v2.pb.h"
+#include "gutil/strings/numbers.h"
+#include "gutil/strings/split.h"
+#include "gutil/strings/substitute.h"
+#include "io/buffered_reader.h"
+#include "io/file_factory.h"
+#include "io/file_reader.h"
+#include "io/local_file_reader.h"
+#include "json2pb/pb_to_json.h"
+#include "olap/data_dir.h"
+#include "olap/file_helper.h"
+#include "olap/olap_define.h"
+#include "olap/options.h"
+#include "olap/row.h"
+#include "olap/rowset/beta_rowset.h"
+#include "olap/rowset/rowset_id_generator.h"
+#include "olap/rowset/rowset_meta_manager.h"
+#include "olap/rowset/segment_v2/binary_plain_page.h"
+#include "olap/rowset/segment_v2/column_reader.h"
+#include "olap/schema_change.h"
+#include "olap/storage_engine.h"
+#include "olap/storage_policy_mgr.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+#include "olap/tablet_meta_manager.h"
+#include "olap/tablet_schema.h"
+#include "olap/tablet_schema_cache.h"
+#include "olap/utils.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
+#include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "runtime/tuple.h"
+#include "runtime/user_function_cache.h"
+#include "tools/builder_scanner.h"
+#include "tools/builder_scanner_memtable.h"
+#include "util/coding.h"
+#include "util/crc32c.h"
+#include "util/disk_info.h"
+#include "util/file_utils.h"
+#include "util/runtime_profile.h"
+#include "util/time.h"
+#include "vec/exec/format/parquet/vparquet_file_metadata.h"
+#include "vec/exec/vbroker_scan_node.h"
+
+namespace doris {
// 1048576 bytes == 1 MiB.
// NOTE(review): not referenced in the visible part of this file. Being a macro it
// also ignores the enclosing `doris` namespace; prefer a constexpr if it is kept.
#define BUFFER_SIZE 1048576

// Process-wide helper pointer; assigned by init_instance().
BuilderHelper* BuilderHelper::_s_instance = nullptr;
+
+BuilderHelper* BuilderHelper::init_instance() {
+    // DCHECK(_s_instance == nullptr);
+    static BuilderHelper instance;
+    _s_instance = &instance;
+    return _s_instance;
+}
+
+void BuilderHelper::initial_build_env() {
+    char doris_home[] = "DORIS_HOME=/tmp";
+    putenv(doris_home);
+
+    if (!doris::config::init(nullptr, true, false, true)) {
+        LOG(FATAL) << "init config fail";
+        exit(-1);
+    }
+    CpuInfo::init();
+    DiskInfo::init();
+    MemInfo::init();
+
+    // write buffer size before flush
+    config::write_buffer_size = 14097152000;
+    // max buffer size used in memtable for the aggregated table
+    config::memtable_max_buffer_size = 18194304000;
+    doris::thread_context()->thread_mem_tracker_mgr->set_check_limit(false);
+    doris::TabletSchemaCache::create_global_schema_cache();
+    doris::ChunkAllocator::init_instance(4096);
+
+    doris::MemInfo::init();
+}
+
+void BuilderHelper::open(const std::string& meta_file, const std::string& 
build_dir,
+                         const std::string& data_path, const std::string& 
file_type, bool isHDFS) {
+    _meta_file = meta_file;
+    _build_dir = build_dir;
+    if (data_path.at(data_path.size() - 1) != '/')
+        _data_path = data_path + "/";
+    else
+        _data_path = data_path;
+
+    _file_type = file_type;
+    _isHDFS = isHDFS;
+
+    // TODO: need adapt for open HDFS
+    if (_isHDFS) {
+        THdfsParams hdfs_params;
+        hdfsFileSystem = std::make_unique<io::HdfsFileSystem>(hdfs_params, 
_data_path);
+    }
+
+    std::filesystem::path 
dir_path(std::filesystem::absolute(std::filesystem::path(build_dir)));
+    if (!std::filesystem::is_directory(std::filesystem::status(dir_path)))
+        LOG(FATAL) << "build dir should be a directory";
+
+    // init and open storage engine
+    std::vector<doris::StorePath> paths;
+    auto olap_res = doris::parse_conf_store_paths(_build_dir, &paths);
+    if (!olap_res) {
+        LOG(FATAL) << "parse config storage path failed, path=" << 
doris::config::storage_root_path;
+        exit(-1);
+    }
+    doris::ExecEnv::init(doris::ExecEnv::GetInstance(), paths);
+
+    doris::EngineOptions options;
+    options.store_paths = paths;
+    options.backend_uid = doris::UniqueId::gen_uid();
+    doris::StorageEngine* engine = nullptr;
+    auto st = doris::StorageEngine::open(options, &engine);
+    if (!st.ok()) {
+        LOG(FATAL) << "fail to open StorageEngine, res=" << st;
+        exit(-1);
+    }
+}
+
+std::string BuilderHelper::read_local_file(const std::string& file) {
+    std::filesystem::path 
path(std::filesystem::absolute(std::filesystem::path(file)));
+    if (!std::filesystem::exists(path)) LOG(FATAL) << "file not exist:" << 
file;
+
+    std::ifstream f(path, std::ios::in | std::ios::binary);
+    const auto sz = std::filesystem::file_size(path);
+    std::string result(sz, '\0');
+    f.read(result.data(), sz);
+
+    return result;
+}
+
+void BuilderHelper::build() {
+    // load tablet
+    std::string buf;
+    if (_isHDFS) {
+        FileSystemProperties system_properties;
+        LOG(INFO) << "read meta from HDFS:" << _meta_file;
+        auto* dfs = hdfsFileSystem.get();
+        bool exist = false;
+        auto st = dfs->exists(_meta_file, &exist);
+        if (!st.ok() || !exist) {
+            LOG(FATAL) << "file not exist:" /*<< dfs->GetLastError() */ << 
_meta_file;
+        }
+
+        size_t size = 0;
+        st = dfs->file_size(_meta_file, &size);
+        if (!st.ok() || size <= 0)
+            LOG(FATAL) << "meta file size abnormal.."; //<< 
dfs->GetLastError();
+        IOContext io_ctx;
+        io::FileReaderSPtr read_ptr = nullptr;
+        st = dfs->open_file(_meta_file, &read_ptr, &io_ctx);
+        if (!st.ok() || !read_ptr)
+            LOG(FATAL) << "cannot open file:" /*<< dfs->GetLastError()*/ << 
_meta_file;
+
+        buf.resize(size);
+
+        size_t read_size = 0;
+        read_ptr->read_at(0, Slice(buf.data(), size), io_ctx, &read_size);
+        if (read_size < size) {
+            LOG(FATAL) << "read meta file size abnormal.."; //<< 
dfs->GetLastError();
+        }
+
+        read_ptr->close();
+    } else {
+        buf = read_local_file(_meta_file);
+    }
+
+    FileHeader<TabletMetaPB> file_header;
+    buf = buf.substr(file_header.size());
+    // init tablet
+    TabletMeta* tablet_meta = new TabletMeta();
+    Status status = tablet_meta->deserialize(buf);
+    if (!status.ok()) {
+        LOG(FATAL) << "fail to load tablet meta :" << status.to_string();
+        return;
+    }
+
+    LOG(INFO) << "table id:" << tablet_meta->table_id() << " tablet id:" << 
tablet_meta->tablet_id()
+              << " shard id:" << tablet_meta->shard_id();
+
+    auto data_dir = StorageEngine::instance()->get_store(_build_dir);
+    TabletMetaSharedPtr tablet_meta_ptr(tablet_meta);
+    TabletSharedPtr new_tablet = 
doris::Tablet::create_tablet_from_meta(tablet_meta_ptr, data_dir);
+    status = StorageEngine::instance()->tablet_manager()->_add_tablet_unlocked(
+            tablet_meta->tablet_id(), new_tablet, false, true);
+
+    if (!status.ok()) {
+        LOG(FATAL) << "fail to add tablet to storage :" << status.to_string();
+        return;
+    }
+
+    std::vector<std::string> files;
+    if (_isHDFS) {
+        auto* dfs = hdfsFileSystem.get();
+        std::vector<io::Path> names;
+        dfs->list(_data_path, &names);
+        for (const auto& name : names) {
+            std::string filename = name.filename().c_str();
+            if (filename.substr(filename.size() - _file_type.size()) == 
_file_type) {
+                LOG(INFO) << "get file:" << name;
+                files.emplace_back(filename);
+            }

Review Comment:
   Suggest adding a warning log when a file's type does not match, in both the
HDFS and the local-directory branches.



##########
be/src/tools/builder_scanner_memtable.cpp:
##########
@@ -0,0 +1,346 @@
+#include "tools/builder_scanner_memtable.h"
+
+#include "gen_cpp/Descriptors_types.h"
+#include "gen_cpp/PaloInternalService_types.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "gen_cpp/Types_types.h"
+#include "hdfs/hdfs.h"
+#include "olap/delta_writer.h"
+#include "runtime/descriptor_helper.h"
+#include "runtime/descriptors.h"
+#include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "runtime/tuple.h"
+#include "vec/exec/vbroker_scan_node.h"
+
+namespace doris {
+
// Tuple id used as the broker scan's destination tuple (dest_tuple_id).
static constexpr int TUPLE_ID_DST = 0;
// Tuple id used as the broker scan's source tuple (src_tuple_id).
static constexpr int TUPLE_ID_SRC = 1;
// Value written to TQueryOptions::batch_size for the runtime state.
static constexpr int BATCH_SIZE = 8192;
+
+BuilderScannerMemtable::BuilderScannerMemtable(TabletSharedPtr tablet, const 
std::string& build_dir,
+                                               const std::string& file_type, 
bool isHDFS)
+        : _runtime_state(TQueryGlobals()),
+          _tablet(tablet),
+          _build_dir(build_dir),
+          _file_type(file_type),
+          _isHDFS(isHDFS) {
+    init();
+    _runtime_state.init_scanner_mem_trackers();
+    TUniqueId uid;
+    uid.hi = 1;
+    uid.lo = 1;
+    TQueryOptions _options;
+    _options.batch_size = BATCH_SIZE;
+    _options.enable_vectorized_engine = true;
+    auto* _exec_env = ExecEnv::GetInstance();
+    _runtime_state.init(uid, _options, TQueryGlobals(), _exec_env);
+    _runtime_state.init_mem_trackers(uid);
+}
+
+void BuilderScannerMemtable::init() {
+    create_expr_info();
+    init_desc_table();
+
+    // Node Id
+    _tnode.node_id = 0;
+    _tnode.node_type = TPlanNodeType::SCHEMA_SCAN_NODE;
+    _tnode.num_children = 0;
+    _tnode.limit = -1;
+    _tnode.row_tuples.push_back(0);
+    _tnode.nullable_tuples.push_back(false);
+    _tnode.broker_scan_node.tuple_id = 0;
+    _tnode.__isset.broker_scan_node = true;
+}
+
+TPrimitiveType::type BuilderScannerMemtable::getPrimitiveType(FieldType t) {
+    switch (t) {
+    case FieldType::OLAP_FIELD_TYPE_OBJECT: {
+        return TPrimitiveType::OBJECT;
+    }
+    case FieldType::OLAP_FIELD_TYPE_HLL: {
+        return TPrimitiveType::HLL;
+    }
+    case FieldType::OLAP_FIELD_TYPE_CHAR: {
+        return TPrimitiveType::CHAR;
+    }
+    case FieldType::OLAP_FIELD_TYPE_VARCHAR: {
+        return TPrimitiveType::VARCHAR;
+    }
+    case FieldType::OLAP_FIELD_TYPE_STRING: {
+        return TPrimitiveType::STRING;
+    }
+    case FieldType::OLAP_FIELD_TYPE_DATE: {
+        return TPrimitiveType::DATE;
+    }
+    case FieldType::OLAP_FIELD_TYPE_DATETIME: {
+        return TPrimitiveType::DATETIME;
+    }
+    case FieldType::OLAP_FIELD_TYPE_DATEV2: {
+        return TPrimitiveType::DATEV2;
+    }
+    case FieldType::OLAP_FIELD_TYPE_DATETIMEV2: {
+        return TPrimitiveType::DATETIMEV2;
+    }
+    case FieldType::OLAP_FIELD_TYPE_DECIMAL:
+    case FieldType::OLAP_FIELD_TYPE_DECIMAL32: {
+        return TPrimitiveType::DECIMAL32;
+    }
+    case FieldType::OLAP_FIELD_TYPE_DECIMAL64: {
+        return TPrimitiveType::DECIMAL64;
+    }
+    case FieldType::OLAP_FIELD_TYPE_DECIMAL128I: {
+        return TPrimitiveType::DECIMAL128I;
+    }
+    case FieldType::OLAP_FIELD_TYPE_JSONB: {
+        return TPrimitiveType::JSONB;
+    }
+    case FieldType::OLAP_FIELD_TYPE_BOOL: {
+        return TPrimitiveType::BOOLEAN;
+    }
+    case FieldType::OLAP_FIELD_TYPE_TINYINT: {
+        return TPrimitiveType::TINYINT;
+    }
+    case FieldType::OLAP_FIELD_TYPE_SMALLINT: {
+        return TPrimitiveType::SMALLINT;
+    }
+    case FieldType::OLAP_FIELD_TYPE_INT: {
+        return TPrimitiveType::INT;
+    }
+    case FieldType::OLAP_FIELD_TYPE_BIGINT: {
+        return TPrimitiveType::BIGINT;
+    }
+    case FieldType::OLAP_FIELD_TYPE_LARGEINT: {
+        return TPrimitiveType::LARGEINT;
+    }
+    case FieldType::OLAP_FIELD_TYPE_FLOAT: {
+        return TPrimitiveType::FLOAT;
+    }
+    case FieldType::OLAP_FIELD_TYPE_DOUBLE: {
+        return TPrimitiveType::DOUBLE;
+    }
+    case FieldType::OLAP_FIELD_TYPE_ARRAY: {
+        return TPrimitiveType::ARRAY;
+    }
+    default: {
+        LOG(FATAL) << "unknown type error:" << t;
+    }
+    }
+}
+TDescriptorTable BuilderScannerMemtable::create_descriptor_tablet() {
+    TDescriptorTableBuilder dtb;
+
+    // build DST table descriptor
+    {
+        TTupleDescriptorBuilder tuple_builder;
+        for (int i = 0; i < _tablet->num_columns(); i++) {
+            const auto& col = _tablet->tablet_schema()->column(i);
+
+            tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                           
.type(thrift_to_type(getPrimitiveType(col.type())))
+                                           .column_name(col.name())
+                                           .column_pos(i)
+                                           .length(col.length())
+                                           .build());
+        }
+        tuple_builder.build(&dtb);
+    }
+
+    // build SRC table descriptor
+    {
+        TTupleDescriptorBuilder tuple_builder;
+        for (int i = 0; i < _tablet->num_columns(); i++) {
+            const auto& col = _tablet->tablet_schema()->column(i);
+
+            tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                           
.type(thrift_to_type(getPrimitiveType(col.type())))
+                                           .column_name(col.name())
+                                           .column_pos(i)
+                                           .length(col.length())
+                                           .build());
+        }
+        tuple_builder.build(&dtb);
+    }
+
+    return dtb.desc_tbl();
+}
+
+void BuilderScannerMemtable::init_desc_table() {
+    TDescriptorTable t_desc_table = create_descriptor_tablet();
+
+    // table descriptors
+    TTableDescriptor t_table_desc;
+
+    t_table_desc.id = _tablet->table_id();
+    t_table_desc.tableType = TTableType::OLAP_TABLE;
+    t_table_desc.numCols = _tablet->num_columns();
+    t_table_desc.numClusteringCols = 0;
+    t_desc_table.tableDescriptors.push_back(t_table_desc);
+    t_desc_table.__isset.tableDescriptors = true;
+
+    DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl);
+
+    _runtime_state.set_desc_tbl(_desc_tbl);
+}
+
+void BuilderScannerMemtable::create_expr_info() {
+    TTypeDesc varchar_type;
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::VARCHAR);
+        scalar_type.__set_len(65535);
+        node.__set_scalar_type(scalar_type);
+        varchar_type.types.push_back(node);
+    }
+    for (int i = 0; i < _tablet->num_columns(); i++) {
+        auto col = _tablet->tablet_schema()->column(i);
+
+        TExprNode slot_ref;
+        slot_ref.node_type = TExprNodeType::SLOT_REF;
+        slot_ref.type = varchar_type;
+        slot_ref.num_children = 0;
+        slot_ref.__isset.slot_ref = true;
+        slot_ref.slot_ref.slot_id = _tablet->num_columns() + i;
+        slot_ref.slot_ref.tuple_id = 1;
+
+        TExpr expr;
+        expr.nodes.push_back(slot_ref);
+
+        _params.expr_of_dest_slot.emplace(i, expr);
+        _params.src_slot_ids.push_back(_tablet->num_columns() + i);
+    }
+
+    // _params.__isset.expr_of_dest_slot = true;
+    _params.__set_dest_tuple_id(TUPLE_ID_DST);
+    _params.__set_src_tuple_id(TUPLE_ID_SRC);
+}
+
+void BuilderScannerMemtable::build_scan_ranges(std::vector<TBrokerRangeDesc>& 
ranges,
+                                               const std::vector<std::string>& 
files) {
+    LOG(INFO) << "build scan ranges for files size:" << files.size() << " 
file_type:" << _file_type;
+    for (const auto& file : files) {
+        TBrokerRangeDesc range;
+        range.start_offset = 0;
+        range.size = -1;
+        range.format_type = TFileFormatType::FORMAT_PARQUET;
+        range.splittable = true;
+
+        range.path = file;
+        range.file_type = _isHDFS ? TFileType::FILE_HDFS : 
TFileType::FILE_LOCAL;
+        ranges.push_back(range);
+    }
+
+    if (!ranges.size()) LOG(FATAL) << "cannot get valid scan file!";
+}
+
+void BuilderScannerMemtable::doSegmentBuild(const std::vector<std::string>& 
files) {
+    vectorized::VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
+    scan_node.init(_tnode);
+    auto status = scan_node.prepare(&_runtime_state);
+    if (!status.ok()) LOG(FATAL) << "prepare scan node fail:" << 
status.to_string();
+
+    // set scan range
+    std::vector<TScanRangeParams> scan_ranges;
+    {
+        TScanRangeParams scan_range_params;
+
+        TBrokerScanRange broker_scan_range;
+        broker_scan_range.params = _params;
+        build_scan_ranges(broker_scan_range.ranges, files);
+        
scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range);
+        scan_ranges.push_back(scan_range_params);
+    }
+
+    scan_node.set_scan_ranges(scan_ranges);
+    status = scan_node.open(&_runtime_state);
+    if (!status.ok()) LOG(FATAL) << "open scan node fail:" << 
status.to_string();
+
+    // std::unique_ptr<RowsetWriter> rowset_writer;
+    PUniqueId load_id;
+    load_id.set_hi(1);
+    load_id.set_lo(1);
+    int64_t transaction_id = 1;
+
+    // delta writer
+    TupleDescriptor* tuple_desc = 
_desc_tbl->get_tuple_descriptor(TUPLE_ID_DST);
+    WriteRequest write_req = {_tablet->tablet_meta()->tablet_id(),
+                              _tablet->schema_hash(),
+                              WriteType::LOAD,
+                              transaction_id,
+                              _tablet->partition_id(),
+                              load_id,
+                              tuple_desc,
+                              &(tuple_desc->slots())};
+
+    DeltaWriter* delta_writer = nullptr;
+    DeltaWriter::open(&write_req, &delta_writer, load_id);
+    status = delta_writer->init();
+    if (!status.ok()) LOG(FATAL) << "delta_writer init fail:" << 
status.to_string();
+
+    std::filesystem::path segment_path(std::filesystem::path(_build_dir + 
"/segment"));
+    std::filesystem::remove_all(segment_path);

Review Comment:
   Nit:
   Can we guarantee that `segment_path` is always a new, empty directory?
   If so, it would be better to fail rather than call `remove_all` when there are files
under this path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to